From a0192a0116d5bf5a06ad63c85c43a91009b5318e Mon Sep 17 00:00:00 2001 From: Snikimonkd <72211350+Snikimonkd@users.noreply.github.com> Date: Tue, 21 Jan 2025 01:51:00 +0300 Subject: [PATCH] [FEAT] add golang like channel (#1843) * [feat] add golang like channels * Updated new_init/init. Some fixes for init. --------- Co-authored-by: Christoffer Lerno --- lib/std/threads/buffered_channel.c3 | 165 +++++++++++++ lib/std/threads/thread.c3 | 1 + lib/std/threads/unbuffered_channel.c3 | 140 +++++++++++ test/unit/stdlib/threads/channel.c3 | 337 ++++++++++++++++++++++++++ 4 files changed, 643 insertions(+) create mode 100644 lib/std/threads/buffered_channel.c3 create mode 100644 lib/std/threads/unbuffered_channel.c3 create mode 100644 test/unit/stdlib/threads/channel.c3 diff --git a/lib/std/threads/buffered_channel.c3 b/lib/std/threads/buffered_channel.c3 new file mode 100644 index 000000000..2e45a584e --- /dev/null +++ b/lib/std/threads/buffered_channel.c3 @@ -0,0 +1,165 @@ +module std::thread::channel(); + +distinct BufferedChannel = void*; + +struct BufferedChannelImpl @private +{ + Allocator allocator; + Mutex mu; + bool closed; + usz size; + usz elems; + + usz sendx; + usz send_waiting; + ConditionVariable send_cond; + + usz readx; + usz read_waiting; + ConditionVariable read_cond; + + Type[*] buf; +} + +fn void! BufferedChannel.new_init(&self, usz size = 1, Allocator allocator = allocator::heap()) +{ + return self.init(size, allocator::heap()); +} + +fn void! BufferedChannel.init(&self, usz size = 1, Allocator allocator) +{ + BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!; + defer catch allocator::free(allocator, channel); + + channel.allocator = allocator; + channel.size = size; + channel.elems = 0; + channel.sendx = 0; + channel.send_waiting = 0; + channel.readx = 0; + channel.read_waiting = 0; + + channel.mu.init()!; + defer catch (void)channel.mu.destroy(); + channel.send_cond.init()!; + defer catch (void)channel.send_cond.destroy(); + channel.read_cond.init()!; + defer catch (void)channel.read_cond.destroy(); + + *self = (BufferedChannel)channel; +} + +fn void! BufferedChannel.destroy(&self) +{ + BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self); + + anyfault err = @catch(channel.mu.destroy()); + err = @catch(channel.send_cond.destroy()) ?: err; + err = @catch(channel.read_cond.destroy()) ?: err; + allocator::free(channel.allocator, channel); + + *self = null; + + if (err) return err?; +} + +fn void! BufferedChannel.push(self, Type val) +{ + BufferedChannelImpl* channel = (BufferedChannelImpl*)self; + + channel.mu.lock()!; + defer catch (void)channel.mu.unlock(); + + // if channel is full -> wait + while (channel.elems == channel.size && !channel.closed) + { + channel.send_waiting++; + channel.send_cond.wait(&channel.mu)!; + channel.send_waiting--; + } + + // check if channel is closed + if (channel.closed) return ThreadFault.CHANNEL_CLOSED?; + + // save value to buf + channel.buf[channel.sendx] = val; + + // move pointer + channel.sendx++; + if (channel.sendx == channel.size) + { + channel.sendx = 0; + } + + // channelge elems counter + channel.elems++; + + // if someone is waiting -> awake him + if (channel.read_waiting > 0) + { + channel.read_cond.signal()!; + } + + channel.mu.unlock()!; +} + +fn Type! BufferedChannel.pop(self) +{ + BufferedChannelImpl* channel = (BufferedChannelImpl*)self; + + channel.mu.lock()!; + defer catch (void)channel.mu.unlock(); + + // if chan is empty -> wait for sender + while (channel.elems == 0 && !channel.closed) + { + channel.read_waiting++; + channel.read_cond.wait(&channel.mu)!; + channel.read_waiting--; + } + + // check if chan is closed and empty + if (channel.closed && channel.elems == 0) + { + return ThreadFault.CHANNEL_CLOSED?; + } + + // read from buf + Type ret = channel.buf[channel.readx]; + + // move pointer + channel.readx++; + if (channel.readx == channel.size) + { + channel.readx = 0; + } + + // change elems counter + channel.elems--; + + // if someone is waiting -> awake him + if (channel.send_waiting > 0) + { + channel.send_cond.signal()!; + } + + channel.mu.unlock()!; + + return ret; +} + +fn void! BufferedChannel.close(self) +{ + BufferedChannelImpl* channel = (BufferedChannelImpl*)self; + + anyfault err = @catch(channel.mu.lock()); + + channel.closed = true; + + err = @catch(channel.read_cond.broadcast()) ?: err; + err = @catch(channel.send_cond.broadcast()) ?: err; + err = @catch(channel.mu.unlock()) ?: err; + + if (err) return err?; +} + diff --git a/lib/std/threads/thread.c3 b/lib/std/threads/thread.c3 index dd9554acf..d6f208839 100644 --- a/lib/std/threads/thread.c3 +++ b/lib/std/threads/thread.c3 @@ -32,6 +32,7 @@ fault ThreadFault DETACH_FAILED, JOIN_FAILED, INTERRUPTED, + CHANNEL_CLOSED, } macro void! Mutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, MUTEX_PLAIN); diff --git a/lib/std/threads/unbuffered_channel.c3 b/lib/std/threads/unbuffered_channel.c3 new file mode 100644 index 000000000..c27321a2b --- /dev/null +++ b/lib/std/threads/unbuffered_channel.c3 @@ -0,0 +1,140 @@ +module std::thread::channel(); + +distinct UnbufferedChannel = void*; + +struct UnbufferedChannelImpl @private +{ + Allocator allocator; + Mutex mu; + Type buf; + bool closed; + + Mutex send_mu; + usz send_waiting; + ConditionVariable send_cond; + + Mutex read_mu; + usz read_waiting; + ConditionVariable read_cond; +} + +fn void! UnbufferedChannel.new_init(&self) => self.init(allocator::heap()); + +fn void! UnbufferedChannel.init(&self, Allocator allocator) +{ + UnbufferedChannelImpl* channel = allocator::alloc(allocator, UnbufferedChannelImpl); + defer catch (void)allocator::free(allocator, channel); + + channel.allocator = allocator; + channel.send_waiting = 0; + channel.read_waiting = 0; + + channel.mu.init()!; + defer catch (void)channel.mu.destroy(); + channel.send_mu.init()!; + defer catch (void)channel.send_mu.destroy(); + channel.send_cond.init()!; + defer catch (void)channel.send_cond.destroy(); + channel.read_mu.init()!; + defer catch (void)channel.read_mu.destroy(); + channel.read_cond.init()!; + + *self = (UnbufferedChannel)channel; +} + +fn void! UnbufferedChannel.destroy(&self) +{ + UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self); + + anyfault err = @catch(channel.mu.destroy()); + err = @catch(channel.send_mu.destroy()) ?: err; + err = @catch(channel.send_cond.destroy()) ?: err; + err = @catch(channel.read_mu.destroy()) ?: err; + err = @catch(channel.read_cond.destroy()) ?: err; + allocator::free(channel.allocator, channel); + + if (err) return err?; + + *self = null; +} + +fn void! UnbufferedChannel.push(self, Type val) +{ + UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; + + channel.mu.lock()!; + defer catch (void)channel.mu.unlock(); + channel.send_mu.lock()!; + defer catch (void)channel.send_mu.unlock(); + + if (channel.closed) + { + return ThreadFault.CHANNEL_CLOSED?; + } + + // store value in the buffer + channel.buf = val; + // show that we are waiting for reader + channel.send_waiting++; + + // if reader is already waiting for us -> awake him + if (channel.read_waiting > 0) + { + channel.read_cond.signal()!; + } + + // wait until reader takes value from buffer + channel.send_cond.wait(&channel.mu)!; + + if (channel.closed) return ThreadFault.CHANNEL_CLOSED?; + + channel.mu.unlock()!; + channel.send_mu.unlock()!; +} + +fn Type! UnbufferedChannel.pop(self) +{ + UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; + + channel.mu.lock()!; + defer catch (void)channel.mu.unlock(); + channel.read_mu.lock()!; + defer catch (void)channel.read_mu.unlock(); + + // if no one is waiting, then there is nothing in the buffer + while (!channel.closed && channel.send_waiting == 0) + { + channel.read_waiting++; + channel.read_cond.wait(&channel.mu)!; + channel.read_waiting--; + } + + if (channel.closed) return ThreadFault.CHANNEL_CLOSED?; + + // take value from buffer + Type ret = channel.buf; + + // awake sender + channel.send_waiting--; + channel.send_cond.signal()!; + + channel.mu.unlock()!; + channel.read_mu.unlock()!; + + return ret; +} + +fn void! UnbufferedChannel.close(self) +{ + UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; + + anyfault err = @catch(channel.mu.lock()); + + channel.closed = true; + + err = @catch(channel.read_cond.broadcast()) ?: err; + err = @catch(channel.send_cond.broadcast()) ?: err; + err = @catch(channel.mu.unlock()) ?: err; + + if (err) return err?; +} \ No newline at end of file diff --git a/test/unit/stdlib/threads/channel.c3 b/test/unit/stdlib/threads/channel.c3 new file mode 100644 index 000000000..0b708763a --- /dev/null +++ b/test/unit/stdlib/threads/channel.c3 @@ -0,0 +1,337 @@ +module thread_test; + +import std::thread::channel; +import std::thread; +import std::time; +import std::io; + +fn void init_destroy_buffered() @test +{ + for (usz i = 0; i < 20; i++) + { + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + } +} + +fn void init_destroy_unbuffered() @test +{ + for (usz i = 0; i < 20; i++) + { + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + } +} + +fn void push_to_buffered_channel_no_lock() @test +{ + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + + c.push(1)!!; +} + +fn void push_pop_buffered_no_locks() @test +{ + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + + c.push(123)!!; + int got = c.pop()!!; + assert(got == 123); +} + +fn void push_pop_unbuffered_with_locks() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + UnbufferedChannel() c = (UnbufferedChannel())arg; + c.push(123)!!; + c.push(321)!!; + return 0; + }, (void*)c)!!; + + int got = c.pop()!!; + assert(got == 123); + got = c.pop()!!; + assert(got == 321); +} + +fn void sending_to_closed_unbuffered_chan_is_forbidden() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.push(123)) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + assert(false); +} + +fn void sending_to_closed_buffered_chan_is_forbidden() @test +{ + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.push(123)) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + assert(false); +} + +fn void reading_from_empty_closed_unbuffered_chan_is_forbidden() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.pop()) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + assert(false); +} + +fn void reading_from_empty_closed_buffered_chan_is_forbidden() @test +{ + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.pop()) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + assert(false); +} + +fn void reading_from_non_empty_closed_buffered_chan_is_ok() @test +{ + BufferedChannel() c; + c.new_init(3)!!; + defer c.destroy()!!; + + c.push(1)!!; + c.push(2)!!; + c.push(3)!!; + + c.close()!!; + + int got = c.pop()!!; + assert(got == 1); + got = c.pop()!!; + assert(got == 2); + got = c.pop()!!; + assert(got == 3); + + int! got_err = c.pop(); + if (catch err = got_err) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + + assert(false); +} + +fn void reading_from_empty_buffered_chan_aborted_by_close() @test +{ + BufferedChannel() c; + c.new_init(3)!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + BufferedChannel() c = (BufferedChannel())arg; + c.close()!!; + return 0; + }, (void*)c)!!; + + int! res = c.pop(); + if (catch err = res) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + + assert(false); +} + +fn void reading_from_unbuffered_chan_aborted_by_close() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + UnbufferedChannel() c = (UnbufferedChannel())arg; + c.close()!!; + return 0; + }, (void*)c)!!; + + int! res = c.pop(); + if (catch err = res) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + + assert(false); +} + +fn void sending_to_full_buffered_chan_aborted_by_close() @test +{ + BufferedChannel() c; + c.new_init(1)!!; + defer c.destroy()!!; + + c.push(1)!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + BufferedChannel() c = (BufferedChannel())arg; + c.close()!!; + return 0; + }, (void*)c)!!; + + anyfault err = @catch(c.push(1)); + if (err) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + + assert(false); +} + +fn void sending_to_unbuffered_chan_aborted_by_close() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + UnbufferedChannel() c = (UnbufferedChannel())arg; + c.close()!!; + return 0; + }, (void*)c)!!; + + anyfault err = @catch(c.push(1)); + if (err) + { + assert(err == ThreadFault.CHANNEL_CLOSED); + return; + } + + assert(false); +} + +fn void multiple_actions_unbuffered() @test +{ + UnbufferedChannel() c; + c.new_init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + UnbufferedChannel() c = (UnbufferedChannel())arg; + for (int i = 0; i <= 100; i++) + { + c.push(i)!!; + } + return 0; + }, (void*)c)!!; + + int sum; + + for (int i = 0; i <= 100; i++) + { + int! res = c.pop(); + if (catch err = res) + { + assert(false); + } + sum += res; + } + + assert(sum == 5050); +} + +fn void multiple_actions_buffered() @test +{ + BufferedChannel() c; + c.new_init(10)!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) + { + BufferedChannel() c = (BufferedChannel())arg; + for (int i = 0; i <= 100; i++) + { + c.push(i)!!; + } + return 0; + }, (void*)c)!!; + + int sum; + + for (int i = 0; i <= 100; i++) + { + int! res = c.pop(); + if (catch err = res) + { + assert(false); + } + sum += res; + } + + assert(sum == 5050); +} +