memcached.c 由于代码太多,在此省略了部分代码,例如UPD连接,二进制协议,某些错误输出和调试输出等,建议从main函数开始看起。
- #include "memcached.h"
- //尝试从socket中读取数据的结果枚举
- enum try_read_result {
- READ_DATA_RECEIVED,
- READ_NO_DATA_RECEIVED,
- READ_ERROR, /** an error occured (on the socket) (or client closed connection) */
- READ_MEMORY_ERROR /** failed to allocate more memory */
- };
- struct stats stats; //全局统计变量
- struct settings settings; //全局配置变量
- static conn *listen_conn = NULL; //全局的监听连接对象
- static int max_fds; //fds上限
- static struct event_base *main_base; //主线程的event_base 对象
- //向客户端发送数据的结果枚举
- enum transmit_result {
- TRANSMIT_COMPLETE, /** All done writing. */
- TRANSMIT_INCOMPLETE, /** More data remaining to write. */
- TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
- TRANSMIT_HARD_ERROR /** Can't write (c->state is set to conn_closing) */
- };
- extern pthread_mutex_t conn_lock; //连接锁
-
- /**
- 创建连接函数,参数为:
- sfd 要监听的socket fd
- init_state 连接的初始化状态conn_states
- event_flags 监听的事件
- read_buffer_size 读缓存大小
- transport 监听的socket 类型
- base event_base
-
- 每监听一个fd(listen fd和client fd)都会创建这样一个conn来保存相关信息,表示一个“连接”,
- 无论连接是已经连接上了,还是仅仅处于listen状态。
- */
- conn *conn_new(const int sfd, enum conn_states init_state,
- const int event_flags,
- const int read_buffer_size, enum network_transport transport,
- struct event_base *base) {
- conn *c;
- assert(sfd >= 0 && sfd < max_fds);
- c = conns[sfd];
- if (NULL == c) {
- if (!(c = (conn *)calloc(1, sizeof(conn)))) {
- STATS_LOCK();
- stats.malloc_fails++;
- STATS_UNLOCK();
- fprintf(stderr, "Failed to allocate connection object\n");
- return NULL;
- }
- MEMCACHED_CONN_CREATE(c);
- c->rbuf = c->wbuf = 0;
- //c->xx省略部分初始化代码
- c->hdrsize = 0;
- c->rbuf = (char *)malloc((size_t)c->rsize);
- c->wbuf = (char *)malloc((size_t)c->wsize);
- c->ilist = (item **)malloc(sizeof(item *) * c->isize);
- c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
- c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
- c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
- if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
- c->msglist == 0 || c->suffixlist == 0) {
- conn_free(c);
- STATS_LOCK();
- stats.malloc_fails++;
- STATS_UNLOCK();
- fprintf(stderr, "Failed to allocate buffers for connection\n");
- return NULL;
- }
- STATS_LOCK();
- stats.conn_structs++;
- STATS_UNLOCK();
- c->sfd = sfd;
- conns[sfd] = c;
- }
- c->transport = transport;
- c->protocol = settings.binding_protocol;
- if (!settings.socketpath) {
- c->request_addr_size = sizeof(c->request_addr);
- } else {
- c->request_addr_size = 0;
- }
- if (transport == tcp_transport && init_state == conn_new_cmd) {
- if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
- &c->request_addr_size)) {
- perror("getpeername");
- memset(&c->request_addr, 0, sizeof(c->request_addr));
- }
- }
- if (settings.verbose > 1) {
- //省略向终端输出调试的代码
- }
- c->state = init_state;
- //c->xxx 省略部分初始化代码
- c->noreply = false;
- //创建事件,处理函数为event_handler,并把conn 连接对象传入event_handler中。
- //(主线程调用conn_new时:)在主线程,创建完listenfd后,调用此函数监听网络连接事件,此时conn对象的conn_state为conn_listening
-
- /**(建议看到worker线程调用conn_new时才看以下解析)
- (worker线程调用conn_new时)在worker线程,收到主线程丢过来的client fd时,调用此函数监听来自client fd的网络事件。
-
- 也就是说,无论是主线程还是worker线程,都会调用此函数conn_new,创建conn连接对象,同时监听各自的fd。
- 而且都是调用event_handler作处理,只是不一样的fd, 不一样的conn对象(即下面的(void *) c)
- 进入 event_handler看看都做了啥?
- */
- event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
- event_base_set(base, &c->event); //为事件设置事件基地
- c->ev_flags = event_flags;
- if (event_add(&c->event, 0) == -1) { //把事件加入监听
- perror("event_add");
- return NULL;
- }
- STATS_LOCK();
- stats.curr_conns++;
- stats.total_conns++;
- STATS_UNLOCK();
- MEMCACHED_CONN_ALLOCATE(c->sfd);
- return c;
- }
- //向客户端输出字符串
- static void out_string(conn *c, const char *str) {
- size_t len;
- assert(c != NULL);
- if (c->noreply) {
- if (settings.verbose > 1)
- fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
- c->noreply = false;
- conn_set_state(c, conn_new_cmd);
- return;
- }
- if (settings.verbose > 1)
- fprintf(stderr, ">%d %s\n", c->sfd, str);
- /* Nuke a partial output... */
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
- add_msghdr(c); //添加一个msghdr
- len = strlen(str);
- if ((len + 2) > c->wsize) {
- /* ought to be always enough. just fail for simplicity */
- str = "SERVER_ERROR output line too long";
- len = strlen(str);
- }
- memcpy(c->wbuf, str, len); //把要发送的字符串写入wbuf字段中
- memcpy(c->wbuf + len, "\r\n", 2); //添加换行回车符
- c->wbytes = len + 2;
- c->wcurr = c->wbuf;
- conn_set_state(c, conn_write); //把连接状态设置为conn_write,则状态机进入conn_write分支执行输出
- c->write_and_go = conn_new_cmd; //当状态机完成输出后要切换到的状态为conn_new_cmd
- return;
- }
- /**
- 无法分配内存时经常会调用此函数,例如内存空间已满,但又无法淘汰旧的item时,则向客户端响应一个错误
- */
- static void out_of_memory(conn *c, char *ascii_error) {
- const static char error_prefix[] = "SERVER_ERROR ";
- const static int error_prefix_len = sizeof(error_prefix) - 1;
- if (c->protocol == binary_prot) {
- /* Strip off the generic error prefix; it's irrelevant in binary */
- if (!strncmp(ascii_error, error_prefix, error_prefix_len)) {
- ascii_error += error_prefix_len;
- }
- write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, ascii_error, 0);
- } else {
- out_string(c, ascii_error);
- }
- }
- /*
- * 解析文本协议时,执行set/add/replace 命令并把value数据塞到item之后,调用此函数。
- * 这个函数的作用比较简单,主要是做一些收尾工作。
- *
- */
- static void complete_nread_ascii(conn *c) {
- assert(c != NULL);
- item *it = c->item;
- int comm = c->cmd;
- enum store_item_type ret;
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
- if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
- out_string(c, "CLIENT_ERROR bad data chunk");
- } else {
- ret = store_item(it, comm, c);
- }
- switch (ret) {
- case STORED:
- out_string(c, "STORED");
- break;
- //省略其它
- default:
- out_string(c, "SERVER_ERROR Unhandled storage type.");
- }
- }
- //释放引用,注意仅仅是一个释放引用的逻辑,不一定把内存空间给释放掉
- //具体是否要把内存空间释放,说见item_remove和do_item_remove函数。而在此处,
- //一般都只是释放引用而已。
- item_remove(c->item); /* release the c->item reference */
- c->item = 0;
- }
- //重置命令处理
- static void reset_cmd_handler(conn *c) {
- c->cmd = -1;
- c->substate = bin_no_state;
- if(c->item != NULL) {
- item_remove(c->item);
- c->item = NULL;
- }
- conn_shrink(c);
- /**
- 不知道你有没有在下面这个地方困惑过。。反正我是纠结了很久。。。
-
- 这个c->rbytes到底什么时候开始不为0了?为0的话,怎么执行这次的命令?
- 回顾一下我们是怎么会调用到reset_cmd_handler这个地方:
- 1)主线程监听到listen fd有连接上来,进入drive_machine的conn_listen状态,
- 然后accept连接得到client fd,再dispatch_conn_new把client fd丢给某个worker线程
- 2)某个worker线程收到主线程丢过来的client fd之后,把它加到那个worker线程自己的
- event_base,然后注册event_handler(event_handler主要是调drive_machine)作为事件处理函数,
- 在注册event_handler的同时,初始化了一个conn对象作为drive_machine的参数,
- 而这个对象中的rbytes为0。
- 3)当worker线程监听的client fd第一次有命令请求过来时(注意是第一次),例如set key 0 0 4\r\n,
- worker线程的收到通知,然后陷入了event_handler再到drive_machine中去。。
- 而此时,传过来的conn* c,c->state 为conn_new_cmd,而c->rbytes确实为0!!
- (再次强调是第一次有命令请求过来时)
- 就在这个时候就进入reset_cmd_handler,当前你看到的这个地方了。继续分析之后做了些啥:
- 4)第3)点得知,c->rbytes由此确实为0,所以在这种情况,必然会进入下面的else分支,
- 状态机进入conn_waiting状态。。。。
- 5)进入conn_waiting状态做了啥?就是把连接状态由conn_waiting变成conn_read,然后stop = true,退出
- 状态机。没错,退出状态机了!结束本次event_handler了!这就是比较纠结之处,这次命令请求触发的
- event_handler居然只做了把状态由conn_new_cmd变成conn_read,然后再等待下次事件通知。
- (详见conn_waiting分支代码)
- 6)那刚才那个命令请求怎么办?压根没有去读?例如set key 0 0 4\r\n这个命令的实质作用被忽略了吗?
- 其实没有。。原因是libevent默认的是水平触发,也就是说,这个命令还没被读,下次继续触发。。。
- 下次event_base会因为刚才那个命令再触发通知告诉worker线程,再次进入drive_machine,只是此时
- c->state是conn_read状态,conn_read分支才真正把这个命令执行!
- 也就是说,当worker线程监听的client fd "第一次"有命令过来的时候,会触发两次event_base的通知!!
- */
- //这个rbytes大于0的情况,是当一次event中包含多个命令,
- //或者说多个\r\n时候,程序执行到第二个及以后命令的时候出现。
- if (c->rbytes > 0) {
- conn_set_state(c, conn_parse_cmd);
- } else {
- conn_set_state(c, c
- onn_waiting);
- }
- }
- enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
- char *key = ITEM_key(it);
- item *old_it = do_item_get(key, it->nkey, hv); //拿出旧的item
- enum store_item_type stored = NOT_STORED;
- item *new_it = NULL;
- int flags;
- if (old_it != NULL && comm == NREAD_ADD) {
- do_item_update(old_it); //更新item信息,其实主要是更新最近使用信息,即lru链表,。
- } else if (!old_it && (comm == NREAD_REPLACE
- || comm == NREAD_APPEND || comm == NREAD_PREPEND))
- {
- } else if (comm == NREAD_CAS) { //省略
- } else {
- if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
- if (ITEM_get_cas(it) != 0) {
- // CAS much be equal
- if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
- stored = EXISTS;
- }
- }
- if (stored == NOT_STORED) {
- flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
- new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);
- if (new_it == NULL) {
- if (old_it != NULL)
- do_item_remove(old_it);
- return NOT_STORED;
- }
- if (comm == NREAD_APPEND) {
- memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
- memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
- } else {
- memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
- memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
- }
- it = new_it;
- }
- }
- //SET 命令会直接跑来这里,不仔细看还真不知道 -_-||
- /**
- 会跑来这里的,NOT_STORED的都是还没执行"link"操作的情况。
- 像NREAD_APPEND这些命令都是有link过的。
- 至于什么是link,具体往下看。
- */
- if (stored == NOT_STORED) {
|
请发表评论