1、异步 1、集群在启动时会初始化出异步线程连接队列,其中cluster->async_max_conns_per_node为每个阶段异步最大连接数,可配。 2、 1)先申请as_event_loop_capacity(为)个异步队列空间 2)将最大连接数分配到每个异步队列中。Max=最大连接/队列个数,rem=0 3、初始化每个队列。队列为空 4、 //1、 as_node_create: if (as_event_loop_capacity > 0) { node->async_conn_qs=as_node_create_async_queues(cluster->async_max_conns_per_node); node->pipe_conn_qs= as_node_create_async_queues(cluster->pipe_max_conns_per_node); } //2、 as_node_create_async_queues(uint32_tmax_conns_per_node): as_queue* queues = cf_malloc(sizeof(as_queue)* as_event_loop_capacity); uint32_t max = max_conns_per_node /as_event_loop_capacity; uint32_t rem = max_conns_per_node - (max *as_event_loop_capacity); uint32_t capacity; for (uint32_t i = 0; i data= cf_malloc(capacity * item_size); if(! queue->data) { returnfalse; } queue->capacity= capacity;//capacity为每个队列的容量 queue->head= queue->tail = 0;//队列为空 queue->item_size= item_size;//item大学 queue->total= 0; queue->flags= ITEMS_ON_HEAP; returntrue; } 4、异步,连接server之前,需要先创建eventloop。该函数入参即为as_event_loop_capacity。 每个event_loop有一个as_ev_worker线程 as_event_create_loops 5、aerospike_key_put_async异步接口,有个入参指定event_loop, 调用aerospike_key_put_async_ex 5.1、异步接口会调用as_async_write_command_create,拼接CMD 该cmd->event_loop为接口中传进的值,如果不从接口传入,采用轮询方式分配 cmd->event_loop= as_event_assign(event_loop); returnevent_loop ? event_loop : as_event_loop_get(); 5.2、异步接口拼接完cmd后,调用as_event_command_execute(cmd,err)去执行 5.3、as_event_command_execute执行方法: 1)如果已经在该线程中,则直接执行 2)否则保存到event_loop队列,并唤起event_loop线程进行处理 if (cmd->event_loop->thread == pthread_self()) { //We are already in event loop thread, so start processing. as_event_command_begin(cmd); } else{ if(cmd->timeout_ms) { //Store current time in first 8 bytes which is not used yet. *(uint64_t*)cmd= cf_getms(); } //Send command through queue so it can be executed in event loop thread. if(! as_event_send(cmd)) { as_event_command_free(cmd); returnas_error_set_message(err, AEROSPIKE_ERR_CLIENT, "Failed to queuecommand"); } } as_event_send as_event_loop*event_loop = cmd->event_loop; pthread_mutex_lock(&event_loop->lock); boolqueued = as_queue_push(&event_loop->queue, &cmd); pthread_mutex_unlock(&event_loop->lock); if(queued) { ev_async_send(event_loop->loop,&event_loop->wakeup); } 6、执行event的函数as_event_command_begin 6.1、获取连接 as_connection_statusstatus = cmd->pipe_listener != NULL ? as_pipe_get_connection(cmd) :as_event_get_connection(cmd); 6.2、执行 if(status == AS_CONNECTION_FROM_POOL) { as_ev_command_write_start(cmd); } elseif (status == AS_CONNECTION_NEW) { as_ev_connect(cmd);//创建新连接后,进行连接 } 6.3、获取连接的方法: 1)as_queue*queue = &cmd->node->async_conn_qs[cmd->event_loop->index];得到异步连接队列 2)如果队列不为空,则从中pop一个连接(怎么不使用锁保护?) 3)否则新创建一个连接。这里有个限制。 每新创建一个连接queue->total++; 当queue->total>=queue->capicity的时候报连接耗尽错误。 6.4、 aerospike_key_put_async->aerospike_key_put_async_ex->as_async_write_command_create 中有个回调函数as_event_command_parse_header 6.5、该回调函数调用as_event_response_complete(cmd);将异步新创建的连接放到池子里 as_event_response_complete->as_event_put_connection
网站首页 > 开源技术 正文
猜你喜欢
- 2024-10-22 分布式系统中的延迟和成本(分布式低时延金融技术)
- 2024-10-22 关于混合云成本的5个误解(混合云优势)
- 2024-10-22 短短四个月!这款火箭发动机完成了设计、3D打印,还通过了测试
- 2024-10-22 外星逆向工程飞行器可以超越光速(外星科技逆向研究)
- 2024-10-22 Java缓存框架大PK:Guava Cache VS EVCache VS Tair VS Aerospike
- 2024-10-22 Intel发布傲腾DDR4内存:非易失性设计、单条128GB起步
- 2024-10-22 Intel傲腾SSD使用体验:存储界“法拉利”
- 2024-10-22 云上如何不停机更换关键大数据服务?
- 2024-07-16 常见的四种非关系型数据库都适合什么业务场景?
- 2024-07-16 超详细的大数据学习资源大全!(大数据 资源)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)