From 3dc8efde5c91c1cd0545310166a2ce0a94d356a0 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 18 Feb 2021 16:36:21 +0800 Subject: [PATCH] impl proto_get_group_servers client side --- src/sf_configs.h | 27 --- src/sf_connection_manager.c | 347 +++++++++++++++++++++++++++++++++++- src/sf_connection_manager.h | 21 ++- src/sf_proto.c | 69 +++++++ src/sf_proto.h | 17 ++ src/sf_types.h | 40 +++++ 6 files changed, 478 insertions(+), 43 deletions(-) diff --git a/src/sf_configs.h b/src/sf_configs.h index 98bb9f6..1085e5d 100644 --- a/src/sf_configs.h +++ b/src/sf_configs.h @@ -23,39 +23,12 @@ #include "sf_define.h" #include "sf_types.h" -typedef enum sf_net_retry_interval_mode { - sf_net_retry_interval_mode_fixed, - sf_net_retry_interval_mode_multiple -} SFNetRetryIntervalMode; - -typedef struct sf_net_retry_interval_mode_max_pair { - SFNetRetryIntervalMode mode; - int max_interval_ms; -} SFNetRetryIntervalModeMaxPair; - -typedef struct sf_net_retry_times_interval_pair { - int times; - int interval_ms; -} SFNetRetryTimesIntervalPair; - -typedef struct sf_net_retry_config { - SFNetRetryIntervalModeMaxPair interval_mm; - SFNetRetryTimesIntervalPair connect; - SFNetRetryTimesIntervalPair network; -} SFNetRetryConfig; - typedef struct sf_net_retry_interval_context { SFNetRetryIntervalModeMaxPair *mm; SFNetRetryTimesIntervalPair *ti; int interval_ms; } SFNetRetryIntervalContext; -typedef enum sf_data_read_rule { - sf_data_read_rule_any_available, - sf_data_read_rule_slave_first, - sf_data_read_rule_master_only, -} SFDataReadRule; - #ifdef __cplusplus extern "C" { #endif diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index 4bbfaaa..74ce361 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -23,11 +23,76 @@ #include #include #include -#include "sf/sf_global.h" +#include "sf_global.h" +#include "sf_proto.h" #include "fastcommon/shared_func.h" #include "fastcommon/logger.h" +#include "fastcommon/fc_atomic.h" #include "sf_connection_manager.h" +static ConnectionInfo *get_spec_connection(SFConnectionManager *cm, + const ConnectionInfo *target, int *err_no) +{ + return conn_pool_get_connection(&cm->cpool, target, err_no); +} + +static ConnectionInfo *make_connection(SFConnectionManager *cm, + FCAddressPtrArray *addr_array, int *err_no) +{ + FCAddressInfo **current; + FCAddressInfo **addr; + FCAddressInfo **end; + ConnectionInfo *conn; + + if (addr_array->count <= 0) { + *err_no = ENOENT; + return NULL; + } + + current = addr_array->addrs + addr_array->index; + if ((conn=get_spec_connection(cm, &(*current)->conn, + err_no)) != NULL) + { + return conn; + } + + if (addr_array->count == 1) { + return NULL; + } + + end = addr_array->addrs + addr_array->count; + for (addr=addr_array->addrs; addrconn, + err_no)) != NULL) + { + addr_array->index = addr - addr_array->addrs; + return conn; + } + } + + return NULL; +} + +static int validate_connection_callback(ConnectionInfo *conn, void *args) +{ + SFConnectionManager *cm; + SFResponseInfo response; + int result; + + cm = (SFConnectionManager *)args; + if ((result=sf_active_test(conn, &response, cm->common_cfg-> + network_timeout)) != 0) + { + sf_log_network_error(&response, conn, result); + } + + return result; +} + static int init_group_array(SFCMConnGroupArray *garray, const int group_count, const int min_group_id) { @@ -56,12 +121,30 @@ static int init_group_array(SFCMConnGroupArray *garray, const int group_count, return 0; } -int sf_connection_manager_init(SFConnectionManager *cm, const int group_count, +int sf_connection_manager_init(SFConnectionManager *cm, + const SFClientCommonConfig *common_cfg, const int group_count, const int min_group_id, const int server_group_index, - const SFDataReadRule read_rule) + const int server_count, const int max_count_per_entry, + const int max_idle_time, fc_connection_callback_func + connect_done_callback, void *args) { + const int socket_domain = AF_INET; + int htable_init_capacity; int result; + htable_init_capacity = 4 * server_count; + if (htable_init_capacity < 256) { + htable_init_capacity = 256; + } + if ((result=conn_pool_init_ex1(&cm->cpool, common_cfg->connect_timeout, + max_count_per_entry, max_idle_time, socket_domain, + htable_init_capacity, connect_done_callback, args, + validate_connection_callback, cm, + sizeof(SFConnectionParameters))) != 0) + { + return result; + } + if ((result=init_group_array(&cm->groups, group_count, min_group_id)) != 0) { @@ -69,7 +152,8 @@ int sf_connection_manager_init(SFConnectionManager *cm, const int group_count, } cm->server_group_index = server_group_index; - cm->read_rule = read_rule; + cm->common_cfg = common_cfg; + cm->max_servers_per_group = 0; return 0; } @@ -95,6 +179,7 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, } group = cm->groups.entries + (group_id - cm->groups.min_group_id); + group->id = group_id; group->all.servers = (SFCMServerEntry *)fc_malloc( sizeof(SFCMServerEntry) * count); if (group->all.servers == NULL) { @@ -106,21 +191,224 @@ int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, for (entry=group->all.servers, server=servers; serverserver_id = (*server)->id; + entry->id = (*server)->id; entry->addr_array = &(*server)->group_addrs[ cm->server_group_index].address_array; entry->conn = NULL; } - group->alives.servers = (SFCMServerEntry **)fc_malloc( - sizeof(SFCMServerEntry *) * count); - if (group->alives.servers == NULL) { - return ENOMEM; + if (count > cm->max_servers_per_group) { + cm->max_servers_per_group = count; } return 0; } +static SFCMServerEntry *get_server_by_id(SFCMConnGroupEntry *group, + const int server_id) +{ + SFCMServerEntry *server; + SFCMServerEntry *end; + + end = group->all.servers + group->all.count; + for (server=group->all.servers; serverid == server_id) { + return server; + } + } + + return NULL; +} + +static SFCMServerPtrArray *convert_to_sptr_array(SFConnectionManager *cm, + SFCMConnGroupEntry *group, SFGroupServerArray *sarray, int *err_no) +{ + SFCMServerPtrArray *alives; + SFGroupServerInfo *server; + SFGroupServerInfo *end; + SFCMServerEntry *sentry; + + if (sarray->count > cm->max_servers_per_group) { + logError("file: "__FILE__", line: %d, " + "group id: %d, response server count: %d > " + "max count: %d!", __LINE__, group->id, + sarray->count, cm->max_servers_per_group); + *err_no = EOVERFLOW; + return NULL; + } + + alives = (SFCMServerPtrArray *)fast_mblock_alloc_object( + &cm->sptr_array_allocator); + if (alives == NULL) { + *err_no = ENOMEM; + return NULL; + } + + alives->count = 0; + end = sarray->servers + sarray->count; + for (server=sarray->servers; serverid)) == NULL) { + logError("file: "__FILE__", line: %d, " + "group id: %d, response server count: %d > " + "max count: %d!", __LINE__, group->id, + sarray->count, cm->max_servers_per_group); + *err_no = ENOENT; + fast_mblock_free_object(&cm->sptr_array_allocator, alives); + return NULL; + } + + if (server->is_master) { + FC_ATOMIC_SET(group->master, sentry); + if (cm->common_cfg->read_rule != sf_data_read_rule_slave_first) { + alives->servers[alives->count++] = sentry; + } + } else if (server->is_active) { + alives->servers[alives->count++] = sentry; + } + } + + *err_no = 0; + return alives; +} + +static int sptr_array_compare(SFCMServerPtrArray *a1, + SFCMServerPtrArray *a2) +{ + int sub; + int i; + + if ((sub=(a1->count - a2->count)) != 0) { + return sub; + } + + for (i = 0; i < a1->count; i++) { + if ((sub=(a1->servers[i]->id - a2->servers[i]->id)) != 0) { + return sub; + } + } + + return 0; +} + +static int do_get_group_servers(SFConnectionManager *cm, + SFCMConnGroupEntry *group, ConnectionInfo *conn) +{ +#define MAX_GROUP_SERVER_COUNT 128 + int result; + SFGroupServerInfo fixed_servers[MAX_GROUP_SERVER_COUNT]; + SFGroupServerArray sarray; + SFCMServerPtrArray *old_alives; + SFCMServerPtrArray *new_alives; + + sarray.alloc = MAX_GROUP_SERVER_COUNT; + sarray.count = 0; + sarray.servers = fixed_servers; + if ((result=sf_proto_get_group_servers(conn, cm->common_cfg-> + network_timeout, group->id, &sarray)) != 0) + { + return result; + } + + if ((new_alives=convert_to_sptr_array(cm, group, + &sarray, &result)) == NULL) + { + return result; + } + old_alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (sptr_array_compare(old_alives, new_alives) == 0) { + fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); + return 0; + } + + if (__sync_bool_compare_and_swap(&group->alives, + old_alives, new_alives)) + { + fast_mblock_delay_free_object(&cm->sptr_array_allocator, old_alives, + (cm->common_cfg->connect_timeout + cm->common_cfg-> + network_timeout) * group->all.count); + } else { + fast_mblock_free_object(&cm->sptr_array_allocator, new_alives); + } + + return 0; +} + +static int get_group_servers_by_active(SFConnectionManager *cm, + SFCMConnGroupEntry *group) +{ + SFCMServerPtrArray *alives; + SFCMServerEntry **server; + SFCMServerEntry **end; + ConnectionInfo *conn; + int result; + + result = ENOENT; + alives = (SFCMServerPtrArray *)FC_ATOMIC_GET(group->alives); + if (alives->count == 0) { + return result; + } + + end = alives->servers + alives->count; + for (server=alives->servers; serveraddr_array, + &result)) == NULL) + { + continue; + } + + result = do_get_group_servers(cm, group, conn); + conn_pool_close_connection_ex(&cm->cpool, conn, result != 0); + if (result == 0) { + return 0; + } + } + + return result; +} + +static int get_group_servers_by_all(SFConnectionManager *cm, + SFCMConnGroupEntry *group) +{ + SFCMServerEntry *server; + SFCMServerEntry *end; + ConnectionInfo *conn; + int result; + + result = ENOENT; + if (group->all.count == 0) { + return result; + } + + end = group->all.servers + group->all.count; + for (server=group->all.servers; serveraddr_array, + &result)) == NULL) + { + continue; + } + + result = do_get_group_servers(cm, group, conn); + conn_pool_close_connection_ex(&cm->cpool, conn, result != 0); + if (result == 0) { + return 0; + } + } + + return result; +} + +static int get_group_servers(SFConnectionManager *cm, + SFCMConnGroupEntry *group) +{ + int result; + + if ((result=get_group_servers_by_active(cm, group)) == 0) { + return 0; + } + + return get_group_servers_by_all(cm, group); +} + static void *connection_manager_thread_func(void *arg) { SFConnectionManager *cm; @@ -133,9 +421,50 @@ static void *connection_manager_thread_func(void *arg) return NULL; } +static int sptr_array_alloc_init(void *element, void *args) +{ + SFCMServerPtrArray *sptr_array; + + sptr_array = (SFCMServerPtrArray *)element; + sptr_array->servers = (SFCMServerEntry **)(sptr_array + 1); + return 0; +} + int sf_connection_manager_start(SFConnectionManager *cm) { pthread_t tid; + int result; + int element_size; + SFCMConnGroupEntry *group; + SFCMConnGroupEntry *end; + SFCMServerPtrArray *sptr_array; + + element_size = sizeof(SFCMServerPtrArray) + + sizeof(SFCMServerEntry *) * cm->max_servers_per_group; + if ((result=fast_mblock_init_ex1(&cm->sptr_array_allocator, + "server_ptr_array", element_size, 4 * 1024, 0, + sptr_array_alloc_init, NULL, true)) != 0) + { + return result; + } + + end = cm->groups.entries + cm->groups.count; + for (group=cm->groups.entries; groupall.count == 0) { + logError("file: "__FILE__", line: %d, " + "group id: %d, no servers!", + __LINE__, group->id); + return ENOENT; + } + + sptr_array = (SFCMServerPtrArray *)fast_mblock_alloc_object( + &cm->sptr_array_allocator); + if (sptr_array == NULL) { + return ENOMEM; + } + __sync_bool_compare_and_swap(&group->alives, NULL, sptr_array); + } + return fc_create_thread(&tid, connection_manager_thread_func, cm, SF_G_THREAD_STACK_SIZE); } diff --git a/src/sf_connection_manager.h b/src/sf_connection_manager.h index eb5d8b6..33750a3 100644 --- a/src/sf_connection_manager.h +++ b/src/sf_connection_manager.h @@ -19,11 +19,11 @@ #define _SF_CONNECTION_MANAGER_H #include "fastcommon/server_id_func.h" +#include "fastcommon/connection_pool.h" #include "sf_types.h" -#include "sf_configs.h" typedef struct sf_cm_server_entry { - int server_id; + int id; ConnectionInfo *conn; FCAddressPtrArray *addr_array; } SFCMServerEntry; @@ -39,9 +39,10 @@ typedef struct sf_cm_server_ptr_array { } SFCMServerPtrArray; typedef struct sf_cm_conn_group_entry { - SFCMServerEntry *master; + int id; SFCMServerArray all; - SFCMServerPtrArray alives; + volatile SFCMServerEntry *master; + volatile SFCMServerPtrArray *alives; pthread_mutex_t lock; } SFCMConnGroupEntry; @@ -54,13 +55,19 @@ typedef struct sf_cm_conn_group_array { typedef struct sf_connection_manager { int server_group_index; - SFDataReadRule read_rule; //the rule for read + int max_servers_per_group; + const SFClientCommonConfig *common_cfg; SFCMConnGroupArray groups; + ConnectionPool cpool; + struct fast_mblock_man sptr_array_allocator; //element: SFCMServerPtrArray } SFConnectionManager; -int sf_connection_manager_init(SFConnectionManager *cm, const int group_count, +int sf_connection_manager_init(SFConnectionManager *cm, + const SFClientCommonConfig *common_cfg, const int group_count, const int min_group_id, const int server_group_index, - const SFDataReadRule read_rule); + const int server_count, const int max_count_per_entry, + const int max_idle_time, fc_connection_callback_func + connect_done_callback, void *args); int sf_connection_manager_add(SFConnectionManager *cm, const int group_id, FCServerInfo **servers, const int count); diff --git a/src/sf_proto.c b/src/sf_proto.c index ec9a02c..56cb8ec 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -368,3 +368,72 @@ int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn, return result; } + +int sf_proto_get_group_servers(ConnectionInfo *conn, + const int network_timeout, const int group_id, + SFGroupServerArray *sarray) +{ + char out_buff[sizeof(SFCommonProtoHeader) + + sizeof(SFProtoGetGroupServersReq)]; + char in_buff[1024]; + SFCommonProtoHeader *header; + SFProtoGetGroupServersReq *req; + SFProtoGetGroupServersRespBodyHeader *body_header; + SFProtoGetGroupServersRespBodyPart *body_part; + SFGroupServerInfo *server; + SFGroupServerInfo *end; + SFResponseInfo response; + int result; + int body_len; + int count; + + header = (SFCommonProtoHeader *)out_buff; + req = (SFProtoGetGroupServersReq *)(header + 1); + int2buff(group_id, req->group_id); + SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_GET_GROUP_SERVERS_REQ, + sizeof(SFProtoGetGroupServersReq)); + response.error.length = 0; + if ((result=sf_send_and_recv_response_ex1(conn, out_buff, + sizeof(out_buff), &response, network_timeout, + SF_SERVICE_PROTO_GET_GROUP_SERVERS_RESP, in_buff, + sizeof(in_buff), &body_len)) != 0) + { + sf_log_network_error(&response, conn, result); + return result; + } + + if (body_len < sizeof(SFProtoGetGroupServersRespBodyHeader)) { + logError("file: "__FILE__", line: %d, " + "server %s:%d response body length: %d < %d", + __LINE__, conn->ip_addr, conn->port, body_len, + (int)sizeof(SFProtoGetGroupServersRespBodyHeader)); + return EINVAL; + } + + body_header = (SFProtoGetGroupServersRespBodyHeader *)in_buff; + count = buff2short(body_header->count); + if (count <= 0) { + logError("file: "__FILE__", line: %d, " + "server %s:%d response server count: %d <= 0", + __LINE__, conn->ip_addr, conn->port, count); + return EINVAL; + } + if (count > sarray->alloc) { + logError("file: "__FILE__", line: %d, " + "server %s:%d response server count: %d is too large, " + "exceeds %d", __LINE__, conn->ip_addr, conn->port, + count, sarray->alloc); + return EOVERFLOW; + } + sarray->count = count; + + body_part = (SFProtoGetGroupServersRespBodyPart *)(body_header + 1); + end = sarray->servers + sarray->count; + for (server=sarray->servers; serverid = buff2int(body_part->server_id); + server->is_master = body_part->is_master; + server->is_active = body_part->is_active; + } + + return 0; +} diff --git a/src/sf_proto.h b/src/sf_proto.h index d20d771..09cba36 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -137,6 +137,19 @@ typedef struct sf_proto_report_req_receipt_body { char req_id[8]; } SFProtoReportReqReceiptBody; +typedef struct sf_group_server_info { + int id; + bool is_master; + bool is_active; + char padding[2]; +} SFGroupServerInfo; + +typedef struct sf_group_server_array { + SFGroupServerInfo *servers; + int alloc; + int count; +} SFGroupServerArray; + #ifdef __cplusplus extern "C" { #endif @@ -338,6 +351,10 @@ int sf_proto_deal_ack(struct fast_task_info *task, int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn, const uint32_t channel_id, const int key, const int network_timeout); +int sf_proto_get_group_servers(ConnectionInfo *conn, + const int network_timeout, const int group_id, + SFGroupServerArray *sarray); + #define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ do { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ diff --git a/src/sf_types.h b/src/sf_types.h index 16621b6..63381cc 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -137,4 +137,44 @@ typedef struct sf_slow_log_context { LogContext ctx; } SFSlowLogContext; +typedef enum sf_data_read_rule { + sf_data_read_rule_any_available, + sf_data_read_rule_slave_first, + sf_data_read_rule_master_only, +} SFDataReadRule; + +typedef enum sf_net_retry_interval_mode { + sf_net_retry_interval_mode_fixed, + sf_net_retry_interval_mode_multiple +} SFNetRetryIntervalMode; + +typedef struct sf_net_retry_interval_mode_max_pair { + SFNetRetryIntervalMode mode; + int max_interval_ms; +} SFNetRetryIntervalModeMaxPair; + +typedef struct sf_net_retry_times_interval_pair { + int times; + int interval_ms; +} SFNetRetryTimesIntervalPair; + +typedef struct sf_net_retry_config { + SFNetRetryIntervalModeMaxPair interval_mm; + SFNetRetryTimesIntervalPair connect; + SFNetRetryTimesIntervalPair network; +} SFNetRetryConfig; + +typedef struct sf_client_common_config { + SFDataReadRule read_rule; //the rule for read + int connect_timeout; + int network_timeout; + SFNetRetryConfig net_retry_cfg; +} SFClientCommonConfig; + +typedef struct sf_connection_parameters { + int buffer_size; + int group_id; + struct idempotency_client_channel *channel; +} SFConnectionParameters; + #endif