• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Memcached源码分析之memcached.c

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

memcached.c 由于代码太多,在此省略了部分代码,例如UPD连接,二进制协议,某些错误输出和调试输出等,建议从main函数开始看起。

  1.  #include "memcached.h"
  2. //尝试从socket中读取数据的结果枚举
  3. enum try_read_result {
  4.     READ_DATA_RECEIVED,
  5.     READ_NO_DATA_RECEIVED,
  6.     READ_ERROR, /** an error occured (on the socket) (or client closed connection) */
  7.     READ_MEMORY_ERROR /** failed to allocate more memory */
  8. };
  9. struct stats stats; //全局统计变量
  10. struct settings settings; //全局配置变量
  11. static conn *listen_conn = NULL;  //全局的监听连接对象
  12. static int max_fds; //fds上限
  13. static struct event_base *main_base; //主线程的event_base 对象
  14. //向客户端发送数据的结果枚举
  15. enum transmit_result {
  16.     TRANSMIT_COMPLETE, /** All done writing. */
  17.     TRANSMIT_INCOMPLETE, /** More data remaining to write. */
  18.     TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
  19.     TRANSMIT_HARD_ERROR /** Can't write (c->state is set to conn_closing) */
  20. };
  21. extern pthread_mutex_t conn_lock; //连接锁
  22.  
  23. /**
  24. 创建连接函数,参数为:
  25. sfd 要监听的socket fd
  26. init_state 连接的初始化状态conn_states
  27. event_flags 监听的事件
  28. read_buffer_size 读缓存大小
  29. transport 监听的socket 类型
  30. base event_base
  31.  
  32. 每监听一个fd(listen fd和client fd)都会创建这样一个conn来保存相关信息,表示一个“连接”,
  33. 无论连接是已经连接上了,还是仅仅处于listen状态。
  34. */
  35. conn *conn_new(const int sfd, enum conn_states init_state,
  36.                 const int event_flags,
  37.                 const int read_buffer_size, enum network_transport transport,
  38.                 struct event_base *base) {
  39.     conn *c;
  40.     assert(sfd >= 0 && sfd < max_fds);
  41.     c = conns[sfd];
  42.     if (NULL == c) {
  43.         if (!(c = (conn *)calloc(1, sizeof(conn)))) {
  44.             STATS_LOCK();
  45.             stats.malloc_fails++;
  46.             STATS_UNLOCK();
  47.             fprintf(stderr, "Failed to allocate connection object\n");
  48.             return NULL;
  49.         }
  50.         MEMCACHED_CONN_CREATE(c);
  51.         c->rbuf = c->wbuf = 0;
  52.         //c->xx省略部分初始化代码
  53.         c->hdrsize = 0;
  54.         c->rbuf = (char *)malloc((size_t)c->rsize);
  55.         c->wbuf = (char *)malloc((size_t)c->wsize);
  56.         c->ilist = (item **)malloc(sizeof(item *) * c->isize);
  57.         c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
  58.         c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
  59.         c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
  60.         if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
  61.                 c->msglist == 0 || c->suffixlist == 0) {
  62.             conn_free(c);
  63.             STATS_LOCK();
  64.             stats.malloc_fails++;
  65.             STATS_UNLOCK();
  66.             fprintf(stderr, "Failed to allocate buffers for connection\n");
  67.             return NULL;
  68.         }
  69.         STATS_LOCK();
  70.         stats.conn_structs++;
  71.         STATS_UNLOCK();
  72.         c->sfd = sfd;
  73.         conns[sfd] = c;
  74.     }
  75.     c->transport = transport;
  76.     c->protocol = settings.binding_protocol;
  77.     if (!settings.socketpath) {
  78.         c->request_addr_size = sizeof(c->request_addr);
  79.     } else {
  80.         c->request_addr_size = 0;
  81.     }
  82.     if (transport == tcp_transport && init_state == conn_new_cmd) {
  83.         if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
  84.                         &c->request_addr_size)) {
  85.             perror("getpeername");
  86.             memset(&c->request_addr, 0, sizeof(c->request_addr));
  87.         }
  88.     }
  89.     if (settings.verbose > 1) {
  90.        //省略向终端输出调试的代码
  91.     }
  92.     c->state = init_state;
  93.     //c->xxx 省略部分初始化代码
  94.     c->noreply = false;
  95.     //创建事件,处理函数为event_handler,并把conn 连接对象传入event_handler中。
  96.     //(主线程调用conn_new时:)在主线程,创建完listenfd后,调用此函数监听网络连接事件,此时conn对象的conn_state为conn_listening
  97.    
  98.     /**(建议看到worker线程调用conn_new时才看以下解析)
  99.     (worker线程调用conn_new时)在worker线程,收到主线程丢过来的client fd时,调用此函数监听来自client fd的网络事件。
  100.    
  101.      也就是说,无论是主线程还是worker线程,都会调用此函数conn_new,创建conn连接对象,同时监听各自的fd。
  102.      而且都是调用event_handler作处理,只是不一样的fd, 不一样的conn对象(即下面的(void *) c)
  103.      进入 event_handler看看都做了啥?
  104.     */
  105.     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
  106.     event_base_set(base, &c->event); //为事件设置事件基地
  107.     c->ev_flags = event_flags;
  108.     if (event_add(&c->event, 0) == -1) { //把事件加入监听
  109.         perror("event_add");
  110.         return NULL;
  111.     }
  112.     STATS_LOCK();
  113.     stats.curr_conns++;
  114.     stats.total_conns++;
  115.     STATS_UNLOCK();
  116.     MEMCACHED_CONN_ALLOCATE(c->sfd);
  117.     return c;
  118. }
  119. //向客户端输出字符串
  120. static void out_string(conn *c, const char *str) {
  121.     size_t len;
  122.     assert(c != NULL);
  123.     if (c->noreply) {
  124.         if (settings.verbose > 1)
  125.             fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
  126.         c->noreply = false;
  127.         conn_set_state(c, conn_new_cmd);
  128.         return;
  129.     }
  130.     if (settings.verbose > 1)
  131.         fprintf(stderr, ">%d %s\n", c->sfd, str);
  132.     /* Nuke a partial output... */
  133.     c->msgcurr = 0;
  134.     c->msgused = 0;
  135.     c->iovused = 0;
  136.     add_msghdr(c); //添加一个msghdr
  137.     len = strlen(str);
  138.     if ((len + 2) > c->wsize) {
  139.         /* ought to be always enough. just fail for simplicity */
  140.         str = "SERVER_ERROR output line too long";
  141.         len = strlen(str);
  142.     }
  143.     memcpy(c->wbuf, str, len); //把要发送的字符串写入wbuf字段中
  144.     memcpy(c->wbuf + len, "\r\n", 2); //添加换行回车符
  145.     c->wbytes = len + 2;
  146.     c->wcurr = c->wbuf;
  147.     conn_set_state(c, conn_write); //把连接状态设置为conn_write,则状态机进入conn_write分支执行输出
  148.     c->write_and_go = conn_new_cmd; //当状态机完成输出后要切换到的状态为conn_new_cmd
  149.     return;
  150. }
  151. /**
  152. 无法分配内存时经常会调用此函数,例如内存空间已满,但又无法淘汰旧的item时,则向客户端响应一个错误
  153. */
  154. static void out_of_memory(conn *c, char *ascii_error) {
  155.     const static char error_prefix[] = "SERVER_ERROR ";
  156.     const static int error_prefix_len = sizeof(error_prefix) - 1;
  157.     if (c->protocol == binary_prot) {
  158.         /* Strip off the generic error prefix; it's irrelevant in binary */
  159.         if (!strncmp(ascii_error, error_prefix, error_prefix_len)) {
  160.             ascii_error += error_prefix_len;
  161.         }
  162.         write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, ascii_error, 0);
  163.     } else {
  164.         out_string(c, ascii_error);
  165.     }
  166. }
  167. /*
  168.  * 解析文本协议时,执行set/add/replace 命令并把value数据塞到item之后,调用此函数。
  169.  * 这个函数的作用比较简单,主要是做一些收尾工作。
  170.  *
  171.  */
  172. static void complete_nread_ascii(conn *c) {
  173.     assert(c != NULL);
  174.     item *it = c->item;
  175.     int comm = c->cmd;
  176.     enum store_item_type ret;
  177.     pthread_mutex_lock(&c->thread->stats.mutex);
  178.     c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
  179.     pthread_mutex_unlock(&c->thread->stats.mutex);
  180.     if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
  181.         out_string(c, "CLIENT_ERROR bad data chunk");
  182.     } else {
  183.       ret = store_item(it, comm, c);
  184.     }
  185.       switch (ret) {
  186.       case STORED:
  187.           out_string(c, "STORED");
  188.           break;
  189.       //省略其它
  190.       default:
  191.           out_string(c, "SERVER_ERROR Unhandled storage type.");
  192.       }
  193.     }
  194.     //释放引用,注意仅仅是一个释放引用的逻辑,不一定把内存空间给释放掉
  195.     //具体是否要把内存空间释放,说见item_remove和do_item_remove函数。而在此处,
  196.     //一般都只是释放引用而已。
  197.     item_remove(c->item); /* release the c->item reference */
  198.     c->item = 0;
  199. }
  200. //重置命令处理
  201. static void reset_cmd_handler(conn *c) {
  202.     c->cmd = -1;
  203.     c->substate = bin_no_state;
  204.     if(c->item != NULL) {
  205.         item_remove(c->item);
  206.         c->item = NULL;
  207.     }
  208.     conn_shrink(c);
  209.     /**
  210.     不知道你有没有在下面这个地方困惑过。。反正我是纠结了很久。。。
  211.    
  212.     这个c->rbytes到底什么时候开始不为0了?为0的话,怎么执行这次的命令?
  213.     回顾一下我们是怎么会调用到reset_cmd_handler这个地方:
  214.     1)主线程监听到listen fd有连接上来,进入drive_machine的conn_listen状态,
  215.         然后accept连接得到client fd,再dispatch_conn_new把client fd丢给某个worker线程
  216.     2)某个worker线程收到主线程丢过来的client fd之后,把它加到那个worker线程自己的
  217.         event_base,然后注册event_handler(event_handler主要是调drive_machine)作为事件处理函数,
  218.         在注册event_handler的同时,初始化了一个conn对象作为drive_machine的参数,
  219.         而这个对象中的rbytes为0。
  220.     3)当worker线程监听的client fd第一次有命令请求过来时(注意是第一次),例如set key 0 0 4\r\n,
  221.         worker线程的收到通知,然后陷入了event_handler再到drive_machine中去。。
  222.         而此时,传过来的conn* c,c->state 为conn_new_cmd,而c->rbytes确实为0!!
  223.         (再次强调是第一次有命令请求过来时)
  224.     就在这个时候就进入reset_cmd_handler,当前你看到的这个地方了。继续分析之后做了些啥:
  225.     4)第3)点得知,c->rbytes由此确实为0,所以在这种情况,必然会进入下面的else分支,
  226.         状态机进入conn_waiting状态。。。。
  227.     5)进入conn_waiting状态做了啥?就是把连接状态由conn_waiting变成conn_read,然后stop = true,退出
  228.         状态机。没错,退出状态机了!结束本次event_handler了!这就是比较纠结之处,这次命令请求触发的
  229.         event_handler居然只做了把状态由conn_new_cmd变成conn_read,然后再等待下次事件通知。
  230.         (详见conn_waiting分支代码)
  231.     6)那刚才那个命令请求怎么办?压根没有去读?例如set key 0 0 4\r\n这个命令的实质作用被忽略了吗?
  232.         其实没有。。原因是libevent默认的是水平触发,也就是说,这个命令还没被读,下次继续触发。。。
  233.         下次event_base会因为刚才那个命令再触发通知告诉worker线程,再次进入drive_machine,只是此时
  234.         c->state是conn_read状态,conn_read分支才真正把这个命令执行!
  235.     也就是说,当worker线程监听的client fd "第一次"有命令过来的时候,会触发两次event_base的通知!!
  236.     */
  237.     //这个rbytes大于0的情况,是当一次event中包含多个命令,
  238.     //或者说多个\r\n时候,程序执行到第二个及以后命令的时候出现。
  239.     if (c->rbytes > 0) {
  240.         conn_set_state(c, conn_parse_cmd);
  241.     } else {
  242.         conn_set_state(c, c
  243.             onn_waiting);
  244.     }
  245. }
  246. enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
  247.     char *key = ITEM_key(it);
  248.     item *old_it = do_item_get(key, it->nkey, hv); //拿出旧的item
  249.     enum store_item_type stored = NOT_STORED;
  250.     item *new_it = NULL;
  251.     int flags;
  252.     if (old_it != NULL && comm == NREAD_ADD) {
  253.         do_item_update(old_it); //更新item信息,其实主要是更新最近使用信息,即lru链表,。
  254.     } else if (!old_it && (comm == NREAD_REPLACE
  255.         || comm == NREAD_APPEND || comm == NREAD_PREPEND))
  256.     {
  257.     } else if (comm == NREAD_CAS) { //省略
  258.     } else {
  259.         if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
  260.             if (ITEM_get_cas(it) != 0) {
  261.                 // CAS much be equal
  262.                 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
  263.                     stored = EXISTS;
  264.                 }
  265.             }
  266.             if (stored == NOT_STORED) {
  267.                 flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
  268.                 new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);
  269.                 if (new_it == NULL) {
  270.                     if (old_it != NULL)
  271.                         do_item_remove(old_it);
  272.                     return NOT_STORED;
  273.                 }
  274.                 if (comm == NREAD_APPEND) {
  275.                     memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
  276.                     memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
  277.                 } else {
  278.                     memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
  279.                     memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
  280.                 }
  281.                 it = new_it;
  282.             }
  283.         }
  284.         //SET 命令会直接跑来这里,不仔细看还真不知道 -_-||
  285.         /**
  286.             会跑来这里的,NOT_STORED的都是还没执行"link"操作的情况。
  287.             像NREAD_APPEND这些命令都是有link过的。
  288.             至于什么是link,具体往下看。
  289.         */
  290.         if (stored == NOT_STORED) {

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
C#使用CancellationTokenSource终止线程发布时间:2022-07-13
下一篇:
关于c语言中的字符串问题发布时间:2022-07-13
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap