Files
c3c/lib/std/threads/fixed_pool.c3
Sander van den Bosch 3f20e5af1d add join for ThreadPool without destroying the threads (#2579)
* add join for ThreadPool without destroying the threads
* Make the main thread block waiting for the worker threads to finish instead of busy-looping, and do proper initialization and freeing of all variables.
* Updated test to use `atomic_store` and take into account the maximum queue size of the threadpool.
* - Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads.
- Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0.
- Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0.
- Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0.
- Return of Thread `detach()` is now "@maydiscard". It will return void in 0.8.0.
- Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will return void in 0.8.0.
- Pthread bindings correctly return Errno instead of CInt.
- Return of Thread `join()` is now "@maydiscard".

---------

Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
2025-12-06 23:54:04 +01:00

205 lines
4.2 KiB
Plaintext

module std::thread;
// Raised by FixedThreadPool.push when the job queue is at capacity.
faultdef THREAD_QUEUE_FULL;
module std::thread::threadpool @if (env::POSIX || env::WIN32);
import std::thread;
// Please do not use this one in production.
// Signature of a pool job: receives the slice of cloned arguments.
alias ThreadPoolFn = fn void(any[] args);
// A fixed-size thread pool: `num_threads` detached worker threads drain a
// bounded job queue. All mutable fields are protected by `mu`.
struct FixedThreadPool
{
Mutex mu; // guards every field below
QueueItem[] queue; // fixed-capacity job storage; queue[0 .. qindex) are pending
usz qindex; // count of queued-but-not-yet-taken jobs
usz qworking; // queued + currently executing jobs; 0 means the pool is idle
usz num_threads; // live workers; each exiting worker decrements this under `mu`
bitstruct : char {
bool initialized; // set by init(), cleared by @shutdown
bool stop; // shut down after draining the remaining queue
bool stop_now; // shut down immediately, pending work is dropped
bool joining; // a join() is waiting; idle workers signal `collect`
}
Thread[] pool; // worker thread handles (threads are detached after create)
ConditionVariable notify; // signalled when work arrives or shutdown is requested
ConditionVariable collect; // signalled by a worker when the pool becomes idle
}
// One pending job: the function to run plus its heap-cloned arguments
// (freed by free_qitem after the job runs or when it is dropped).
struct QueueItem @private
{
ThreadPoolFn func;
any[] args;
}
<*
Initialize the pool: allocate the job queue and spawn `threads` detached
worker threads. `queue_size` defaults to `threads * 32` when 0 is passed.
On any fault, everything allocated so far is torn down before returning.
@require !self.initialized : "ThreadPool must not be already initialized"
@require threads > 0 && threads < 0x1000 : `Threads should be greater than 0 and less than 0x1000`
@require queue_size < 0x10000 : `Queue size must be less than 65536`
*>
fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0)
{
if (queue_size == 0) queue_size = threads * 32;
// On any fault below, undo the partial initialization; @ok discards
// destroy()'s own optional result.
defer catch @ok(self.destroy());
assert(queue_size > 0);
*self = {
.num_threads = threads,
.initialized = true,
.queue = mem::alloc_array(QueueItem, queue_size),
.pool = mem::new_array(Thread, threads)
};
// Each primitive gets its own rollback so a later failure frees it.
self.mu.init()!;
defer catch self.mu.destroy();
self.notify.init()!;
defer catch self.notify.destroy();
self.collect.init()!;
defer catch self.collect.destroy();
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
// The thread resources will be cleaned up when the thread exits.
// Workers are never Thread.join()ed; shutdown is coordinated via
// num_threads and the condition variables instead.
thread.detach()!;
}
}
<*
Block the calling thread until every queued job has been processed and
all workers are idle. The pool remains usable afterwards; no-op on an
uninitialized pool.
*>
fn void? FixedThreadPool.join(&self) @maydiscard // Remove optional in 0.8.0
{
	if (!self.initialized) return;
	self.mu.lock();
	defer self.mu.unlock();
	self.joining = true;
	// Wake every worker so an idle pool re-checks the joining flag.
	self.notify.broadcast();
	// A worker signals `collect` once the queue is empty and none are busy.
	self.collect.wait(&self.mu);
	self.joining = false;
}
<*
Stop all the threads and cleanup the pool.
Any pending work will be dropped.
*>
fn void? FixedThreadPool.destroy(&self)
{
// Passes `self.stop_now` as the lazy #stop lvalue: workers exit without
// draining the queue, and the remaining items are freed by @shutdown.
return self.@shutdown(self.stop_now);
}
<*
Stop all the threads and cleanup the pool.
Any pending work will be processed.
*>
fn void? FixedThreadPool.stop_and_destroy(&self) @maydiscard // Remove optional in 0.8.0
{
// Passes `self.stop` as the lazy #stop lvalue: workers keep taking jobs
// until the queue is empty, then exit.
self.@shutdown(self.stop);
}
// Shared shutdown path. #stop is a lazily evaluated lvalue argument:
// either `self.stop` (drain queue first) or `self.stop_now` (drop pending
// work); it is set to true under the lock.
macro void FixedThreadPool.@shutdown(&self, #stop) @private
{
if (self.initialized)
{
self.mu.lock();
#stop = true;
self.notify.broadcast();
self.mu.unlock();
// Wait for all threads to shutdown.
// Each exiting worker decrements num_threads under `mu`; keep
// re-signalling `notify` until the last one is gone.
while (true)
{
self.mu.lock();
defer self.mu.unlock();
if (self.num_threads == 0) break;
self.notify.signal();
}
// All workers have exited: safe to tear down the primitives.
self.collect.destroy();
self.notify.destroy();
self.mu.destroy();
self.initialized = false;
// Free jobs that were queued but never executed (stop_now path).
while (self.qindex)
{
free_qitem(self.queue[--self.qindex]);
}
free(self.queue);
free(self.pool);
// NOTE(review): only `queue` is reset here; `pool` still holds the
// freed slice — confirm whether it should be cleared too.
self.queue = {};
}
}
<*
Push a new job to the pool.
return Excuse if the queue is full, in which case the job is ignored.
*>
fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...)
{
self.mu.lock();
defer self.mu.unlock();
if (self.qindex == self.queue.len) return thread::THREAD_QUEUE_FULL?;
// Deep-clone the varargs so they outlive the caller's stack frame;
// freed by free_qitem after the job runs.
any[] data;
if (args.len)
{
data = mem::alloc_array(any, args.len);
foreach (i, arg : args) data[i] = allocator::clone_any(mem, arg);
}
self.queue[self.qindex] = { .func = func, .args = data };
self.qindex++;
self.qworking++;
// Registered after the enqueue, so only a fault from broadcast() below
// triggers this rollback, which un-queues and frees the item.
// NOTE(review): the rollback does not decrement qworking — confirm.
defer catch
{
free_qitem(self.queue[--self.qindex]);
}
// Notify the threads that work is available.
self.notify.broadcast();
}
// Worker thread entry point. Loops taking jobs off the queue until a
// shutdown flag is observed; decrements num_threads under the lock on exit
// so @shutdown can tell when all workers are gone.
fn int process_work(void* self_arg) @private
{
FixedThreadPool* self = self_arg;
while (true)
{
self.mu.lock();
if (self.stop_now)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock();
return 0;
}
// Wait for work.
while (self.qindex == 0)
{
// Queue empty and no job in flight: tell a waiting join() the
// pool is idle.
if (self.joining && self.qworking == 0) self.collect.signal();
if (self.stop)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock();
return 0;
}
self.notify.wait(&self.mu);
// Re-check stop_now after waking: @shutdown broadcasts on `notify`.
if (self.stop_now)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock();
return 0;
}
}
// Process the job.
// Take the item under the lock, but run it unlocked so other
// workers can proceed.
self.qindex--;
QueueItem item = self.queue[self.qindex];
self.mu.unlock();
item.func(item.args);
// Frees the cloned args at the end of this loop iteration.
defer free_qitem(item);
self.mu.lock();
self.qworking--;
self.mu.unlock();
}
}
// Release one job's resources: each cloned argument, then the slice itself.
fn void free_qitem(QueueItem item) @private
{
	for (usz i = 0; i < item.args.len; i++)
	{
		free(item.args[i].ptr);
	}
	free(item.args);
}