Files
c3c/lib/std/threads/buffered_channel.c3
Sander van den Bosch 3f20e5af1d add join for ThreadPool without destroying the threads (#2579)
* add join for ThreadPool without destroying the threads
* Make the main thread block while waiting for the worker threads to finish instead of busy-looping, and do proper initialization and freeing of all variables.
* Updated test to use `atomic_store` and  take into account the maximum queue size of the threadpool.
* - Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads.
- Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0.
- Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0.
- Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0.
- Return of Thread `detach()` is now "@maydiscard". It will return void in 0.8.0.
- Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will return void in 0.8.0.
- Pthread bindings correctly return Errno instead of CInt.
- Return of Thread `join()` is now "@maydiscard".

---------

Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
2025-12-06 23:54:04 +01:00

159 lines
3.0 KiB
Plaintext

// Generic module: `Type` is the element type carried by the channel.
module std::thread::channel{Type};
// Opaque channel handle. The BufferedChannel methods cast it to a
// BufferedChannelImpl* to reach the actual state.
typedef BufferedChannel = void*;
// Backing state of a BufferedChannel: bookkeeping fields followed by the
// element buffer, all held in one allocation made by init().
struct BufferedChannelImpl @private
{
// Allocator passed to init(); destroy() frees the channel through it.
Allocator allocator;
// Locked by push(), pop() and close() while they work on the fields below.
Mutex mu;
// Set by close(). push() then fails, and pop() fails once no elements remain.
bool closed;
// Capacity in elements, as passed to init().
usz size;
// Number of elements currently stored in buf.
usz elems;
// Index in buf that the next push() writes to; wraps to 0 on reaching size.
usz sendx;
// Number of push() calls currently waiting on send_cond.
usz send_waiting;
// Signalled by pop() when a sender is waiting; broadcast by close().
ConditionVariable send_cond;
// Index in buf that the next pop() reads from; wraps to 0 on reaching size.
usz readx;
// Number of pop() calls currently waiting on read_cond.
usz read_waiting;
// Signalled by push() when a reader is waiting; broadcast by close().
ConditionVariable read_cond;
// Element storage. init() requests Type.sizeof * size bytes of padding
// after the struct for it, so it has to remain the last member.
Type[*] buf;
}
// Allocates and initializes the channel state, then stores the handle in `*self`.
//
// allocator: used for the single allocation that holds the header and the
//            element buffer; destroy() frees through the same allocator.
// size:      capacity in elements (default 1).
//            NOTE(review): with size == 0 the "full" condition in push() is
//            always true, so push() would wait until the channel is closed.
//            Confirm that callers never pass 0.
//
// Returns a fault if the allocation or the initialization of the mutex or of
// either condition variable fails. In that case whatever was set up so far is
// torn down again and `*self` is not written.
fn void? BufferedChannel.init(&self, Allocator allocator, usz size = 1)
{
	// Header and buffer share one allocation; the buffer lives in the padding.
	BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!;
	defer catch allocator::free(allocator, channel);

	channel.allocator = allocator;
	// Set explicitly, like every other field, rather than depending on the
	// allocation having been zeroed.
	channel.closed = false;
	channel.size = size;
	channel.elems = 0;
	channel.sendx = 0;
	channel.send_waiting = 0;
	channel.readx = 0;
	channel.read_waiting = 0;

	// Each successful init registers its own cleanup, which only runs if a
	// later step in this function fails.
	channel.mu.init()!;
	defer catch channel.mu.destroy();
	channel.send_cond.init()!;
	defer catch channel.send_cond.destroy();
	channel.read_cond.init()!;
	defer catch channel.read_cond.destroy();

	*self = (BufferedChannel)channel;
}
// Destroys the channel's mutex and condition variables, frees its memory
// through the allocator it was created with, and resets the handle to null.
//
// Calling this on a null handle (for instance a second destroy() on the same
// handle) does nothing.
// NOTE(review): no lock is taken here, so this presumably must not run while
// other threads are still inside push()/pop()/close() - confirm with callers.
fn void? BufferedChannel.destroy(&self) @maydiscard // Remove optional in 0.8.0
{
	BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
	// destroy() itself leaves the handle null; without this guard a repeated
	// call would dereference a null pointer.
	if (!channel) return;
	channel.mu.destroy();
	channel.send_cond.destroy();
	channel.read_cond.destroy();
	allocator::free(channel.allocator, channel);
	*self = null;
}
// Stores `val` in the channel, waiting while the buffer is full.
//
// Returns thread::CHANNEL_CLOSED if the channel is closed when called or is
// closed while this call is waiting; `val` is not stored in that case.
fn void? BufferedChannel.push(self, Type val)
{
	BufferedChannelImpl* impl = (BufferedChannelImpl*)self;
	impl.mu.lock();
	// Unlock on every way out of the function, error or success.
	defer impl.mu.unlock();

	// Buffer full: count ourselves as a waiting sender until pop() signals
	// that a slot is free or close() wakes everyone up.
	while (!impl.closed && impl.elems == impl.size)
	{
		impl.send_waiting++;
		impl.send_cond.wait(&impl.mu);
		impl.send_waiting--;
	}

	// A closed channel accepts no further values.
	if (impl.closed) return thread::CHANNEL_CLOSED?;

	// Write at the send index, then advance it, wrapping at the capacity.
	impl.buf[impl.sendx] = val;
	usz next_send = impl.sendx + 1;
	impl.sendx = (next_send == impl.size) ? 0 : next_send;

	// Update the element counter.
	impl.elems++;

	// Wake one reader if any is waiting for data.
	if (impl.read_waiting != 0) impl.read_cond.signal();
}
// Removes and returns the oldest value in the channel, waiting while the
// buffer is empty.
//
// Values still buffered when the channel is closed can still be popped.
// Returns thread::CHANNEL_CLOSED only once the channel is closed and empty.
fn Type? BufferedChannel.pop(self)
{
	BufferedChannelImpl* impl = (BufferedChannelImpl*)self;
	impl.mu.lock();
	// Unlock on every way out of the function, error or success.
	defer impl.mu.unlock();

	// Buffer empty: count ourselves as a waiting reader until push() signals
	// new data or close() wakes everyone up.
	while (!impl.closed && impl.elems == 0)
	{
		impl.read_waiting++;
		impl.read_cond.wait(&impl.mu);
		impl.read_waiting--;
	}

	// Closed with nothing left to drain.
	if (impl.elems == 0 && impl.closed) return thread::CHANNEL_CLOSED?;

	// Read at the read index, then advance it, wrapping at the capacity.
	Type value = impl.buf[impl.readx];
	usz next_read = impl.readx + 1;
	impl.readx = (next_read == impl.size) ? 0 : next_read;

	// Update the element counter.
	impl.elems--;

	// Wake one sender if any is waiting for a free slot.
	if (impl.send_waiting != 0) impl.send_cond.signal();

	return value;
}
// Marks the channel as closed and wakes every thread waiting in push() or
// pop() so that each can re-check the closed flag.
fn void? BufferedChannel.close(self) @maydiscard // Remove optional in 0.8.0
{
	BufferedChannelImpl* impl = (BufferedChannelImpl*)self;
	impl.mu.lock();
	// Unlock once the flag is set and both broadcasts have been issued.
	defer impl.mu.unlock();

	impl.closed = true;
	// Readers first, then senders - both groups need to see the new state.
	impl.read_cond.broadcast();
	impl.send_cond.broadcast();
}