在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
以前写过一篇关于如何使用多线程提升推送速度的文章(http://www.cnblogs.com/bai-jimmy/p/5177433.html),能够达到5000qps,其实已经可以满足现在的业务。不过在看nginx的说明文档时,发现其中提到nginx支持用线程池来提升响应速度。我一直对如何实现线程池很感兴趣,于是利用周末的时间参考别人的代码,自己写了一个初级版,并且调通了;还没在实际开发中应用,不知道效果如何。
代码如下:
/* pd_log.h */
/**
 * Log levels, log file paths and convenience macros for the logger.
 *
 * The error()/debug() macros expand to calls to logger(); this header does
 * not declare logger(), so the including file must already have it in scope.
 */
#ifndef __pd_log_
#define __pd_log_

#define LOG_DEBUG_PATH "debug.log"
#define LOG_ERROR_PATH "error.log"

/**
 * Log levels accepted by logger().
 */
enum log_level {
    DEBUG = 0,
    ERROR = 1
};

/* Log a printf-style message at ERROR level, tagged with the calling line. */
#define error(...) \
    logger(ERROR, __LINE__, __VA_ARGS__)

/* Log a printf-style message at DEBUG level, tagged with the calling line. */
#define debug(...) \
    logger(DEBUG, __LINE__, __VA_ARGS__)

/*
 * If `expr` is false, log it at ERROR level and return `rc` from the
 * enclosing function.
 *
 * Wrapped in do { } while (0) so the macro acts as a single statement and is
 * safe inside an unbraced if/else. The expression text is passed as a "%s"
 * argument rather than as the format string, so a '%' in the condition cannot
 * be misread as a conversion specifier.
 *
 * NOTE: this name collides with the standard assert() from <assert.h>; do not
 * include both in the same translation unit.
 */
#define assert(expr, rc)                          \
    do {                                          \
        if (!(expr)) {                            \
            error("%s is null or 0", #expr);      \
            return rc;                            \
        }                                         \
    } while (0)

#endif
pd_log.c #include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <time.h> #include "pd_log.h" /** * get now timestr */ static void get_time(char *time_str, size_t len) { time_t tt; struct tm local_time; time(&tt); localtime_r(&tt, &local_time); strftime(time_str, len, "%m-%d %H:%M:%S", &local_time); } /** * log */ static void logger(int flag, int line, const char *fmt, ...) { FILE *fp = NULL; char time_str[20 + 1]; va_list args; get_time(time_str, sizeof(time_str)); switch (flag) { case DEBUG: fp = fopen(LOG_DEBUG_PATH, "a"); if (!fp) { return; } fprintf(fp, "%s DEBUG (%d:%d) ", time_str, getpid(), line); break; case ERROR: fp = fopen(LOG_ERROR_PATH, "a"); if (!fp) { return; } fprintf(fp, "%s ERROR (%d:%d) ", time_str, getpid(), line); break; default: return; } va_start(args, fmt); vfprintf(fp, fmt, args); va_end(args); fprintf(fp, "\n"); fclose(fp); return; }
/* pd_pool.h */
/**
 * Thread pool header.
 * @author jimmy
 * @date 2016-5-14
 */
#ifndef __PD_POOL_
#define __PD_POOL_

/* Included here so the header is self-contained: it uses size_t and the
 * pthread mutex/condition types in its struct definitions. */
#include <stddef.h>
#include <pthread.h>

/* One queued task: a node in a singly linked list. */
typedef struct task_s {
    void (*routine)(void *);   /* function to run */
    void *argv;                /* argument passed to routine */
    struct task_s *next;       /* next task in the queue, or NULL */
} pd_task_t;

/* Task queue. */
typedef struct queue_s {
    pd_task_t *head;           /* first pending task, NULL when empty */
    pd_task_t **tail;          /* slot where the next task gets linked in */
    size_t max_task_num;       /* maximum number of pending tasks */
    size_t cur_task_num;       /* current number of pending tasks */
} pd_queue_t;

/* Thread pool. */
typedef struct pool_s {
    pthread_mutex_t mutex;     /* lock taken around queue accesses */
    pthread_cond_t cond;       /* signalled when a task is added */
    pd_queue_t queue;
    size_t thread_num;         /* number of worker threads */
} pd_pool_t;

/**
 * Create a pool.
 * @param thread_num     number of worker threads to start
 * @param thread_max_num maximum number of tasks allowed to wait in the queue
 * @return the new pool, or NULL on failure
 */
pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num);

/**
 * Queue routine(argv) for execution by a worker thread.
 * @return 0 on success, -1 on failure
 */
int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv);

/**
 * Stop the worker threads and release the pool.
 */
void pd_pool_destroy(pd_pool_t *a_pool);

#endif
pd_poo.c /** * 线程池 * @author jimmy * @date 2016-5-14 */ #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include "pd_log.h" #include "pd_log.c" #include "pd_pool.h" /*tsd*/ pthread_key_t key; void *pd_worker_dispatch(void *argv){ ushort exit_flag = 0; pd_task_t *a_task; pd_pool_t *a_pool = (pd_pool_t *)argv; if(pthread_setspecific(key, (void *)&exit_flag) != 0){ return NULL; } /*动态从任务列表中获取任务执行*/ while(!exit_flag){ pthread_mutex_lock(&a_pool->mutex); /*如果此时任务链表为空,则需要等待条件变量为真*/ while(a_pool->queue.head == NULL){ pthread_cond_wait(&a_pool->cond, &a_pool->mutex); } /*从任务链表中任务开支执行*/ a_task = a_pool->queue.head; a_pool->queue.head = a_task->next; a_pool->queue.cur_task_num--; if(a_pool->queue.head == NULL){ a_pool->queue.tail = &a_pool->queue.head; } /*解锁*/ pthread_mutex_unlock(&a_pool->mutex); /*执行任务*/ a_task->routine(a_task->argv); //core free(a_task); a_task = NULL; } pthread_exit(0); } /** * 根据线程数创建所有的线程 */ static int pd_pool_create(pd_pool_t *a_pool){ int i; pthread_t tid; for(i = 0; i < a_pool->thread_num; i++){ pthread_create(&tid, NULL, pd_worker_dispatch, a_pool); } return 0; } /** * 线程退出函数 */ void pd_pool_exit_cb(void *argv){ unsigned int *lock = argv; ushort *exit_flag_ptr = pthread_getspecific(key); *exit_flag_ptr = 1; pthread_setspecific(key, (void *)exit_flag_ptr); *lock = 0; } /** * 线程池初始化 */ pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){ pd_pool_t *a_pool = NULL; a_pool = calloc(1, sizeof(pd_pool_t)); if(!a_pool){ error("pool_init calloc fail: %s", strerror(errno)); return NULL; } a_pool->thread_num = thread_num; //初始化队列参数 a_pool->queue.max_task_num = thread_max_num; a_pool->queue.cur_task_num = 0; a_pool->queue.head = NULL; a_pool->queue.tail = &a_pool->queue.head; //初始化tsd if(pthread_key_create(&key, NULL) != 0){ error("pthread_key_create fail: %s", strerror(errno)); goto err; } //初始化互斥锁 if(pthread_mutex_init(&a_pool->mutex, NULL) != 0){ error("pthread_mutex_init fail: %s", strerror(errno)); 
pthread_key_delete(key); goto err; } //初始化条件变量 if(pthread_cond_init(&a_pool->cond, NULL) != 0){ error("pthread_cond_init fail: %s", strerror(errno)); pthread_mutex_destroy(&a_pool->mutex); goto err; } //创建线程池 if(pd_pool_create(a_pool) != 0){ error("pd_pool_create fail: %s", strerror(errno)); pthread_mutex_unlock(&a_pool->mutex); pthread_cond_destroy(&a_pool->cond); goto err; } return a_pool; err: free(a_pool); return NULL; } /** * 向线程池中添加任务.. */ int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){ pd_task_t *a_task = NULL; a_task = (pd_task_t *)calloc(1, sizeof(pd_task_t)); if(!a_task){ error("add task calloc faile: %s", strerror(errno)); return -1; } a_task->routine = routine; a_task->argv = argv; a_task->next = NULL; /*加锁*/ pthread_mutex_lock(&a_pool->mutex); if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){ error("cur_task_num >= max_task_num"); goto err; } /*将任务放到末尾*/ *(a_pool->queue.tail) = a_task; a_pool->queue.tail = &a_task->next; a_pool->queue.cur_task_num++; /*通知堵塞的线程*/ pthread_cond_signal(&a_pool->cond); /*解锁*/ pthread_mutex_unlock(&a_pool->mutex); return 0; err: pthread_mutex_unlock(&a_pool->mutex); free(a_task); return -1; } void pd_pool_destroy(pd_pool_t *a_pool){ unsigned int n; unsigned int lock; for(n = 0; n < a_pool->thread_num; n++){ lock = 1; if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != 0){ error("pd_pool_destroy fail: add_task fail"); return; } while(lock){ usleep(1); } } pthread_mutex_destroy(&a_pool->mutex); pthread_cond_destroy(&a_pool->cond); pthread_key_delete(key); free(a_pool); } /******************************************************************************************/ void testfun(void *argv){ printf("testfun\n"); sleep(1); } int main(){ pd_pool_t *a_pool = pd_pool_init(9, 5); pd_pool_add_task(a_pool, testfun, NULL); pd_pool_add_task(a_pool, testfun, NULL); pd_pool_add_task(a_pool, testfun, NULL); pd_pool_destroy(a_pool); }
|
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论