World Ocean Simulation System (WOSS) library
thread-pool-definitions.h
Go to the documentation of this file.
1/* WOSS - World Ocean Simulation System -
2 *
3 * Copyright (C) 2009 2025 Federico Guerra
4 * and regents of the SIGNET lab, University of Padova
5 *
6 * Author: Federico Guerra - WOSS@guerra-tlc.com
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 as
10 * published by the Free Software Foundation;
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 */
21
22
32#ifndef THREADPOOL_DEFINITIONS_H
33#define THREADPOOL_DEFINITIONS_H
34
35#ifdef WOSS_MULTITHREAD
36
37#include <iostream>
38#include <vector>
39#include <queue>
40#include <thread>
41#include <mutex>
42#include <condition_variable>
43#include <future>
44#include <functional>
45
46namespace woss {
47
/**
 * \brief Fixed-size pool of worker threads executing queued tasks
 *
 * Callable objects handed to submit() are wrapped in a std::packaged_task,
 * pushed on a FIFO queue and run by the first idle worker. The result (or the
 * exception thrown by the callable) is delivered through the std::future
 * returned by submit().
 *
 * All shared state (the task queue and the stop flag) is read and written
 * only while holding queue_mutex.
 *
 * Destroying the pool requests a stop, wakes every worker and joins them.
 * A task that is already running is allowed to finish; tasks still waiting in
 * the queue are NOT run: they are destroyed together with the queue, so a
 * get() on their future reports std::future_errc::broken_promise.
 **/
class ThreadPool {

  public:

  /**
   * Type-erased unit of work stored in the queue
   **/
  using Task = std::function< void() >;

  /**
   * ThreadPool constructor. Spawns the worker threads.
   * @param n_threads number of worker threads. If 0, the value of
   *        std::thread::hardware_concurrency() is used; if that is also 0
   *        (value not computable), a single worker is created so that
   *        submitted tasks can always make progress.
   **/
  explicit ThreadPool(size_t n_threads)
  : num_threads(n_threads),
    workers(),
    tasks(),
    queue_mutex(),
    task_cond_var(),
    stop_flag(false)
  {
    if (num_threads == 0) {
      num_threads = std::thread::hardware_concurrency();
    }
    // hardware_concurrency() is only a hint and may itself be 0.
    if (num_threads == 0) {
      num_threads = 1;
    }

    start();
  }

  /**
   * ThreadPool destructor. Stops the pool and joins every worker thread.
   **/
  ~ThreadPool() {
    stop();
  }

  // The pool owns threads and a mutex: it can be neither copied nor assigned.
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  /**
   * Submits a task to the thread pool.
   * @param f callable object to execute
   * @param args arguments bound to f (copied/moved into the task via std::bind)
   * @return a std::future giving access to the value returned by f, or to the
   *         exception it threw. If the pool has already been stopped the task
   *         is not queued and the returned future is never fulfilled.
   **/
  template <class F, class... Args>
  auto submit(F&& f, Args&&... args) -> std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {
    // Return type of f when called with the given arguments.
    using return_type = decltype(std::forward<F>(f)(std::forward<Args>(args)...));

    // std::bind turns f + args into a nullary callable; the packaged_task
    // provides the future. A shared_ptr is used because packaged_task is
    // move-only while std::function requires a copyable target.
    auto task = std::make_shared< std::packaged_task< return_type() > > (
      std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      // The stop flag is checked under the mutex that guards its writes,
      // so a concurrent stop() cannot race with this read.
      if (stop_flag) {
        return res;
      }

      // The queued lambda only invokes the packaged_task, which stores the
      // result (or the exception) in the shared state of the future.
      tasks.emplace([task]() {
        (*task)();
      });
    }

    // Wake up one idle worker, outside the lock to avoid waking it up
    // only to have it block on the mutex.
    task_cond_var.notify_one();
    return res;
  }

  /**
   * Gets the number of threads in the pool.
   * @return number of worker threads
   **/
  size_t getNumThreads() const {
    return num_threads;
  }

  private:

  /**
   * Number of worker threads
   **/
  size_t num_threads;

  /**
   * Worker threads
   **/
  std::vector<std::thread> workers;

  /**
   * FIFO queue of pending tasks, guarded by queue_mutex
   **/
  std::queue<Task> tasks;

  /**
   * Mutex guarding tasks and stop_flag
   **/
  std::mutex queue_mutex;

  /**
   * Condition variable signalled on new task or on stop request
   **/
  std::condition_variable task_cond_var;

  /**
   * Stop request flag, guarded by queue_mutex
   **/
  bool stop_flag;

  /**
   * Body of every worker thread: waits for a task or for the stop request,
   * runs tasks one at a time and returns once the pool has been stopped.
   **/
  void workerLoop() {
    for (;;) {
      Task task;

      { // lock scope
        std::unique_lock<std::mutex> lock(queue_mutex);

        // The predicate guards against spurious wakeups.
        task_cond_var.wait(lock, [this] {
          return (stop_flag || !tasks.empty());
        });

        // A stop request wins over pending tasks: they are left in the queue.
        if (stop_flag) {
          return;
        }

        task = std::move(tasks.front());
        tasks.pop();
      }

      // Run outside the lock so other threads can submit or fetch tasks
      // meanwhile. Exceptions thrown by the user callable are captured by
      // the packaged_task and never reach this frame.
      task();
    }
  }

  /**
   * Starts the worker threads.
   * If a thread cannot be created, the workers already running are stopped
   * and joined before the exception is propagated; otherwise their
   * destructors would call std::terminate.
   **/
  void start() {
    try {
      workers.reserve(num_threads);
      for (size_t i = 0; i < num_threads; ++i) {
        workers.emplace_back([this] {
          workerLoop();
        });
      }
    }
    catch (...) {
      stop();
      throw;
    }
  }

  /**
   * Requests the stop of the pool, wakes up all workers and joins them.
   * Safe to call more than once.
   **/
  void stop() {
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      stop_flag = true;
    }
    // Wake up every waiting worker so it can observe the stop request.
    task_cond_var.notify_all();

    for (std::thread& worker : workers) {
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

};
222
223}
224
225#endif // WOSS_MULTITHREAD
226
227#endif // THREADPOOL_DEFINITIONS_H
Definition thread-pool-definitions.h:52
size_t num_threads
Definition thread-pool-definitions.h:144
std::vector< std::thread > workers
Definition thread-pool-definitions.h:148
std::mutex queue_mutex
Definition thread-pool-definitions.h:156
std::condition_variable task_cond_var
Definition thread-pool-definitions.h:160
~ThreadPool()
Definition thread-pool-definitions.h:84
auto submit(F &&f, Args &&... args) -> std::future< decltype(std::forward< F >(f)(std::forward< Args >(args)...))>
Submits a task to the thread pool.
Definition thread-pool-definitions.h:96
std::function< void() > Task
Definition thread-pool-definitions.h:59
size_t getNumThreads()
Gets the number of threads in the pool.
Definition thread-pool-definitions.h:136
void stop()
Definition thread-pool-definitions.h:207
std::queue< Task > tasks
Definition thread-pool-definitions.h:152
void start()
Definition thread-pool-definitions.h:169
ThreadPool(size_t n_threads)
Definition thread-pool-definitions.h:66
bool stop_flag
Definition thread-pool-definitions.h:164
Definition ac-toolbox-arr-asc-reader.h:44