在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
在csapp学习或者其他linux底层编程的过程中,一般都会举一些多线程或多进程的例子,配合底层同步原语、系统调用api来解释怎么创建多线程/多进程。 但是这些例子和实际项目中所用到的多线程/多进程编程知识有很大的距离(小例子很好理解,但是为了完成一个任务基本就没有什么思路了)。 我学习多线程/多进程编程由4个月了,一开始就知道有线程池实现的问题(面试现场白板撸代码的),可是怎么都实现不了,这一次说什么都要学会(那怕是从别的blog中copy的代码)。 什么是线程池? 线程池ThreadPool 就是为了存放“线程”的对象池,它的引入就是为了:限制应用程序中同一时刻运行的线程数。根据系统的环境,可以手动或者自动设置线程数量,达到最优效果。 在程序中,如果某个创建某种对象所需要的代价太高,同时这个对象又可以反复使用,那么我们往往就会准备一个容器,用来保存一批这样的对象。于是乎,我们想要用这种对象时,就不需要每次去创建一个,而直接从容器中取出一个现成的对象就可以了。由于节省了创建对象的开销,程序性能自然就上升了。这个容器就是“池”。很容易理解的是,因为有了对象池,因此在用完对象之后必须有一个“归还”的动作,这样便可以把对象放回池中,下次需要的时候就可以再次拿出来使用了。[来自http://www.cnblogs.com/JeffreyZhao/archive/2009/07/22/thread-pool-1-the-goal-and-the-clr-thread-pool.html] 对于线程是稀缺资源,创建和销毁开销比较大。我们可以在一开始就直接创建一批线程对象放在线程池中,然后需要用到线程对象的时候从线程池中取,线程对象用完了放入线程池中。在线程池内部,任务被插入到一个task队列(任务队列,这个在线程池的实现过程中都会用到的数据结构,你可以看看网络上每一个线程池的例子都会包括这个任务队列)上,线程池的线程会去取这个队列的任务。当一个新任务插入到队列时,一个空闲线程就会从任务队列上取job。这是最直观的线程池工作过程。 线程池进程用在多线程服务器上,每个通过网络到达服务器的链接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。 线程池的作用: 合理利用线程池能够带来三个好处。 线程池实现了系统中线程的数量。 线程池的实现: 我们需要解决的是这些线程对象是由谁来创建的,这些线程对象是在什么创建的? Java中jdk1.5之后加入了java.util.concurrent包,封装好了线程池。里面有很多参数和属性,可以编写并发程序。 但是c++中,自己封装同步原语,自己调 系统调用,造个轮子。
使用的同步原语有 pthread_mutex_t mutex_l;//互斥锁 pthread_cond_t condtion_l;//条件变量 使用的系统调用有 pthread_mutex_init(); pthread_cond_init(); pthread_create(&thread_[i],NULL,threadFunc,this) pthread_mutex_lock() pthread_mutex_unlock() pthread_cond_signal() pthread_cond_wait() pthread_cond_broadcast(); pthread_join() pthread_mutex_destory() pthread_cond_destory()
上面函数、变量的意思可以在这里找到答案[ http://www.cnblogs.com/li-daphne/p/5558435.html ]。
实现的思路--->就是怎么组装这些函数和原语: 在ThreadPool中封装基础原语和任务队列,利用面向对象的方法,具体的Mytask任务派生自Task,可以利用多态的机制向ThreadPool中添加一些任务。
具体的代码在这里 #include <iostream> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include "ThreadPool.h" using namespace std; class Mytask : public lz::Task{ public: Mytask(){} virtual int run(){ printf("thread[%lu] : %s\n",pthread_self(),(char*)this->arg_); sleep(1); return 0; } }; int main() { cout << "begin" << endl; char szTmp[] = "hello world"; Mytask taskobj; taskobj.setArg((void*)szTmp); lz::ThreadPool threadPool(10); threadPool.start(); for(int i = 0;i<20;i++){ threadPool.addTask(&taskobj); } while(1){ printf("there are still %d tasks need to process\n",threadPool.size()); if(threadPool.size()==0){ threadPool.stop(); printf("now i will exit from main\n"); exit(0); } sleep(2); } cout<<"end"<<endl; return 0; } ---------- #ifndef THREADPOOL_H_INCLUDED #define THREADPOOL_H_INCLUDED /********************************************** *Project:TinyThreadPool * *Author:lizhen *email:[email protected] * ***********************************************/ #include <deque> #include <string> #include <pthread.h> #include <string.h> #include <stdlib.h> // 使用C++98 语言规范实现的线程池: 面向对象做法,每一个job都是Task继承类的对象 namespace lz { class Task { public: Task(void* arg = NULL, const std::string taskName = "") : arg_(arg) , taskName_(taskName) { } virtual ~Task() { } void setArg(void* arg) { arg_ = arg; } virtual int run() = 0; protected: void* arg_; std::string taskName_; }; class ThreadPool { public: ThreadPool(int threadNum = 10); ~ThreadPool(); public: size_t addTask(Task *task); void stop(); int size(); void start(); Task* take(); private: int createThreads(); static void* threadFunc(void * threadData); private: ThreadPool& operator=(const ThreadPool&); ThreadPool(const ThreadPool&); private: volatile bool isRunning_; int threadsNum_; pthread_t* threads_; std::deque<Task*> taskQueue_; pthread_mutex_t mutex_; pthread_cond_t condition_; }; } #endif // THREADPOOL_H_INCLUDED ------ /********************************************** *Project:TinyThreadPool * *Author:lizhen *email:[email protected] * ***********************************************/ #include "ThreadPool.h" #include <stdio.h> #include <assert.h> namespace lz { ThreadPool::ThreadPool(int threadNum) { threadsNum_ = threadNum; //isRunning_ = true; } void ThreadPool::start(){ createThreads(); isRunning_ = true; } ThreadPool::~ThreadPool() { stop(); for(std::deque<Task*>::iterator it = taskQueue_.begin(); it != taskQueue_.end(); ++it) { delete *it; } taskQueue_.clear(); } int ThreadPool::createThreads() { pthread_mutex_init(&mutex_, NULL); pthread_cond_init(&condition_, NULL); threads_ = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum_); for (int i = 0; i < threadsNum_; i++) { pthread_create(&threads_[i], NULL, threadFunc, this); } return 0; } size_t ThreadPool::addTask(Task *task) { pthread_mutex_lock(&mutex_); taskQueue_.push_back(task); int size = taskQueue_.size(); pthread_mutex_unlock(&mutex_); pthread_cond_signal(&condition_); return size; } void ThreadPool::stop() { if (!isRunning_) { return; } isRunning_ = false; pthread_cond_broadcast(&condition_); for (int i = 0; i < threadsNum_; i++) { pthread_join(threads_[i], NULL); } free(threads_); threads_ = NULL; pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&condition_); } int ThreadPool::size() { pthread_mutex_lock(&mutex_); int size = taskQueue_.size(); pthread_mutex_unlock(&mutex_); return size; } Task* ThreadPool::take() { Task* task = NULL; while (!task) { pthread_mutex_lock(&mutex_); while (taskQueue_.empty() && isRunning_) { pthread_cond_wait(&condition_, &mutex_); } if (!isRunning_) { pthread_mutex_unlock(&mutex_); break; } else if (taskQueue_.empty()) { pthread_mutex_unlock(&mutex_); continue; } assert(!taskQueue_.empty()); task = taskQueue_.front(); taskQueue_.pop_front(); pthread_mutex_unlock(&mutex_); } return task; } void* ThreadPool::threadFunc(void* arg) { pthread_t tid = pthread_self(); ThreadPool* pool = static_cast<ThreadPool*>(arg); while (pool->isRunning_) { Task* task = pool->take(); if (!task) { printf("thread %lu will exit\n", tid); break; } assert(task); task->run(); } return 0; } //bool ThreadPool::getisRunning_(){return isRunning_;} }
|
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论