diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index a4e569f..a0d0bed 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -193,7 +193,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, switch (current->type) { case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE: - thread->order_by = current->version.first; + current->writer->order_by = current->version.first; fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; @@ -201,12 +201,15 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, flush_writer_files(thread); return ERRNO_THREAD_EXIT; case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: - if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + if (current->writer->order_by != + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) + { logWarning("file: "__FILE__", line: %d, " "subdir_name: %s, invalid order by: %d != %d, " "maybe some mistake happen", __LINE__, - current->writer->fw.cfg.subdir_name, thread->order_by, - SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION); + current->writer->fw.cfg.subdir_name, + current->writer->order_by, + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION); } if (current->writer->version_ctx.ring.waiting_count != 0) { @@ -236,7 +239,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, current->writer->fw.total_count++; add_to_flush_writer_queue(thread, current->writer); - if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { + if (current->writer->order_by == + SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION) + { /* NOTE: current maybe be released in the deal function */ if ((result=deal_record_by_version(current)) != 0) { return result; @@ -367,6 +372,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, const int buffer_size) { memset(writer, 0, sizeof(*writer)); + writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE; return sf_file_writer_init(&writer->fw, data_path, subdir_name, buffer_size); } @@ -388,6 +394,7 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, writer->version_ctx.ring.waiting_count = 0; writer->version_ctx.ring.max_waitings = 0; writer->version_ctx.change_count = 0; + writer->order_by = SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION; binlog_writer_set_next_version(writer, next_version); writer->flush.in_queue = false; @@ -397,8 +404,8 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const short order_by, const int max_record_size, - const int writer_count, const bool use_fixed_buffer_size) + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; int result; @@ -408,7 +415,6 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, snprintf(thread->name, sizeof(thread->name), "%s", name); thread->order_mode = order_mode; - thread->order_by = order_by; thread->use_fixed_buffer_size = use_fixed_buffer_size; writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; @@ -445,12 +451,12 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, { SFBinlogWriterBuffer *buffer; - if (writer->thread->order_by == order_by) { + if (writer->order_by == order_by) { return 0; } - if (!(order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE || - order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION)) + if (!(order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE || + order_by == SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION)) { logError("file: "__FILE__", line: %d, " "invalid order by: %d!", __LINE__, order_by); diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 4381191..5d92b1c 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -25,8 +25,8 @@ #define SF_BINLOG_THREAD_ORDER_MODE_FIXED 0 #define SF_BINLOG_THREAD_ORDER_MODE_VARY 1 -#define SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE 0 -#define SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION 1 +#define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0 +#define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 @@ -65,7 +65,6 @@ typedef struct binlog_writer_thread { volatile bool running; bool use_fixed_buffer_size; short order_mode; - short order_by; struct { struct sf_binlog_writer_info *head; struct sf_binlog_writer_info *tail; @@ -82,6 +81,7 @@ typedef struct sf_binlog_writer_info { } version_ctx; SFBinlogWriterThread *thread; + short order_by; struct { bool in_queue; struct sf_binlog_writer_info *next; @@ -108,14 +108,13 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const char *name, SFBinlogWriterInfo *writer, const short order_mode, - const short order_by, const int max_record_size, - const int writer_count, const bool use_fixed_buffer_size); + const int max_record_size, const int writer_count, + const bool use_fixed_buffer_size); -#define sf_binlog_writer_init_thread(thread, name, \ - writer, order_by, max_record_size) \ +#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \ sf_binlog_writer_init_thread_ex(thread, name, writer, \ - SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ - order_by, max_record_size, 1, true) + SF_BINLOG_THREAD_ORDER_MODE_FIXED, \ + max_record_size, 1, true) static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, const char *data_path, const char *subdir_name, @@ -128,9 +127,8 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, return result; } - return sf_binlog_writer_init_thread(&context->thread, subdir_name, - &context->writer, SF_BINLOG_THREAD_TYPE_ORDER_BY_NONE, - max_record_size); + return sf_binlog_writer_init_thread(&context->thread, + subdir_name, &context->writer, max_record_size); } void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);