diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 16b066d..b9c0324 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -256,9 +256,6 @@ static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, const uint64_t next_version) { writer->version_ctx.next = next_version; - writer->version_ctx.ring.start = writer->version_ctx.ring.end = - writer->version_ctx.ring.entries + next_version % - writer->version_ctx.ring.size; } static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb) @@ -295,83 +292,36 @@ static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb) return 0; } -static void repush_to_queue(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *wb) -{ - SFBinlogWriterBuffer *previous; - SFBinlogWriterBuffer *current; +#define GET_WBUFFER_VERSION_COUNT(wb) \ + (((wb)->version.last - (wb)->version.first) + 1) - PTHREAD_MUTEX_LOCK(&thread->queue.lc_pair.lock); - if (thread->queue.head == NULL) { - wb->next = NULL; - thread->queue.head = thread->queue.tail = wb; - } else if (wb->version.first <= ((SFBinlogWriterBuffer *) - thread->queue.head)->version.first) - { - wb->next = thread->queue.head; - thread->queue.head = wb; - } else if (wb->version.first > ((SFBinlogWriterBuffer *) - thread->queue.tail)->version.last) - { - wb->next = NULL; - ((SFBinlogWriterBuffer *)thread->queue.tail)->next = wb; - thread->queue.tail = wb; - } else { - previous = thread->queue.head; - current = ((SFBinlogWriterBuffer *)thread->queue.head)->next; - while (current != NULL && wb->version.first > current->version.last) { - previous = current; - current = current->next; - } - - wb->next = previous->next; - previous->next = wb; - } - PTHREAD_MUTEX_UNLOCK(&thread->queue.lc_pair.lock); -} - -#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count) \ +#define DEAL_CURRENT_VERSION_WBUFFER(writer, wb) \ do { \ if ((result=deal_binlog_one_record(wb)) != 0) { \ return result; \ } \ - fast_mblock_free_object(&writer->thread->mblock, wb); \ - writer->version_ctx.next += version_count; \ + writer->version_ctx.next += GET_WBUFFER_VERSION_COUNT(wb); \ + fast_mblock_free_object(&writer->thread->mblock, wb); \ } while (0) -#define GET_WBUFFER_VERSION_COUNT(wb) \ - (((wb)->version.last - (wb)->version.first) + 1) - static int deal_record_by_version(SFBinlogWriterBuffer *wb) { SFBinlogWriterInfo *writer; - SFBinlogWriterBuffer **current; - int64_t distance; - int version_count; + SFBinlogWriterBuffer *current; + SFBinlogWriterBuffer *previous; + SFBinlogWriterSlot *slot; int result; - int next_index; - bool expand; writer = wb->writer; - distance = (int64_t)wb->version.first - (int64_t)writer->version_ctx.next; - if (distance >= (writer->version_ctx.ring.size - 1)) { - logWarning("file: "__FILE__", line: %d, subdir_name: %s, " - "current version: %"PRId64" is too large which " - "exceeds %"PRId64" + %d", __LINE__, - writer->cfg.subdir_name, wb->version.first, - writer->version_ctx.next, - writer->version_ctx.ring.size - 1); - repush_to_queue(writer->thread, wb); - fc_sleep_ms(10); - return EAGAIN; - } else if (distance < 0) { + if (wb->version.first < writer->version_ctx.next) { logError("file: "__FILE__", line: %d, subdir_name: %s, " "current version: %"PRId64" is too small which " - "less than %"PRId64", tag: %d, buffer(%d): %.*s", __LINE__, - writer->cfg.subdir_name, wb->version.first, + "less than %"PRId64", tag: %"PRId64", buffer(%d): %.*s", + __LINE__, writer->cfg.subdir_name, wb->version.first, writer->version_ctx.next, wb->tag, wb->bf.length, wb->bf.length, wb->bf.buff); fast_mblock_free_object(&writer->thread->mblock, wb); - return EINVAL; + return 0; } /* @@ -380,81 +330,57 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) writer->version_ctx.next, writer); */ - current = writer->version_ctx.ring.entries + wb->version.first % - writer->version_ctx.ring.size; - if (current == writer->version_ctx.ring.start) { - version_count = GET_WBUFFER_VERSION_COUNT(wb); - DEAL_CURRENT_VERSION_WBUFFER(writer, wb, version_count); - next_index = (writer->version_ctx.ring.start - - writer->version_ctx.ring.entries) + version_count; - if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { - writer->version_ctx.ring.start = writer->version_ctx.ring.end = - writer->version_ctx.ring.entries + next_index % - writer->version_ctx.ring.size; - return 0; - } + if (wb->version.first == writer->version_ctx.next) { + DEAL_CURRENT_VERSION_WBUFFER(writer, wb); - writer->version_ctx.ring.start = writer->version_ctx.ring.entries + - next_index % writer->version_ctx.ring.size; - while (writer->version_ctx.ring.start != writer->version_ctx.ring.end && - *(writer->version_ctx.ring.start) != NULL) + slot = writer->version_ctx.ring.slots + + writer->version_ctx.next % writer->version_ctx.ring.size; + while (slot->head.next != NULL && slot->head.next-> + version.first == writer->version_ctx.next) { - current = writer->version_ctx.ring.start; - version_count = GET_WBUFFER_VERSION_COUNT(*current); - DEAL_CURRENT_VERSION_WBUFFER(writer, *current, version_count); - *current = NULL; + current = slot->head.next; + slot->head.next = current->next; - next_index += version_count; - writer->version_ctx.ring.start = writer->version_ctx.ring.entries + - next_index % writer->version_ctx.ring.size; - writer->version_ctx.ring.count--; + DEAL_CURRENT_VERSION_WBUFFER(writer, current); + writer->version_ctx.ring.waiting_count--; + + slot = writer->version_ctx.ring.slots + writer-> + version_ctx.next % writer->version_ctx.ring.size; } return 0; } - version_count = GET_WBUFFER_VERSION_COUNT(wb); - distance = (int64_t)wb->version.last - (int64_t)writer->version_ctx.next; - if (distance >= (writer->version_ctx.ring.size - 1)) { - logWarning("file: "__FILE__", line: %d, subdir_name: %s, " - "current version: %"PRId64" is too large which " - "exceeds %"PRId64" + %d", __LINE__, - writer->cfg.subdir_name, wb->version.last, - writer->version_ctx.next, - writer->version_ctx.ring.size - 1); - repush_to_queue(writer->thread, wb); - fc_sleep_ms(10); - return EAGAIN; - } - - *current = wb; - writer->version_ctx.ring.count++; - if (writer->version_ctx.ring.count > writer->version_ctx.ring.max_count) { - writer->version_ctx.ring.max_count = writer->version_ctx.ring.count; - logDebug("%s max ring.count ==== %d", writer->cfg.subdir_name, - writer->version_ctx.ring.count); - } - - if (writer->version_ctx.ring.start == writer->version_ctx.ring.end) { //empty - expand = true; - } else if (writer->version_ctx.ring.end > writer->version_ctx.ring.start) { - SFBinlogWriterBuffer **last; - last = writer->version_ctx.ring.entries + wb->version.last % - writer->version_ctx.ring.size; - expand = !(current > writer->version_ctx.ring.start && - last < writer->version_ctx.ring.end); + slot = writer->version_ctx.ring.slots + wb->version.first % + writer->version_ctx.ring.size; + if (slot->head.next == NULL) { + wb->next = NULL; + slot->head.next = wb; + } else if (wb->version.first < slot->head.next->version.first) { + wb->next = slot->head.next; + slot->head.next = wb; } else { - expand = (current >= writer->version_ctx.ring.end && - current < writer->version_ctx.ring.start); + previous = slot->head.next; + while (previous->next != NULL && wb->version.first > + previous->next->version.first) + { + previous = previous->next; + } + + wb->next = previous->next; + previous->next = wb; } - if (expand) { - writer->version_ctx.ring.end = writer->version_ctx.ring.entries + - (wb->version.last + 1) % writer->version_ctx.ring.size; + writer->version_ctx.ring.waiting_count++; + if (writer->version_ctx.ring.waiting_count > + writer->version_ctx.ring.max_waitings) + { + writer->version_ctx.ring.max_waitings = + writer->version_ctx.ring.waiting_count; } - return EAGAIN; + return 0; } static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread, @@ -507,9 +433,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, wbuffer = wbuffer->next; if (current->type == SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION) { - if (current->writer->version_ctx.ring.start != - current->writer->version_ctx.ring.end) - { + if (current->writer->version_ctx.ring.waiting_count != 0) { logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, ring not empty, " "maybe some mistake happen", __LINE__, @@ -526,15 +450,17 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); } else { + current->writer->total_count++; if ((result=deal_record_by_version(current)) == 0) { add_to_flush_writer_queue(thread, current->writer); - } else if (!(result == EAGAIN || result == EINVAL)) { + } else { return result; } } } while (wbuffer != NULL); } else { do { + wbuffer->writer->total_count++; if ((result=deal_binlog_one_record(wbuffer)) != 0) { return result; } @@ -639,6 +565,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, bool create; char filepath[PATH_MAX]; + writer->total_count = 0; writer->flush.in_queue = false; if ((result=sf_binlog_buffer_init(&writer->binlog_buffer, buffer_size)) != 0) @@ -681,18 +608,15 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, { int bytes; - logDebug("init writer %s ===== next version: %"PRId64", writer: %p", - subdir_name, next_version, writer); - - bytes = sizeof(SFBinlogWriterBuffer *) * ring_size; - writer->version_ctx.ring.entries = (SFBinlogWriterBuffer **)fc_malloc(bytes); - if (writer->version_ctx.ring.entries == NULL) { + bytes = sizeof(SFBinlogWriterSlot) * ring_size; + writer->version_ctx.ring.slots = (SFBinlogWriterSlot *)fc_malloc(bytes); + if (writer->version_ctx.ring.slots == NULL) { return ENOMEM; } - memset(writer->version_ctx.ring.entries, 0, bytes); + memset(writer->version_ctx.ring.slots, 0, bytes); writer->version_ctx.ring.size = ring_size; - writer->version_ctx.ring.count = 0; - writer->version_ctx.ring.max_count = 0; + writer->version_ctx.ring.waiting_count = 0; + writer->version_ctx.ring.max_waitings = 0; binlog_writer_set_next_version(writer, next_version); return sf_binlog_writer_init_normal(writer, subdir_name, buffer_size); diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 0293960..a38d635 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -46,18 +46,20 @@ struct sf_binlog_writer_info; typedef struct sf_binlog_writer_buffer { SFVersionRange version; BufferInfo bf; - int tag; + int64_t tag; int type; //for versioned writer struct sf_binlog_writer_info *writer; struct sf_binlog_writer_buffer *next; } SFBinlogWriterBuffer; +typedef struct sf_binlog_writer_slot { + SFBinlogWriterBuffer head; +} SFBinlogWriterSlot; + typedef struct sf_binlog_writer_buffer_ring { - SFBinlogWriterBuffer **entries; - SFBinlogWriterBuffer **start; //for consumer - SFBinlogWriterBuffer **end; //for producer - int count; - int max_count; + SFBinlogWriterSlot *slots; + int waiting_count; + int max_waitings; int size; } SFBinlogWriterBufferRing; @@ -91,6 +93,7 @@ typedef struct sf_binlog_writer_info { char *name; } file; + int64_t total_count; struct { SFBinlogWriterBufferRing ring; int64_t next;