这两天剖析了一下socket.lua,整体不是很难,主要是数据缓冲区的实现需要好好分析一下。
这里读写数据也是用到了缓冲池的思想,为了更加直观的说明代码,还有方便测试,我去掉lua代码,把核心接口直接用C++实现了一遍:
#include <stdio.h>
#include <string.h>
#include <vector>
using std::vector;
#define LARGE_PAGE_NODE 12
struct buffer_node {
char* msg;
int sz;
struct buffer_node* next;
};
struct socket_buffer {
int size;
int offset;
struct buffer_node *head;
struct buffer_node *tail;
};
struct buffer_pool {
buffer_node** pool;
buffer_node* head;
int len;
buffer_pool() {
len = 0;
pool = NULL;
head = NULL;
}
buffer_node* new_pool(int size) { //构建size个缓冲池
len = size;
pool = new buffer_node*[size];
for (int i = 0; i < size; i++) {
pool[i] = new buffer_node;
}
for (int i = 0; i < size; i++) {
pool[i]->msg = NULL;
pool[i]->sz = 0;
pool[i]->next = pool[i + 1];
}
pool[len - 1]->next = 0;
head = pool[0];
return head;
}
};
int push_buffer(socket_buffer* sb, buffer_pool* pool, char* msg, int sz) //写数据到缓冲区
{
buffer_node* free_node = NULL;
if (pool->head == NULL) {
int len = pool->len + 1;
int size = 8;
if (len <= LARGE_PAGE_NODE - 3 ) {
size <= len;
} else {
size <= LARGE_PAGE_NODE - 3;
}
free_node = pool->new_pool(size);
}
else {
free_node = pool->head;
}
pool->head = free_node->next;
free_node->msg = msg;
free_node->sz = sz;
free_node->next = NULL;
if (sb->head == NULL) {
sb->head = sb->tail = free_node;
} else {
sb->tail->next = free_node;
sb->tail = free_node;
}
sb->size += sz;
return sb->size;
}
void return_free_node(buffer_pool* pool, socket_buffer* sb) //返回给缓冲池
{
buffer_node* free_node = sb->head;
sb->offset = 0;
sb->head = free_node->next;
if (sb->head == NULL) {
sb->tail = NULL;
}
free_node->next = pool->head;
//delete free_node->msg; //如果是堆上数据这里必须要释放
free_node->msg = NULL;
free_node->sz = 0;
pool->head = free_node;
}
char* read_buffer(buffer_pool* pool, socket_buffer* sb, int sz) //读取缓冲区sz个字节
{
if (sb->size < sz || sz == 0) {
return NULL;
} else {
sb->size -= sz;
buffer_node* cur = sb->head;
char* msg = new char[sz + 1];
if (sz < cur->sz - sb->offset) {
memcpy(msg, cur->msg + sb->offset, sz);
sb->offset += sz;
msg[sz] = 0;
return msg;
}
if (sz == cur->sz - sb->offset) {
memcpy(msg, cur->msg + sb->offset, sz);
return_free_node(pool, sb);
return msg;
}
}
}
char* readall(buffer_pool* pool, socket_buffer* sb) //读取缓冲区所有的字节
{
vector<char> vt;
while(sb->head) {
sb->head;
buffer_node* cur = sb->head;
int len = cur->sz - sb->offset;
int raw_len = vt.size();
vt.resize(len + raw_len);
memcpy(&vt[raw_len], cur->msg + sb->offset, len);
return_free_node(pool, sb);
}
sb->size = 0;
int len = vt.size();
char* msg = new char[len + 1];
memcpy(msg, &vt[0], len);
msg[len] = 0;
return msg;
}
int main()
{
socket_buffer* sb = new socket_buffer;
buffer_pool* pool = new buffer_pool;
char msg1[10] = "123456";
push_buffer(sb, pool, msg1, 10); //第一次往缓冲区写10个字节
push_buffer(sb, pool, "abcdef", 6); //第二次往缓冲区写6个字节
char* msg = read_buffer(pool, sb, 3); //从缓冲区读3个字节
printf("msg is %s\n", &msg[0]);
delete sb;
delete pool;
}
为了更加清楚的讲明白代码的原理,我喜欢用一张张图来描述其中的要点,这篇也不例外。
当socket数据到来时要用缓冲区来保存数据。刚开始的时候,缓冲池是空的,所以遇到缓冲池为空的情况我们就要创建几个缓冲区数据构成缓冲池。每个缓冲区有数据指针以及数据大小字段,当然缓冲区是以链表的形式连在一块的,如下图所示:
构建好缓冲池之后就可以往里面写数据了,我们取下头结点作为容器来盛装数据,剩下的作为空闲的缓冲区。为了描述这个缓冲区数据,用一个指针指向这个缓冲区,且具有大小字段。由于读缓冲区数据也需要记录一些数据,例如读取了多少,所以我们定义一个数据结构来描述当前缓冲区的状况:
如果我们一次性读取的数据的大小超过了一个缓冲区数据大小,触及到了下一个缓冲,区怎么办呢?所以这个数据结构应该是这样的:
可以看到他有一头一尾两个指针,来指向所需要用到缓冲区,构成一个闭环。发现云风很喜欢用这个数据结构,在全局消息队列中也用到了这个。注意,size字段是指所有结点数据长度之和,offset字段仅仅是当前结点的偏移量。例如读取缓冲池中的数据时,第一个结点的数据全部读完,第二个结点只读了部分,那么此时的offset就是指在第二个结点读取的数据长度,而不是整个已读取的长度。这是因为要记录当前结点已读取了多少数据,以便下次读取。当结点数据全部读完时,offset值为0,再读取新一个结点时才又开始重新计算offset。
当某个缓冲区的数据读取完毕之后,该缓冲区就要被回收,重复使用,这也是池的理念之一,如下图:
空闲缓冲区的头结点将指向这个新回收的缓冲区,以备下次使用。
下面中重点介绍写入和读取缓冲区数据的几个函数。
socket.read(id,sz)将会读取指定字节的数据,他会调用底层的lpopbuffer函数,就是上面的read_buffer函数。如果缓冲池中没有足够的数据,那么将会返回nil,此时read函数调用suspend阻塞,直到有新的数据到来,缓冲池中的数据满足要求。这个是利用协程的暂停和恢复实现的。
有新的数据到来时socket回调函数会被触发,数据被写入缓冲池中,这个底层是调用lpushbuffer实现的,对应上面的push_buffer函数。注意这个函数会返回缓冲池中所有未读数据的长度,所有一旦缓冲池中的数据达到socket.read的长度要求,那么将会调用wakeup唤醒之前被挂起的协程,从而再次调用lpopbuffer函数获取数据。
如果sz参数是空,则将读取整个缓冲区的数据。他在调用底层的lreadall函数,读取整个缓冲池中的数据。
测试例子参考:
https://github.com/shonm520/mu_event
欢迎加入QQ群 858791125 讨论skynet,游戏后台开发,lua脚本语言等问题。
|
请发表评论