线程池与线程封装
1. 线程池
1. ThreadPool.hpp
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 
 | #pragma once
 #include <iostream>
 #include <vector>
 #include <string>
 #include <queue>
 #include <pthread.h>
 #include <unistd.h>
 using namespace std;
 
 
 /// Bookkeeping record for one worker thread owned by ThreadPool.
 struct ThreadInfo
 {
     pthread_t tid;     ///< Handle filled in by pthread_create() inside ThreadPool::Start().
     std::string name;  ///< Display name used in the pool's log output.
 };

 /// Number of worker threads a ThreadPool creates when no count is given.
 static const int default_num = 5;

 /**
  * Fixed-size, process-wide (singleton) pool of pthread workers that consume
  * tasks from a shared FIFO queue.
  *
  * Requirements on T for the workers (HandlerTask): copyable, callable as
  * `t()`, and providing `t.get_result()` that can be streamed to std::cout.
  *
  * Locking protocol: `mutex_` protects both `tasks_` and `threads_`. The small
  * public helpers (Lock/Unlock/Wakeup/ThreadSleep/IsQueueEmpty/Pop) do no
  * locking of their own; they exist so the static worker entry point can drive
  * the pool, and callers must follow the protocol noted on each of them.
  */
 template <class T>
 class ThreadPool
 {
 public:
     /// Acquire the pool mutex.
     void Lock()
     {
         pthread_mutex_lock(&mutex_);
     }

     /// Release the pool mutex.
     void Unlock()
     {
         pthread_mutex_unlock(&mutex_);
     }

     /// Wake one worker that is blocked in ThreadSleep().
     void Wakeup()
     {
         pthread_cond_signal(&cond_);
     }

     /// Block on the condition variable. The caller must hold the pool mutex.
     void ThreadSleep()
     {
         pthread_cond_wait(&cond_, &mutex_);
     }

     /// @return true when no task is queued. The caller must hold the pool mutex.
     bool IsQueueEmpty()
     {
         return tasks_.empty();
     }

     /**
      * Look up the display name registered for a thread handle.
      *
      * Reads `threads_` without locking, so the caller must either hold the
      * pool mutex or know that Start() is not running concurrently.
      *
      * @param tid  Thread handle to search for.
      * @return The registered name, or "Unknown" when the handle is not found.
      */
     std::string GetThreadName(pthread_t tid)
     {
         for (const auto &ti : threads_)
         {
             if (ti.tid == tid)
                 return ti.name;
         }

         return "Unknown";
     }

 public:
     /**
      * Worker entry point handed to pthread_create().
      *
      * Loops forever: waits until a task is queued, pops it under the lock,
      * then runs it and prints its result with the lock released.
      *
      * @param args  Pointer to the owning ThreadPool<T>.
      * @return Never returns in practice; nullptr only satisfies the signature.
      */
     static void *HandlerTask(void *args)
     {
         ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);

         // Start() keeps the pool mutex held while it fills in threads_, so
         // taking the lock here waits until this worker's own tid and name are
         // recorded. Looking the name up without the lock raced with Start()
         // and could yield "Unknown".
         tp->Lock();
         const std::string name = tp->GetThreadName(pthread_self());
         tp->Unlock();

         while (true)
         {
             tp->Lock();

             // Loop (not `if`): a condition wait may wake up spuriously.
             while (tp->IsQueueEmpty())
             {
                 std::cout << name << " 等待任务..." << std::endl;
                 tp->ThreadSleep();
             }

             T t = tp->Pop();
             tp->Unlock();

             // Run the task outside the critical section so workers do not
             // serialize on the queue lock.
             t();
             std::cout << name << " 运行任务,结果是:" << t.get_result() << std::endl;
         }

         return nullptr;
     }

     /**
      * Name and launch every worker thread.
      *
      * The pool mutex is held for the whole loop so that already-running
      * workers cannot read `threads_` while it is still being written.
      * A failed pthread_create() is reported on std::cerr and skipped.
      */
     void Start()
     {
         const std::size_t num = threads_.size();
         std::cout << "启动 " << num << " 个线程..." << std::endl;

         Lock();
         for (std::size_t i = 0; i < num; ++i)
         {
             threads_[i].name = "[线程 " + std::to_string(i + 1) + "]";

             const int rc = pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
             if (rc != 0)
             {
                 std::cerr << "创建线程 " << threads_[i].name << " 失败,错误码:" << rc << std::endl;
                 continue;
             }
             std::cout << "创建线程 " << threads_[i].name << std::endl;
         }
         Unlock();
     }

     /**
      * Remove and return the oldest queued task.
      *
      * The caller must hold the pool mutex and must have checked that the
      * queue is not empty; calling this on an empty queue is undefined.
      */
     T Pop()
     {
         T t = tasks_.front();
         tasks_.pop();
         return t;
     }

     /// Queue a copy of `t` and wake one waiting worker. Safe to call from any thread.
     void Push(const T &t)
     {
         Lock();
         tasks_.push(t);
         Wakeup();
         Unlock();
     }

     /**
      * Return the process-wide pool for this T, creating it on first use.
      *
      * The check is always made under `lock_`: the previous unlocked fast-path
      * read of the plain pointer `tp_` was a data race with the write below.
      * The instance is intentionally never deleted, because the workers use it
      * until the process exits.
      */
     static ThreadPool<T> *GetInstance()
     {
         pthread_mutex_lock(&lock_);
         if (nullptr == tp_)
         {
             std::cout << "log:单例线程池首次创建完成!" << std::endl;
             tp_ = new ThreadPool<T>();
         }
         ThreadPool<T> *instance = tp_;
         pthread_mutex_unlock(&lock_);

         return instance;
     }

 private:
     /// Construct a pool with `num` (not yet started) worker slots.
     ThreadPool(int num = default_num) : threads_(num)
     {
         pthread_mutex_init(&mutex_, nullptr);
         pthread_cond_init(&cond_, nullptr);
     }

     ~ThreadPool()
     {
         pthread_mutex_destroy(&mutex_);
         pthread_cond_destroy(&cond_);
     }

     // Singleton: copying is disabled.
     ThreadPool(const ThreadPool<T> &) = delete;
     const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

 private:
     std::vector<ThreadInfo> threads_;  ///< One slot per worker; written by Start().
     std::queue<T> tasks_;              ///< Pending tasks, oldest first.

     pthread_mutex_t mutex_;  ///< Guards tasks_ and threads_.
     pthread_cond_t cond_;    ///< Signalled by Push() when a task is queued.

     static ThreadPool<T> *tp_;     ///< The singleton instance (nullptr until first use).
     static pthread_mutex_t lock_;  ///< Guards creation of tp_.
 };

 // Out-of-class definitions of the static members.
 template <class T>
 ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;

 template <class T>
 pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
 
 | 
2. Task.hpp
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 
 | #pragma once
 #include <iostream>
 #include <string>
 using namespace std;
 
 // The operator characters a Task accepts; Main.cc indexes into this string
 // with a random number to choose the operator for each generated task.
 const string opers = "+-*/%";
 
 
 /// Status codes reported by Task::get_error_code().
 enum
 {
     SUCCESS = 0,       ///< The operation was computed.
     DIV_ERROR = 1,     ///< Division by zero was requested.
     MOD_ERROR = 2,     ///< Modulo by zero was requested.
     UNKNOWN_ERROR = 3  ///< The operator is not one of + - * / %.
 };

 /**
  * A single integer arithmetic job: `x op y`.
  *
  * Construct it, invoke it (run() or operator()), then read the outcome with
  * get_result() and get_error_code(). On any error the result is 0.
  */
 class Task
 {
 public:
     /// Default task: 0 + 0.
     Task()
         : _x(0), _y(0), _ret(0), _op('+'), _code(SUCCESS)
     {
     }

     /**
      * @param x   Left operand.
      * @param y   Right operand.
      * @param op  One of '+', '-', '*', '/', '%'.
      *
      * An unsupported operator is kept as given and flagged with
      * UNKNOWN_ERROR. Previously it was replaced by '+', and run() then reset
      * the code to SUCCESS, so an invalid task was silently reported as a
      * successful addition.
      */
     Task(int x, int y, char op = '+')
         : _x(x),
           _y(y),
           _ret(0),
           _op(op),
           _code(is_supported(op) ? SUCCESS : UNKNOWN_ERROR)
     {
     }

     /// Evaluate the expression and store the result and status code.
     void run()
     {
         // Start from a clean state so the task can be run more than once.
         _ret = 0;
         _code = SUCCESS;

         switch (_op)
         {
         case '+':
             _ret = _x + _y;
             break;
         case '-':
             _ret = _x - _y;
             break;
         case '*':
             _ret = _x * _y;
             break;
         case '/':
             if (_y == 0)
                 _code = DIV_ERROR;
             else
                 _ret = _x / _y;
             break;
         case '%':
             if (_y == 0)
                 _code = MOD_ERROR;
             else
                 _ret = _x % _y;
             break;
         default:
             // Reached for operators rejected by the constructor.
             _code = UNKNOWN_ERROR;
             break;
         }
     }

     /// Function-call form of run(), so a Task can be used as a callable.
     void operator()()
     {
         run();
     }

     /// @return The unevaluated expression, e.g. "3+4= ???".
     std::string get_task() const
     {
         return std::to_string(_x) + _op + std::to_string(_y) + "= ???";
     }

     /// @return The expression with its stored result and error code appended.
     std::string get_ret() const
     {
         return std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_ret) +
                " [错误代码:" + std::to_string(_code) + "]";
     }

     /// @return The operator character given at construction.
     char get_operator() const
     {
         return _op;
     }

     /// @return The left operand.
     int get_first_operand() const
     {
         return _x;
     }

     /// @return The right operand.
     int get_second_operand() const
     {
         return _y;
     }

     /// @return The stored result (0 before run() and after any error).
     int get_result() const
     {
         return _ret;
     }

     /// @return SUCCESS, DIV_ERROR, MOD_ERROR or UNKNOWN_ERROR.
     int get_error_code() const
     {
         return _code;
     }

 private:
     /// @return true when `op` is one of the five operators run() implements.
     static bool is_supported(char op)
     {
         return op == '+' || op == '-' || op == '*' || op == '/' || op == '%';
     }

     // Declared in the same order as the constructors initialize them.
     int _x;
     int _y;
     int _ret;
     char _op;
     int _code;
 };
 
 | 
3. Main.cc
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 
 | #include "ThreadPool.hpp"
 #include "Task.hpp"
 #include <signal.h>
 
 
 // Loop flag read by main() and cleared by signalHandler().
 // `volatile sig_atomic_t` is the object type a signal handler may portably
 // write; a plain `volatile bool` carries no such guarantee.
 volatile sig_atomic_t running = 1;

 /**
  * Signal handler: announce the signal and ask main() to stop.
  *
  * Only async-signal-safe operations are used. The message is assembled in a
  * stack buffer and emitted with a single write(2), because iostream must not
  * be called from a signal handler.
  *
  * @param signum  Number of the delivered signal; printed in the message.
  */
 void signalHandler(int signum)
 {
     static const char head[] = "\n接收到信号 ";
     static const char tail[] = ",正在退出...\n";

     // Room for both fragments plus a sign and the decimal digits of an int.
     char message[sizeof(head) + sizeof(tail) + 16];
     unsigned int used = 0;

     for (unsigned int i = 0; i + 1 < sizeof(head); ++i)
         message[used++] = head[i];

     // Hand-rolled integer-to-decimal conversion (no library call needed).
     unsigned int magnitude = static_cast<unsigned int>(signum);
     if (signum < 0)
     {
         message[used++] = '-';
         magnitude = 0u - magnitude;
     }
     char digits[12];
     unsigned int count = 0;
     do
     {
         digits[count++] = static_cast<char>('0' + magnitude % 10);
         magnitude /= 10;
     } while (magnitude != 0);
     while (count > 0)
         message[used++] = digits[--count];

     for (unsigned int i = 0; i + 1 < sizeof(tail); ++i)
         message[used++] = tail[i];

     // Best effort: nothing useful can be done here if the write fails.
     const ssize_t ignored = write(STDOUT_FILENO, message, used);
     (void)ignored;

     running = 0;
 }
 
 int main()
 {
 
 signal(SIGINT, signalHandler);
 signal(SIGTERM, signalHandler);
 
 cout << "线程池启动中..." << endl;
 sleep(1);
 
 ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance();
 tp->Start();
 
 srand(time(nullptr) ^ getpid());
 
 while(running)
 {
 
 int x = rand() % 20 + 1;
 usleep(10);
 int y = rand() % 10;
 char op = opers[rand() % opers.size()];
 
 Task t(x, y, op);
 
 
 tp->Push(t);
 
 cout << "[主线程] 创建任务: " << t.get_task() << endl;
 
 sleep(1);
 }
 
 cout << "程序正常退出" << endl;
 return 0;
 }
 
 | 
2. C++ 语言层面上的线程封装 demo(简易)
1. MyThread.hpp
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 
 | #pragma once
 #include <iostream>
 #include <pthread.h>
 #include <string>
 #include <ctime>
 using namespace std;
 
 /// Signature of the work function a Thread executes: no arguments, no result.
 typedef void (*callback_t)();

 /// Running counter used to number threads ("thread-0", "thread-1", ...).
 /// It is incremented without synchronization, so call Thread::run() from a
 /// single thread only.
 static int thread_num = 0;

 /**
  * Minimal wrapper around pthread_create()/pthread_join().
  *
  * Usage: construct with a callback, call run() to start the thread, then
  * join() to wait for it. Do not copy or move a Thread after run(): the new
  * thread keeps a pointer to the object it was started from.
  */
 class Thread
 {
 public:
     /**
      * Entry point handed to pthread_create().
      *
      * @param arg  Pointer to the Thread object that is being started.
      * @return Always nullptr. The original fell off the end of this non-void
      *         function, which is undefined behaviour.
      */
     static void *thread_func(void *arg)
     {
         Thread *thread = static_cast<Thread *>(arg);
         thread->Enter_callback();
         return nullptr;
     }

 public:
     /// @param cb  Function to run in the new thread; nullptr means "do nothing".
     Thread(callback_t cb)
         : tid_(),
           name_(),
           start_timestamp_(0),
           isrunning_(false),
           cb_(cb)
     {
     }

     /**
      * Assign a name and start timestamp, then launch the thread.
      *
      * Does nothing when the thread is already running. The running flag is
      * set only if pthread_create() succeeds, so a failed start leaves the
      * object in the "not running" state and join() stays safe to call.
      */
     void run()
     {
         if (isrunning_)
             return;

         name_ = "thread-" + std::to_string(thread_num++);
         start_timestamp_ = time(nullptr);
         isrunning_ = (pthread_create(&tid_, nullptr, thread_func, this) == 0);
     }

     /// Invoke the user callback (skipped when no callback was supplied).
     void Enter_callback()
     {
         if (cb_ != nullptr)
             cb_();
     }

     /// @return true between a successful run() and the matching join().
     bool is_runing() const
     {
         return isrunning_;
     }

     /// @return The time(nullptr) value recorded by run(), or 0 before run().
     uint64_t start_timestamp() const
     {
         return start_timestamp_;
     }

     /// @return The name assigned by run(), or an empty string before run().
     string name() const
     {
         return name_;
     }

     /// Wait for the thread to finish. No-op when it was never started or has
     /// already been joined, so an invalid handle is never joined.
     void join()
     {
         if (!isrunning_)
             return;

         pthread_join(tid_, nullptr);
         isrunning_ = false;
     }

 private:
     pthread_t tid_;             ///< Handle written by pthread_create().
     string name_;               ///< "thread-N", assigned in run().
     uint64_t start_timestamp_;  ///< Seconds since the epoch at run().
     bool isrunning_;            ///< Started and not yet joined.

     callback_t cb_;             ///< Work function; may be nullptr.
 };
 
 | 
2. Main.cc
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 
 | #include "MyThread.hpp"
 #include <vector>
 #include <unistd.h>
 
 // Number of threads main() launches; print() also uses it as the number of
 // lines each thread prints.
 int threads_num = 3;

 /**
  * Thread body for the demo: prints one numbered line per second,
  * `threads_num` times, then a closing line.
  */
 void print()
 {
     for (int round = 0; round < threads_num; ++round)
     {
         cout << "我是一个 C++ 语言层面上封装的线程!" << "代号是:" << round << endl;
         sleep(1);
     }

     cout << "所有线程都结束了!" << endl;
 }
 
 int main()
 {
 vector<Thread> threads;
 for (int i = 0; i < threads_num; ++i)
 {
 threads.push_back(Thread(print));
 }
 
 cout << "开始启动所有线程喽~" << endl;
 
 for(auto &x : threads)
 {
 x.run();
 cout << "线程 " << x.name() << " 启动成功!其时间戳的值是:" << x.start_timestamp() << endl;
 }
 
 for(auto &x : threads)
 {
 x.join();
 }
 
 cout << "所有线程都结束了!" << endl;
 
 return 0;
 }
 
 | 
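当然,这个 demo 只封装了线程的创建(pthread_create)和等待(pthread_join)两个操作,也没有做任何加锁:例如全局计数器 thread_num 的自增、多个线程同时向 cout 输出都没有同步,在多线程场景下可能出现数据竞争或输出交错。整体来说并不完善,仅用于展示如何在 C++ 语言层面对底层线程接口进行封装。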