实现一个高效的网络日志库要解决那些问题?
首先明确一下问题的模型,这是一个典型的多生产者 单消费者问题,对于前端的日志库使用者来说,应该做到非阻塞添加,作为后端的文件写入,应该注意磁盘IO的瓶颈。
功能需求
- 日志的级别分级
- 发生时间和具体线程信息
- 线程安全
实现思路
多个线程共有一个前端,通过后端写入磁盘文件
异步日志是必须的,所以需要一个缓冲区,在这里我们使用的是多缓冲技术,基本思路是准备多块Buffer,前端负责向Buffer中填数据,后端负责将Buffer中数据取出来写入文件,这种实现的好处在于在新建日志消息的时候不必等待磁盘IO操作,前端写的时候也不会阻塞。
实现结构
LogStream 负责写入消息的格式化
LogFile 负责文件写入
AsyncLogging 负责实现 多缓冲技术 协调前后端
LogFile
如果有必要就给日至文件加锁
void LogFile::append(const char* logline, int len)
{
if (mutex_)
{
MutexLockGuard lock(*mutex_);
append_unlocked(logline, len);
}
else
{
append_unlocked(logline, len);
}
}
LogStream
重写流操作符
构造一个格式转化类,给日志中消息提供一个统一的格式
AsyncLogging
是及实现采用了四个缓冲区,这样可以进一步减少前端等待,数据结构
typedef boost::ptr_vector<LargeBuffer> BufferVector;
typedef BufferVector::auto_type BufferPtr;
MutexLock lock;
Condition cond;
BufferPtr nextBuffer;
BufferVector buffers_;
append的具体实现
在当前的缓冲区和备用缓冲区中选择一个足够使用的进行写入。
void AsynLogging::append(const char* logline, int len){
LockGuard(mutex);
if(curbuf->avail() > len){//当前缓冲区足够
curbuf->append(logline,len);
}
else{
buffers.push_back(curbuf->release());
if(nextbuf){
curbuf = std::move(nextbuf);
}
else{
curbuf.reset(new LargeBuffer);
}
curbuf->append(logline, len);
cond.notify();
}
}
- 接收方的后端实现
首先准备好两块空闲的buffer,已备在临界区内交换,等待条件标量出发的条件又两个,超时或者是前端写满了一个或者多个Buffer,当条件满足时,先将当前缓冲移入buffer,并且立刻将空闲的newBuffer1作为当前缓冲,接下来将buffers和buffersToWrite交换,随后将buffersToWrite写入文件,重新设计设置Buffer。
void AsyncLogging::threadFunc(){
BufferPtr nweBuffer1(new LargeBuffer);
BufferPtr newBuffer2(new LargeBuffer);
BufferVector bufferToWrite;
while(running_){
{
MutexLockGuard lock(mutex);
if(buffers.empty()){
cond.wait_for(muted,flushInterval_);
}
buffers.push_back(currentBuffer_.release());
currentBuffer = move(newBuffer1);
buffersTowrite.swap(buffers_);
if(!nextBuf){
nextBuf = std::move(newBuffer2);
}
}
}
}
交给后端去写入,以及重新设置两个缓冲区
for (size_t i = 0; i < buffersToWrite.size(); ++i)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffersToWrite[i].data(), buffersToWrite[i].length());
}
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
|
请发表评论