/* * The QUAFFLER library * Copyright (c) 2006 Clifford Wolf * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included * in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ #include #include #include #include #include "quaffler.h" #include "utils.h" #define MAX_PENDING_REQ 4 struct ntdata { // the first three fields are identical to nndata char *options; int handshake_done; struct qfl_join *join; // already reported blocks unsigned char *reported_blocks; }; struct nndata { // the first three fields are identical to ntdata char *options; int handshake_done; struct qfl_join *join; struct qfl_conn *tracker; // additional info int expensive; // peer status unsigned char peer_flags; unsigned char *peer_blocks; // current request char req_type[MAX_PENDING_REQ]; // '\000', 'i', 'a', 'b' or 'c' int req_block_num[MAX_PENDING_REQ]; int req_chunk_num[MAX_PENDING_REQ]; // accounting long long acc_up; long long acc_down; }; #define NTDATA(conn) ((struct ntdata*)((conn)->appdata)) #define NNDATA(conn) ((struct nndata*)((conn)->appdata)) static void send_status(struct qfl_conn *conn) { qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_status(NNDATA(conn)->join->flags)); } static void send_block_status(struct qfl_conn *conn) { struct qfl_join *join = NNDATA(conn)->join; unsigned char *block_bitmap = malloc(join->size_block_bitmap); qfl_os_read(&join->qmdf, join->offset_block_bitmap_complete, block_bitmap, join->size_block_bitmap); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_block_status(block_bitmap, join->size_block_bitmap)); } static int req_free(struct qfl_conn *conn) { int i, result = 0; for (i=0; ireq_type[i]) result++; return result; } static int req_check(struct qfl_join *join, char req_type, int block_num, int chunk_num) { struct qfl_join_conn *jc; int i; for (jc = join->node_list; jc; jc = jc->next) { if (!jc->conn) continue; for (i=0; iconn)->req_type[i] == req_type && NNDATA(jc->conn)->req_block_num[i] == block_num && NNDATA(jc->conn)->req_chunk_num[i] == chunk_num) return 1; } return 0; } static void req_set(struct qfl_conn *conn, char req_type, int block_num, int chunk_num) { int i; for (i=0; i<=MAX_PENDING_REQ; i++) { assert(i != MAX_PENDING_REQ); if (!NNDATA(conn)->req_type[i]) { NNDATA(conn)->req_type[i] = req_type; NNDATA(conn)->req_block_num[i] = block_num; NNDATA(conn)->req_chunk_num[i] = chunk_num; return; } } } static int req_flush(struct qfl_conn *conn, char req_type, int block_num, int chunk_num) { int i; for (i=0; ireq_type[i] == req_type && NNDATA(conn)->req_block_num[i] == block_num && NNDATA(conn)->req_chunk_num[i] == chunk_num) { NNDATA(conn)->req_type[i] = 0; NNDATA(conn)->req_block_num[i] = 0; NNDATA(conn)->req_chunk_num[i] = 0; return 1; } return 0; } static void sort_priority_map(struct qfl_join *join) { int compar(const void *a_p, const void *b_p) { const unsigned short *a = a_p; const unsigned short *b = b_p; int a_is_better = -1; int b_is_better = +1; // same block status, use random numbers if (join->block_status[a[0]] == join->block_status[b[0]]) { if (a[1] > b[1]) return a_is_better; if (a[1] < b[1]) return b_is_better; return 0; } // finished blocks are not very interesing if (join->block_status[a[0]] == join->chunks_per_block) return b_is_better; if (join->block_status[b[0]] == join->chunks_per_block) return a_is_better; // prefer blocks which are more complete if (join->block_status[a[0]] > join->block_status[b[0]]) return a_is_better; if (join->block_status[a[0]] < join->block_status[b[0]]) return b_is_better; // never reached return 0; } qsort(join->block_priority_map, join->total_blocks, sizeof(unsigned short)*2, compar); } static void conn_node_action(struct qfl_conn *conn) { struct qfl_join *join = NNDATA(conn)->join; unsigned int i; // are we ready for actions? if (!NNDATA(conn)->handshake_done) return; // got already the whole file? if ((join->flags & 1) != 0) return; // do we have free request slots? for (i=0; i<=MAX_PENDING_REQ; i++) { if (i == MAX_PENDING_REQ) return; if (!NNDATA(conn)->req_type[i]) break; } // no metadata array? if ((join->flags & 2) == 0) { // Does this peer have it? if ((NNDATA(conn)->peer_flags & 2) == 0) return; // Already downloading from another peer? if (req_check(join, 'i', 0, 0)) return; req_set(conn, 'i', 0, 0); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_request_index()); return; } // no attribute block? if ((join->flags & 4) == 0) { // Does this peer have it? if ((NNDATA(conn)->peer_flags & 4) == 0) return; // Already downloading from another peer? if (req_check(join, 'a', 0, 0)) return; req_set(conn, 'a', 0, 0); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_request_attr()); return; } // download blocks.. int freecount = req_free(conn); for (i=0; freecount > 0 && i < join->total_blocks; i++) { int block = join->block_priority_map[i*2]; unsigned int chunk; // Do we have that block already? if (join->block_status[block] == join->chunks_per_block) continue; // Does the peer have the block? if (!test_bit(NNDATA(conn)->peer_blocks, block)) continue; // Is this an expensive connection if (NNDATA(conn)->expensive) { int found_alternative = 0; struct qfl_join_conn *jc; for (jc = join->node_list; jc; jc = jc->next) { if (!jc->conn) continue; if (jc->expensive) continue; if (test_bit(NNDATA(jc->conn)->peer_blocks, block)) { found_alternative = 1; break; } } if (found_alternative) continue; } // Do we have the block metadata? if (!test_bit_file(&join->qmdf, join->offset_block_bitmap_progres, block)) { if (!req_check(join, 'b', block, 0)) { req_set(conn, 'b', block, 0); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_request_block(block)); freecount--; } continue; } // Request chunks unsigned char bitmap[join->size_chunk_bitmap]; qfl_os_read(&join->qmdf, join->offset_block_info + block*join->size_block_info, bitmap, join->size_chunk_bitmap); for (chunk=0; freecount > 0 && chunk < join->chunks_per_block; chunk++) { if (!test_bit(bitmap, chunk) && !req_check(join, 'c', block, chunk)) { req_set(conn, 'c', block, chunk); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_request_chunk(block, chunk)); freecount--; } } } } static void conn_node_input(struct qfl_conn *conn) { struct qfl_pkg *pkg = qfl_pkgfifo_shift(conn->fifo[QFL_FIFO_IN]); // printf("From %s:\n", conn->peer); // qfl_pkg_dump(pkg); if (!NNDATA(conn)->handshake_done && pkg->opcode != QFL_PKG_INITIATOR && pkg->opcode != QFL_PKG_RESPONDER) { conn->conndrop_flags |= 4; goto protocol_error; } switch (pkg->opcode) { case QFL_PKG_INITIATOR: { struct qfl_pkg_initiator *p = &pkg->payload.initiator; if (conn->initiator || NNDATA(conn)->handshake_done) { conn->conndrop_flags |= 4; goto protocol_error; } NNDATA(conn)->options = strdup(p->options); // check if the mode is correct if (strcmp(p->mode, "node-node")) { conn->conndrop_flags |= 4; goto protocol_error; } // find the join { struct qfl_conn *c; for (c = conn->ctx->conn_list; c; c = c->next) { if (strcmp(c->mode, "node-tracker")) continue; if (strcmp(NTDATA(c)->join->normalized_quafflerid, p->quafflerid)) continue; NNDATA(conn)->join = NTDATA(c)->join; } } if (!NNDATA(conn)->join) { conn->conndrop_flags |= 4; goto protocol_error; } // Find this connection in join conn list struct qfl_join_conn *jc = NNDATA(conn)->join->node_list; while (jc) { if (!jc->conn && !strcmp(jc->connid, p->connid)) break; jc = jc->next; } if (!jc) { conn->conndrop_flags |= 4; goto protocol_error; } NNDATA(conn)->expensive = jc->expensive; jc->conn = conn; // allocate peer block bitmap NNDATA(conn)->peer_blocks = calloc(1, NNDATA(conn)->join->size_block_bitmap); // send a responder package back qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_responder("")); // send current status send_status(conn); send_block_status(conn); // mark the handshake as finished NNDATA(conn)->handshake_done = 1; break; } case QFL_PKG_RESPONDER: { if (!conn->initiator || NNDATA(conn)->handshake_done) { conn->conndrop_flags |= 4; goto protocol_error; } // send current status send_status(conn); send_block_status(conn); struct qfl_pkg_responder *p = &pkg->payload.responder; NNDATA(conn)->options = strdup(p->options); NNDATA(conn)->handshake_done = 1; break; } case QFL_PKG_ERROR: { // struct qfl_pkg_error *p = &pkg->payload.error; conn->conndrop_flags |= 4; conn->closeit = 1; break; } case QFL_PKG_MESSAGE: { // struct qfl_pkg_FOOBAR *p = &pkg->payload.FOOBAR; break; } case QFL_PKG_STATUS: { struct qfl_pkg_status *p = &pkg->payload.status; NNDATA(conn)->peer_flags = p->status_bitmap; break; } case QFL_PKG_BLOCK_STATUS: { struct qfl_pkg_block_status *p = &pkg->payload.block_status; if (p->block_bitmap_len > NNDATA(conn)->join->size_block_bitmap) { conn->conndrop_flags |= 1; goto protocol_error; } memcpy(NNDATA(conn)->peer_blocks, p->block_bitmap, p->block_bitmap_len); break; } case QFL_PKG_BLOCK_GOT: { struct qfl_pkg_block_got *p = &pkg->payload.block_got; if (p->block_number >= NNDATA(conn)->join->total_blocks) { conn->conndrop_flags |= 1; goto protocol_error; } set_bit(NNDATA(conn)->peer_blocks, p->block_number); break; } case QFL_PKG_REQUEST_CHUNK: { struct qfl_pkg_request_chunk *p = &pkg->payload.request_chunk; struct qfl_join *join = NNDATA(conn)->join; if (p->block_number >= join->total_blocks) { conn->conndrop_flags |= 1; goto protocol_error; } if (p->chunk_number >= join->chunks_per_block) { conn->conndrop_flags |= 1; goto protocol_error; } if (join->block_status[p->block_number] != join->chunks_per_block) { conn->conndrop_flags |= 1; goto protocol_error; } int data_len = 0; unsigned char *data = 0; if (!join->ctx->skip_io) { data_len = join->chunk_size; data = malloc(data_len); qfl_os_read(&join->data, join->block_size * p->block_number + join->chunk_size * p->chunk_number, data, data_len); while (data_len > 0 && data[data_len-1] == 0) data_len--; } qfl_pkgfifo_push(conn->fifo[QFL_FIFO_DATA], qfl_mkpkg_send_chunk(p->block_number, p->chunk_number, data, data_len)); NNDATA(conn)->acc_up += data_len; break; } case QFL_PKG_REQUEST_BLOCK: { struct qfl_pkg_request_block *p = &pkg->payload.request_block; struct qfl_join *join = NNDATA(conn)->join; if (p->block_number >= join->total_blocks) { conn->conndrop_flags |= 1; goto protocol_error; } if (join->block_status[p->block_number] != join->chunks_per_block) { conn->conndrop_flags |= 1; goto protocol_error; } int data_len = join->size_block_info - join->size_chunk_bitmap; unsigned char *data = malloc(data_len); qfl_os_read(&join->qmdf, join->offset_block_info + p->block_number * join->size_block_info + join->size_chunk_bitmap, data, data_len); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_DATA], qfl_mkpkg_send_block(p->block_number, data, data_len)); NNDATA(conn)->acc_up += data_len; break; } case QFL_PKG_REQUEST_ATTR: { struct qfl_join *join = NNDATA(conn)->join; if ((join->flags & 4) == 0) { conn->conndrop_flags |= 1; goto protocol_error; } unsigned short data_len; qfl_os_read(&join->qmdf, join->offset_attributes, &data_len, 2); data_len = ntohs(data_len); unsigned char *data = malloc(data_len); qfl_os_read(&join->qmdf, join->offset_attributes+2, data, data_len); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_DATA], qfl_mkpkg_send_attr(data, data_len)); NNDATA(conn)->acc_up += data_len; break; } case QFL_PKG_REQUEST_INDEX: { struct qfl_join *join = NNDATA(conn)->join; if ((join->flags & 2) == 0) { conn->conndrop_flags |= 1; goto protocol_error; } int data_len = (join->total_blocks+1)*20; unsigned char *data = malloc(data_len); qfl_os_read(&join->qmdf, join->offset_index_metadata_array, data, data_len); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_DATA], qfl_mkpkg_send_index(data, data_len)); NNDATA(conn)->acc_up += data_len; break; } case QFL_PKG_SEND_CHUNK: { struct qfl_pkg_send_chunk *p = &pkg->payload.send_chunk; struct qfl_join *join = NNDATA(conn)->join; // Is this what we expected? if (!req_flush(conn, 'c', p->block_number, p->chunk_number)) { conn->conndrop_flags |= 1; goto protocol_error; } if (p->data_len > join->chunk_size) { conn->conndrop_flags |= 1; goto protocol_error; } NNDATA(conn)->acc_down += p->data_len; if (p->data_len < join->chunk_size) { int i; p->data = realloc(p->data, join->chunk_size); for (i = p->data_len; ichunk_size; i++) p->data[i] = 0; p->data_len = join->chunk_size; } // Check sha1 if (!conn->ctx->skip_sha1_checks) { unsigned char sha1[20], qmdf_sha1[20]; qfl_sha1(p->data, p->data_len, sha1); qfl_os_read(&join->qmdf, join->offset_block_info + p->block_number*join->size_block_info + join->size_chunk_bitmap + p->chunk_number*20, qmdf_sha1, 20); if (memcmp(sha1, qmdf_sha1, 20)) { conn->conndrop_flags |= 1; goto protocol_error; } } // Calculate offset in data file unsigned long long offset = join->block_size * p->block_number + join->chunk_size * p->chunk_number; // Calculate final data_len (no padding) int data_len = p->data_len; if (offset + data_len > join->file_size) data_len = join->file_size - offset; // Save to data file (unless this block is pure padding) if (data_len > 0 && !join->ctx->skip_io) qfl_os_write(&join->data, offset, p->data, data_len); // Set bit in chunk bitmap set_bit_file(&join->qmdf, join->offset_block_info + p->block_number*join->size_block_info, p->chunk_number); // update block_status and block_priority_map join->block_status[p->block_number]++; sort_priority_map(join); // Is this block finished? if (join->block_status[p->block_number] == join->chunks_per_block) { unsigned int i; int file_complete = 1; // mangle bitmaps clear_bit_file(&join->qmdf, join->offset_block_bitmap_progres, p->block_number); set_bit_file(&join->qmdf, join->offset_block_bitmap_complete, p->block_number); // is the file complete? for (i=0; itotal_blocks; i++) if (join->block_status[i] != join->chunks_per_block) { file_complete = 0; break; } // if so, update flags and other stuff if (file_complete) { join->flags |= 1; qfl_os_write(&join->qmdf, 317, &join->flags, 1); // generate .qpart filename int qpart_filename_len = strlen(join->filename) + 7; char qpart_filename[qpart_filename_len]; snprintf(qpart_filename, qpart_filename_len, "%s.qpart", join->filename); // rename data file qfl_os_close(&join->data); qfl_os_rename(qpart_filename, join->filename); qfl_os_open(&join->data, join->filename); // and let the user know fprintf(stderr, "Finished downloading %s.\n", join->filename); } // let it know everyone else.. struct qfl_join_conn *jc; for (jc = join->node_list; jc; jc = jc->next) { if (!jc->conn) continue; if (file_complete) send_status(jc->conn); qfl_pkgfifo_push(jc->conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_block_got(p->block_number)); } } break; } case QFL_PKG_SEND_BLOCK: { struct qfl_pkg_send_block *p = &pkg->payload.send_block; struct qfl_join *join = NNDATA(conn)->join; // Is this what we expected? if (!req_flush(conn, 'b', p->block_number, 0)) { conn->conndrop_flags |= 1; goto protocol_error; } unsigned int data_len = join->size_block_info - join->size_chunk_bitmap; if (p->data_len != data_len) { conn->conndrop_flags |= 1; goto protocol_error; } NNDATA(conn)->acc_down += p->data_len; // Check sha1 if (!conn->ctx->skip_sha1_checks) { unsigned char sha1[20], qmdf_sha1[20]; qfl_sha1(p->data, p->data_len, sha1); qfl_os_read(&join->qmdf, join->offset_index_metadata_array + p->block_number*20, qmdf_sha1, 20); if (memcmp(sha1, qmdf_sha1, 20)) { conn->conndrop_flags |= 1; goto protocol_error; } } // Save to .qmdf file qfl_os_write(&join->qmdf, join->offset_block_info + p->block_number * join->size_block_info + join->size_chunk_bitmap, p->data, p->data_len); // Set the in-progres bit set_bit_file(&join->qmdf, join->offset_block_bitmap_progres, p->block_number); break; } case QFL_PKG_SEND_ATTR: { struct qfl_pkg_send_attr *p = &pkg->payload.send_attr; struct qfl_join *join = NNDATA(conn)->join; // Is this what we expected? if (!req_flush(conn, 'a', 0, 0)) { conn->conndrop_flags |= 1; goto protocol_error; } NNDATA(conn)->acc_down += p->data_len; // Check sha1 if (!conn->ctx->skip_sha1_checks) { unsigned char sha1[20], qmdf_sha1[20]; qfl_sha1(p->data, p->data_len, sha1); qfl_os_read(&join->qmdf, join->offset_index_metadata_array + join->total_blocks*20, qmdf_sha1, 20); if (memcmp(sha1, qmdf_sha1, 20)) { conn->conndrop_flags |= 1; goto protocol_error; } } // Save to .qmdf file unsigned short attr_len = htons(p->data_len); qfl_os_write(&join->qmdf, join->offset_attributes, &attr_len, 2); qfl_os_write(&join->qmdf, join->offset_attributes+2, p->data, p->data_len); // We have the attributes! let it know everyone.. { join->flags |= 4; qfl_os_write(&join->qmdf, 317, &join->flags, 1); struct qfl_join_conn *jc; for (jc = join->node_list; jc; jc = jc->next) if (jc->conn) send_status(jc->conn); } break; } case QFL_PKG_SEND_INDEX: { struct qfl_pkg_send_index *p = &pkg->payload.send_index; struct qfl_join *join = NNDATA(conn)->join; // Is this what we expected? if (!req_flush(conn, 'i', 0, 0)) { conn->conndrop_flags |= 1; goto protocol_error; } if (p->data_len != (join->total_blocks+1)*20) { conn->conndrop_flags |= 1; goto protocol_error; } NNDATA(conn)->acc_down += p->data_len; // Check sha1 if (!conn->ctx->skip_sha1_checks) { unsigned char sha1[20]; qfl_sha1(p->data, p->data_len, sha1); if (memcmp(sha1, join->index_sha1, 20)) { conn->conndrop_flags |= 1; goto protocol_error; } } // Save to .qmdf file qfl_os_write(&join->qmdf, join->offset_index_metadata_array, p->data, p->data_len); // We have the index! let it know everyone.. { join->flags |= 2; qfl_os_write(&join->qmdf, 317, &join->flags, 1); struct qfl_join_conn *jc; for (jc = join->node_list; jc; jc = jc->next) if (jc->conn) send_status(jc->conn); } break; } default: conn->conndrop_flags |= 4; goto protocol_error; } qfl_pkg_destroy(pkg); return; protocol_error: fprintf(stderr, "==> N-N Protocol error\n"); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_error(0, "Protocol error")); qfl_pkg_destroy(pkg); conn->closeit = 1; } static void conn_node_close(struct qfl_conn *conn) { if (NNDATA(conn)->join) { struct qfl_join *join = NNDATA(conn)->join; struct qfl_join_conn *jc = join->node_list; struct qfl_join_conn **jcp = &join->node_list; while (jc) { struct qfl_join_conn *next = jc->next; if (jc->conn == conn) { if (jc->tracker) qfl_pkgfifo_push(jc->tracker->fifo[QFL_FIFO_STATUS], qfl_mkpkg_conndrop(conn->conndrop_flags, jc->connid)); *jcp = jc->next; free(jc->connid); free(jc); } else jcp = &jc->next; jc = next; } } free(NNDATA(conn)->options); free(NNDATA(conn)->peer_blocks); free(conn->appdata); } static void conn_open(struct qfl_conn *conn) { conn->process_packages = conn_node_input; conn->process_action = conn_node_action; conn->process_close = conn_node_close; conn->appdata = calloc(1, sizeof(struct nndata)); conn->mode = strdup("node-node"); } static void conn_tracker_input(struct qfl_conn *conn) { struct qfl_pkg *pkg = qfl_pkgfifo_shift(conn->fifo[QFL_FIFO_IN]); // printf("From %s:\n", conn->peer); // qfl_pkg_dump(pkg); if (!NTDATA(conn)->handshake_done && pkg->opcode != QFL_PKG_RESPONDER && pkg->opcode != QFL_PKG_PING) goto protocol_error; switch (pkg->opcode) { case QFL_PKG_RESPONDER: { if (NTDATA(conn)->handshake_done) goto protocol_error; struct qfl_pkg_responder *p = &pkg->payload.responder; NTDATA(conn)->options = strdup(p->options); NTDATA(conn)->handshake_done = 1; break; } case QFL_PKG_PING: { if (NTDATA(conn)->handshake_done) goto protocol_error; qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_pong()); conn->closeit = 1; break; } case QFL_PKG_ERROR: { // struct qfl_pkg_error *p = &pkg->payload.error; conn->closeit = 1; break; } case QFL_PKG_MESSAGE: { // struct qfl_pkg_FOOBAR *p = &pkg->payload.FOOBAR; break; } case QFL_PKG_REQSTATUS: { struct qfl_join *join = NTDATA(conn)->join; struct qfl_join_conn *jc = join->node_list; qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_status(NTDATA(conn)->join->flags)); if (NTDATA(conn)->join->flags & 1) { free(NTDATA(conn)->reported_blocks); NTDATA(conn)->reported_blocks = 0; } else { if (!NTDATA(conn)->reported_blocks) { seams_better_to_resend_entire_bitmap:; unsigned char *block_bitmap = malloc(join->size_block_bitmap); qfl_os_read(&join->qmdf, join->offset_block_bitmap_complete, block_bitmap, join->size_block_bitmap); NTDATA(conn)->reported_blocks = malloc(join->size_block_bitmap); memcpy(NTDATA(conn)->reported_blocks, block_bitmap, join->size_block_bitmap); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_block_status(block_bitmap, join->size_block_bitmap)); } else { int i, j = 0; for (i=0; i < join->total_blocks; i++) if (!test_bit(NTDATA(conn)->reported_blocks, i) && join->block_status[i] == join->chunks_per_block) j++; if (j*4 > join->size_block_bitmap) { free(NTDATA(conn)->reported_blocks); NTDATA(conn)->reported_blocks = 0; goto seams_better_to_resend_entire_bitmap; } for (i=0; i < join->total_blocks; i++) if (!test_bit(NTDATA(conn)->reported_blocks, i) && join->block_status[i] == join->chunks_per_block) { qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_block_got(i)); set_bit(NTDATA(conn)->reported_blocks, i); } } } while (jc) { if (jc->conn && NNDATA(jc->conn)->peer_blocks && jc->tracker == conn) { int i, peerblocks_counter = 0; for (i=0; i < join->total_blocks; i++) if (test_bit(NNDATA(jc->conn)->peer_blocks, i)) peerblocks_counter++; qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_connstatus( jc->connid, NNDATA(jc->conn)->acc_up, NNDATA(jc->conn)->acc_down, peerblocks_counter )); NNDATA(jc->conn)->acc_up = 0; NNDATA(jc->conn)->acc_down = 0; } jc = jc->next; } qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_reqstatus()); break; } case QFL_PKG_CONNECT: { struct qfl_pkg_connect *p = &pkg->payload.connect; struct qfl_join *join = NTDATA(conn)->join; struct qfl_join_conn *jc; struct qfl_conn *peer; switch (p->flags & 0005) { case 0000: jc = calloc(1, sizeof(struct qfl_join_conn)); jc->next = join->node_list; join->node_list = jc; jc->tracker = conn; jc->connid = strdup(p->connid); jc->expensive = (p->flags & 0002) != 0; qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_responderok( p->connid )); break; case 0001: peer = qfl_conn_create(conn->ctx, p->responder); if (!peer) { qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_conndrop( 2, p->connid )); break; } peer->process_packages = conn_node_input; peer->process_action = conn_node_action; peer->process_close = conn_node_close; peer->appdata = calloc(1, sizeof(struct nndata)); peer->mode = strdup("node-node"); NNDATA(peer)->join = join; NNDATA(peer)->peer_blocks = calloc(1, NTDATA(conn)->join->size_block_bitmap); jc = calloc(1, sizeof(struct qfl_join_conn)); jc->next = join->node_list; join->node_list = jc; jc->tracker = conn; jc->connid = strdup(p->connid); jc->expensive = (p->flags & 0002) != 0; jc->conn = peer; NNDATA(peer)->expensive = jc->expensive; qfl_pkgfifo_push(peer->fifo[QFL_FIFO_STATUS], qfl_mkpkg_initiator( "QUAFFLER V1", p->connid, "node-node", "", NTDATA(conn)->join->normalized_quafflerid, 0, 0, 0, 0 )); break; case 0005: assert(!"Not implemented"); break; default: assert(0); } break; } case QFL_PKG_DISCONNECT: { struct qfl_pkg_disconnect *p = &pkg->payload.disconnect; struct qfl_join *join = NTDATA(conn)->join; struct qfl_join_conn *jc = join->node_list; struct qfl_join_conn **jcp = &join->node_list; while (jc) { if (jc->connid && !strcmp(jc->connid, p->connid)) { if (!jc->conn) { *jcp = jc->next; free(jc->connid); free(jc); } else jc->conn->closeit = 1; break; } jcp = &jc->next; jc = jc->next; } break; } case QFL_PKG_CONNFLAGS: { struct qfl_pkg_connflags *p = &pkg->payload.connflags; struct qfl_join *join = NTDATA(conn)->join; struct qfl_join_conn *jc = join->node_list; while (jc) { if (!strcmp(jc->connid, p->connid)) jc->expensive = (p->flags & 0002) != 0; jc = jc->next; } break; } default: goto protocol_error; } qfl_pkg_destroy(pkg); return; protocol_error: fprintf(stderr, "==> N-T Protocol error\n"); qfl_pkgfifo_push(conn->fifo[QFL_FIFO_STATUS], qfl_mkpkg_error(0, "Protocol error")); qfl_pkg_destroy(pkg); conn->closeit = 1; } static void close_join(struct qfl_join *join) { struct qfl_join_conn *list[2] = { join->tracker_list, join->node_list }; struct qfl_join_conn *jc, *next_jc; int i; for (i=0; i<2; i++) for (jc = list[i]; jc; jc = next_jc) { if (jc->conn) { NNDATA(jc->conn)->join = 0; jc->conn->closeit = 1; } next_jc = jc->next; free(jc->connid); free(jc); } join->tracker_list = 0; join->node_list = 0; } static void conn_tracker_close(struct qfl_conn *conn) { // remove from tracker list and close join if this is the last tracker if (NTDATA(conn)->join) { struct qfl_join *join = NTDATA(conn)->join; struct qfl_join_conn *jc = join->tracker_list; struct qfl_join_conn **jcp = &join->tracker_list; while (jc) { struct qfl_join_conn *next = jc->next; if (jc->conn == conn) { *jcp = jc->next; free(jc->connid); free(jc); } else jcp = &jc->next; jc = next; } for (jc=join->node_list; jc; jc=jc->next) if (jc->tracker == conn) { jc->tracker = 0; jc->conn->closeit = 1; } if (!join->tracker_list) close_join(join); } // free appdata free(NTDATA(conn)->options); free(NTDATA(conn)); conn->appdata = 0; } struct qfl_join *qfl_join(struct qfl_ctx *ctx, const char *connid, const char *tracker_peer, const char *quafflerid, const char *filename) { struct qfl_conn *tracker = qfl_conn_create(ctx, tracker_peer); if (!tracker) { fprintf(stderr, "Can not connect to tracker at `%s'!\n", tracker_peer); return QFL_JOIN_ERROR_CONNECT; } tracker->process_packages = conn_tracker_input; tracker->process_close = conn_tracker_close; tracker->appdata = calloc(1, sizeof(struct ntdata)); tracker->mode = strdup("node-tracker"); struct qfl_join *join = calloc(1, sizeof(struct qfl_join)); NTDATA(tracker)->join = join; join->ctx = ctx; struct qfl_join_conn *jc = calloc(1, sizeof(struct qfl_join_conn)); jc->next = join->tracker_list; join->tracker_list = jc; jc->connid = strdup(connid); jc->conn = tracker; join->quafflerid = strdup(quafflerid); join->filename = strdup(filename); { char *p = strdup(quafflerid); char *sp = p; int i; char *sha1_txt = strsep(&sp, ":"); char *file_size_txt = strsep(&sp, ":"); char *frag_scheme_txt = strsep(&sp, ":"); assert(strlen(sha1_txt) == 40); for (i=0; i<20; i++) { char buffer[3] = { sha1_txt[i*2], sha1_txt[i*2+1], 0 }; join->index_sha1[i] = strtol(buffer, 0, 16); } join->file_size = atoll(file_size_txt); join->frag_scheme = *frag_scheme_txt; char frag_scheme = 'A'; int block_size = 1*1024*1024; int chunk_size = 64*1024; unsigned long long file_size = join->file_size; if (file_size > 64*1024*1024*1024ll) { frag_scheme = 'B'; block_size = 2*1024*1024; } if (file_size > 128*1024*1024*1024ll) { frag_scheme = 'C'; block_size = 4*1024*1024; } if (file_size > 256*1024*1024*1024ll) { frag_scheme = 'D'; block_size = 8*1024*1024; } if (file_size > 512*1024*1024*1024ll) { frag_scheme = 'E'; block_size = 16*1024*1024; } assert(frag_scheme == join->frag_scheme); join->block_size = block_size; join->chunk_size = chunk_size; int total_blocks = file_size / block_size; if (file_size % block_size) total_blocks++; join->total_blocks = total_blocks; join->chunks_per_block = block_size / chunk_size; join->normalized_quafflerid = malloc(100); snprintf(join->normalized_quafflerid, 100, "%s:%Ld:%c", sha1_txt, (unsigned long long)block_size*total_blocks, frag_scheme); free(p); } int qmdf_filename_len = strlen(filename) + 6; char qmdf_filename[qmdf_filename_len]; snprintf(qmdf_filename, qmdf_filename_len, "%s.qmdf", filename); if (!qfl_os_test(qmdf_filename)) { if (qfl_os_test(filename)) goto data_file_already_there; qfl_os_open(&join->qmdf, qmdf_filename); qfl_os_write(&join->qmdf, 0, "-- QUAFFLER METADATA FILE V1 --\n", 32); qfl_os_write(&join->qmdf, 32, join->index_sha1, 20); unsigned long long llbuf = htonll(join->file_size); qfl_os_write(&join->qmdf, 52, &llbuf, 8); qfl_os_write(&join->qmdf, 60, &join->frag_scheme, 1); } else { char buffer[32]; qfl_os_open(&join->qmdf, qmdf_filename); qfl_os_read(&join->qmdf, 0, buffer, 32); if (memcmp(buffer, "-- QUAFFLER METADATA FILE V1 --\n", 32)) goto found_error_in_qmdf; qfl_os_read(&join->qmdf, 32, buffer, 20); if (memcmp(buffer, join->index_sha1, 20)) goto found_error_in_qmdf; unsigned long long llbuf; qfl_os_read(&join->qmdf, 52, &llbuf, 8); if (llbuf != (unsigned long long)htonll(join->file_size)) goto found_error_in_qmdf; qfl_os_read(&join->qmdf, 60, buffer, 1); if (buffer[0] != join->frag_scheme) goto found_error_in_qmdf; if (0) { found_error_in_qmdf: fprintf(stderr, "Existing .qmdf file does not match!\n"); return QFL_JOIN_ERROR_CONNECT; } } // initialize sizes join->size_block_bitmap = join->total_blocks / 8; join->size_chunk_bitmap = join->chunks_per_block / 8; if (join->total_blocks % 8) join->size_block_bitmap++; join->size_block_info = join->size_chunk_bitmap + join->chunks_per_block*20; // initialize offsets join->offset_block_bitmap_complete = 318; join->offset_block_bitmap_progres = join->offset_block_bitmap_complete + join->size_block_bitmap; join->offset_index_metadata_array = join->offset_block_bitmap_progres + join->size_block_bitmap; join->offset_block_info = join->offset_index_metadata_array + (join->total_blocks+1)*20; join->offset_attributes = join->offset_block_info + join->total_blocks*join->size_block_info; // read flags qfl_os_read(&join->qmdf, 317, &join->flags, 1); // fill join->block_status and join->block_priority_map { unsigned char block_bitmap_complete[join->size_block_bitmap]; unsigned char block_bitmap_progres[join->size_block_bitmap]; unsigned int i, j; qfl_os_read(&join->qmdf, join->offset_block_bitmap_complete, block_bitmap_complete, join->size_block_bitmap); qfl_os_read(&join->qmdf, join->offset_block_bitmap_progres, block_bitmap_progres, join->size_block_bitmap); join->block_status = calloc(join->total_blocks, sizeof(unsigned short)); join->block_priority_map = calloc(join->total_blocks*2, sizeof(unsigned short)); for (i=0; itotal_blocks; i++) { if (test_bit(block_bitmap_complete, i)) join->block_status[i] = join->chunks_per_block; else if (test_bit(block_bitmap_progres, i)) { unsigned char chunk_bitmap[join->size_chunk_bitmap]; qfl_os_read(&join->qmdf, join->offset_block_info + join->size_block_info*i, chunk_bitmap, join->size_chunk_bitmap); for (j=0; jchunks_per_block; j++) if (test_bit(chunk_bitmap, j)) join->block_status[i]++; } join->block_priority_map[i*2] = i; join->block_priority_map[i*2 + 1] = qfl_os_rand(); } sort_priority_map(join); } // Check if we don't overwrite anexisting data file.. if (qfl_os_test(filename) && (join->flags & 1) == 0) { data_file_already_there: fprintf(stderr, "Found %s but %s does not exist or marks file as unfinished.\n", filename, qmdf_filename); fprintf(stderr, "Maybe you forgot to run `quaffler -qmdf-gen ..' before joining?\n"); return QFL_JOIN_ERROR_CONNECT; } // Open data file (with or without .qpart extension) if ((join->flags & 1) == 0) { int qpart_filename_len = strlen(filename) + 7; char qpart_filename[qpart_filename_len]; snprintf(qpart_filename, qpart_filename_len, "%s.qpart", filename); qfl_os_open(&join->data, qpart_filename); } else qfl_os_open(&join->data, filename); char options[100]; snprintf(options, 100, "listenport=%d", ctx->listenport); qfl_pkgfifo_push(tracker->fifo[QFL_FIFO_STATUS], qfl_mkpkg_initiator( "QUAFFLER V1", connid, "node-tracker", options, join->normalized_quafflerid, 0, 0, 0, 0 // FIXME: RSA-Auth )); ctx->process_newconn = conn_open; return join; } struct qfl_join *qfl_join_by_url(struct qfl_ctx *ctx, const char *connid, const char *url) { char *method = "quaffler://"; int method_len = strlen(method); if (strncmp(url, method, method_len)) { fprintf(stderr, "Format error in quaffler URL: No such method!"); return QFL_JOIN_ERROR_URL; } url += method_len; int tracker_peer_len = strcspn(url, "/"); if (url[tracker_peer_len] != '/') { fprintf(stderr, "Format error in quaffler URL!"); return QFL_JOIN_ERROR_URL; } char tracker_peer[tracker_peer_len+5]; strcpy(tracker_peer, "TCP:"); memcpy(tracker_peer+4, url, tracker_peer_len); tracker_peer[tracker_peer_len+4] = 0; url += tracker_peer_len+1; int quafflerid_len = strcspn(url, "/"); if (url[quafflerid_len] != '/') { fprintf(stderr, "Format error in quaffler URL!"); return QFL_JOIN_ERROR_URL; } char quafflerid[quafflerid_len+1]; memcpy(quafflerid, url, quafflerid_len); quafflerid[quafflerid_len] = 0; url += quafflerid_len+1; if (*url == 0) { fprintf(stderr, "Format error in quaffler URL!"); return QFL_JOIN_ERROR_URL; } const char *filename = url; return qfl_join(ctx, connid, tracker_peer, quafflerid, filename); } void qfl_join_destroy(struct qfl_join *join) { close_join(join); qfl_os_close(&join->data); qfl_os_close(&join->qmdf); free(join->quafflerid); free(join->normalized_quafflerid); free(join->filename); free(join->block_status); free(join->block_priority_map); free(join); } void qfl_join_statusdump(struct qfl_join *join) { struct qfl_join_conn *jc; unsigned int i; // print header printf("\n--- %s (%s) ---\n", join->filename, join->quafflerid); // print block status for (i=0; i < join->total_blocks; i++) { if (i%70 == 0) printf("\n"); if (join->block_status[i] == join->chunks_per_block) printf("X"); else if (join->block_status[i] == 0) printf("."); else printf("%d", join->block_status[i]*10 / join->chunks_per_block); } printf("\n\n"); // print conn status for (jc = join->node_list; jc; jc = jc->next) { if (!jc->conn) continue; int peer_blocks = 0; for (i=0; i < join->total_blocks; i++) if (test_bit(NNDATA(jc->conn)->peer_blocks, i)) peer_blocks++; printf("%c %s [%03d%%]:", jc->expensive ? '*' : '-', jc->conn->peer, peer_blocks*100 / join->total_blocks); for (i=0; iconn)->req_type[i]) printf(" ----"); else printf(" %c[%d,%d]", NNDATA(jc->conn)->req_type[i], NNDATA(jc->conn)->req_block_num[i], NNDATA(jc->conn)->req_chunk_num[i]); } printf("\n"); } } void qfl_join_progresdump(struct qfl_join *join) { unsigned int total_chunks = 0; unsigned int i, progres50; for (i=0; i < join->total_blocks; i++) total_chunks += join->block_status[i]; progres50 = (total_chunks * 50) / (join->total_blocks * join->chunks_per_block); printf("["); for (i=0; ifilename); }