mirror of
https://github.com/c3lang/c3c.git
synced 2026-02-27 03:51:18 +00:00
[FEAT] add golang like channel (#1843)
* [feat] add golang like channels * Updated new_init/init. Some fixes for init. --------- Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
This commit is contained in:
165
lib/std/threads/buffered_channel.c3
Normal file
165
lib/std/threads/buffered_channel.c3
Normal file
@@ -0,0 +1,165 @@
|
||||
module std::thread::channel(<Type>);
|
||||
|
||||
distinct BufferedChannel = void*;
|
||||
|
||||
struct BufferedChannelImpl @private
|
||||
{
|
||||
Allocator allocator;
|
||||
Mutex mu;
|
||||
bool closed;
|
||||
usz size;
|
||||
usz elems;
|
||||
|
||||
usz sendx;
|
||||
usz send_waiting;
|
||||
ConditionVariable send_cond;
|
||||
|
||||
usz readx;
|
||||
usz read_waiting;
|
||||
ConditionVariable read_cond;
|
||||
|
||||
Type[*] buf;
|
||||
}
|
||||
|
||||
fn void! BufferedChannel.new_init(&self, usz size = 1, Allocator allocator = allocator::heap())
|
||||
{
|
||||
return self.init(size, allocator::heap());
|
||||
}
|
||||
|
||||
fn void! BufferedChannel.init(&self, usz size = 1, Allocator allocator)
|
||||
{
|
||||
BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!;
|
||||
defer catch allocator::free(allocator, channel);
|
||||
|
||||
channel.allocator = allocator;
|
||||
channel.size = size;
|
||||
channel.elems = 0;
|
||||
channel.sendx = 0;
|
||||
channel.send_waiting = 0;
|
||||
channel.readx = 0;
|
||||
channel.read_waiting = 0;
|
||||
|
||||
channel.mu.init()!;
|
||||
defer catch (void)channel.mu.destroy();
|
||||
channel.send_cond.init()!;
|
||||
defer catch (void)channel.send_cond.destroy();
|
||||
channel.read_cond.init()!;
|
||||
defer catch (void)channel.read_cond.destroy();
|
||||
|
||||
*self = (BufferedChannel)channel;
|
||||
}
|
||||
|
||||
fn void! BufferedChannel.destroy(&self)
|
||||
{
|
||||
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
|
||||
|
||||
anyfault err = @catch(channel.mu.destroy());
|
||||
err = @catch(channel.send_cond.destroy()) ?: err;
|
||||
err = @catch(channel.read_cond.destroy()) ?: err;
|
||||
allocator::free(channel.allocator, channel);
|
||||
|
||||
*self = null;
|
||||
|
||||
if (err) return err?;
|
||||
}
|
||||
|
||||
fn void! BufferedChannel.push(self, Type val)
|
||||
{
|
||||
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
|
||||
|
||||
channel.mu.lock()!;
|
||||
defer catch (void)channel.mu.unlock();
|
||||
|
||||
// if channel is full -> wait
|
||||
while (channel.elems == channel.size && !channel.closed)
|
||||
{
|
||||
channel.send_waiting++;
|
||||
channel.send_cond.wait(&channel.mu)!;
|
||||
channel.send_waiting--;
|
||||
}
|
||||
|
||||
// check if channel is closed
|
||||
if (channel.closed) return ThreadFault.CHANNEL_CLOSED?;
|
||||
|
||||
// save value to buf
|
||||
channel.buf[channel.sendx] = val;
|
||||
|
||||
// move pointer
|
||||
channel.sendx++;
|
||||
if (channel.sendx == channel.size)
|
||||
{
|
||||
channel.sendx = 0;
|
||||
}
|
||||
|
||||
// channelge elems counter
|
||||
channel.elems++;
|
||||
|
||||
// if someone is waiting -> awake him
|
||||
if (channel.read_waiting > 0)
|
||||
{
|
||||
channel.read_cond.signal()!;
|
||||
}
|
||||
|
||||
channel.mu.unlock()!;
|
||||
}
|
||||
|
||||
fn Type! BufferedChannel.pop(self)
|
||||
{
|
||||
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
|
||||
|
||||
channel.mu.lock()!;
|
||||
defer catch (void)channel.mu.unlock();
|
||||
|
||||
// if chan is empty -> wait for sender
|
||||
while (channel.elems == 0 && !channel.closed)
|
||||
{
|
||||
channel.read_waiting++;
|
||||
channel.read_cond.wait(&channel.mu)!;
|
||||
channel.read_waiting--;
|
||||
}
|
||||
|
||||
// check if chan is closed and empty
|
||||
if (channel.closed && channel.elems == 0)
|
||||
{
|
||||
return ThreadFault.CHANNEL_CLOSED?;
|
||||
}
|
||||
|
||||
// read from buf
|
||||
Type ret = channel.buf[channel.readx];
|
||||
|
||||
// move pointer
|
||||
channel.readx++;
|
||||
if (channel.readx == channel.size)
|
||||
{
|
||||
channel.readx = 0;
|
||||
}
|
||||
|
||||
// change elems counter
|
||||
channel.elems--;
|
||||
|
||||
// if someone is waiting -> awake him
|
||||
if (channel.send_waiting > 0)
|
||||
{
|
||||
channel.send_cond.signal()!;
|
||||
}
|
||||
|
||||
channel.mu.unlock()!;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
fn void! BufferedChannel.close(self)
|
||||
{
|
||||
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
|
||||
|
||||
anyfault err = @catch(channel.mu.lock());
|
||||
|
||||
channel.closed = true;
|
||||
|
||||
err = @catch(channel.read_cond.broadcast()) ?: err;
|
||||
err = @catch(channel.send_cond.broadcast()) ?: err;
|
||||
err = @catch(channel.mu.unlock()) ?: err;
|
||||
|
||||
if (err) return err?;
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ fault ThreadFault
|
||||
DETACH_FAILED,
|
||||
JOIN_FAILED,
|
||||
INTERRUPTED,
|
||||
CHANNEL_CLOSED,
|
||||
}
|
||||
|
||||
macro void! Mutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, MUTEX_PLAIN);
|
||||
|
||||
140
lib/std/threads/unbuffered_channel.c3
Normal file
140
lib/std/threads/unbuffered_channel.c3
Normal file
@@ -0,0 +1,140 @@
|
||||
module std::thread::channel(<Type>);
|
||||
|
||||
distinct UnbufferedChannel = void*;
|
||||
|
||||
struct UnbufferedChannelImpl @private
|
||||
{
|
||||
Allocator allocator;
|
||||
Mutex mu;
|
||||
Type buf;
|
||||
bool closed;
|
||||
|
||||
Mutex send_mu;
|
||||
usz send_waiting;
|
||||
ConditionVariable send_cond;
|
||||
|
||||
Mutex read_mu;
|
||||
usz read_waiting;
|
||||
ConditionVariable read_cond;
|
||||
}
|
||||
|
||||
fn void! UnbufferedChannel.new_init(&self) => self.init(allocator::heap());
|
||||
|
||||
fn void! UnbufferedChannel.init(&self, Allocator allocator)
|
||||
{
|
||||
UnbufferedChannelImpl* channel = allocator::alloc(allocator, UnbufferedChannelImpl);
|
||||
defer catch (void)allocator::free(allocator, channel);
|
||||
|
||||
channel.allocator = allocator;
|
||||
channel.send_waiting = 0;
|
||||
channel.read_waiting = 0;
|
||||
|
||||
channel.mu.init()!;
|
||||
defer catch (void)channel.mu.destroy();
|
||||
channel.send_mu.init()!;
|
||||
defer catch (void)channel.send_mu.destroy();
|
||||
channel.send_cond.init()!;
|
||||
defer catch (void)channel.send_cond.destroy();
|
||||
channel.read_mu.init()!;
|
||||
defer catch (void)channel.read_mu.destroy();
|
||||
channel.read_cond.init()!;
|
||||
|
||||
*self = (UnbufferedChannel)channel;
|
||||
}
|
||||
|
||||
fn void! UnbufferedChannel.destroy(&self)
|
||||
{
|
||||
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
|
||||
|
||||
anyfault err = @catch(channel.mu.destroy());
|
||||
err = @catch(channel.send_mu.destroy()) ?: err;
|
||||
err = @catch(channel.send_cond.destroy()) ?: err;
|
||||
err = @catch(channel.read_mu.destroy()) ?: err;
|
||||
err = @catch(channel.read_cond.destroy()) ?: err;
|
||||
allocator::free(channel.allocator, channel);
|
||||
|
||||
if (err) return err?;
|
||||
|
||||
*self = null;
|
||||
}
|
||||
|
||||
fn void! UnbufferedChannel.push(self, Type val)
|
||||
{
|
||||
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
|
||||
|
||||
channel.mu.lock()!;
|
||||
defer catch (void)channel.mu.unlock();
|
||||
channel.send_mu.lock()!;
|
||||
defer catch (void)channel.send_mu.unlock();
|
||||
|
||||
if (channel.closed)
|
||||
{
|
||||
return ThreadFault.CHANNEL_CLOSED?;
|
||||
}
|
||||
|
||||
// store value in the buffer
|
||||
channel.buf = val;
|
||||
// show that we are waiting for reader
|
||||
channel.send_waiting++;
|
||||
|
||||
// if reader is already waiting for us -> awake him
|
||||
if (channel.read_waiting > 0)
|
||||
{
|
||||
channel.read_cond.signal()!;
|
||||
}
|
||||
|
||||
// wait until reader takes value from buffer
|
||||
channel.send_cond.wait(&channel.mu)!;
|
||||
|
||||
if (channel.closed) return ThreadFault.CHANNEL_CLOSED?;
|
||||
|
||||
channel.mu.unlock()!;
|
||||
channel.send_mu.unlock()!;
|
||||
}
|
||||
|
||||
fn Type! UnbufferedChannel.pop(self)
|
||||
{
|
||||
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
|
||||
|
||||
channel.mu.lock()!;
|
||||
defer catch (void)channel.mu.unlock();
|
||||
channel.read_mu.lock()!;
|
||||
defer catch (void)channel.read_mu.unlock();
|
||||
|
||||
// if no one is waiting, then there is nothing in the buffer
|
||||
while (!channel.closed && channel.send_waiting == 0)
|
||||
{
|
||||
channel.read_waiting++;
|
||||
channel.read_cond.wait(&channel.mu)!;
|
||||
channel.read_waiting--;
|
||||
}
|
||||
|
||||
if (channel.closed) return ThreadFault.CHANNEL_CLOSED?;
|
||||
|
||||
// take value from buffer
|
||||
Type ret = channel.buf;
|
||||
|
||||
// awake sender
|
||||
channel.send_waiting--;
|
||||
channel.send_cond.signal()!;
|
||||
|
||||
channel.mu.unlock()!;
|
||||
channel.read_mu.unlock()!;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
fn void! UnbufferedChannel.close(self)
|
||||
{
|
||||
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
|
||||
|
||||
anyfault err = @catch(channel.mu.lock());
|
||||
|
||||
channel.closed = true;
|
||||
|
||||
err = @catch(channel.read_cond.broadcast()) ?: err;
|
||||
err = @catch(channel.send_cond.broadcast()) ?: err;
|
||||
err = @catch(channel.mu.unlock()) ?: err;
|
||||
|
||||
if (err) return err?;
|
||||
}
|
||||
Reference in New Issue
Block a user