diff --git a/lib/std/collections/linked_blockingqueue.c3 b/lib/std/collections/linked_blockingqueue.c3 new file mode 100644 index 000000000..a2325a937 --- /dev/null +++ b/lib/std/collections/linked_blockingqueue.c3 @@ -0,0 +1,327 @@ +module std::collections::blockingqueue { Value }; +import std::thread, std::time; + + +const INITIAL_CAPACITY = 16; + +struct QueueEntry +{ + Value value; + QueueEntry* next; // Next in queue order + QueueEntry* prev; // Previous in queue order +} + +struct LinkedBlockingQueue +{ + QueueEntry* head; // First element in queue + QueueEntry* tail; // Last element in queue + usz count; // Current number of elements + usz capacity; // Maximum capacity (0 for unbounded) + Mutex lock; + ConditionVariable not_empty; + ConditionVariable not_full; + Allocator allocator; +} + +<* + @param [&inout] allocator : "The allocator to use" + @param capacity : "Maximum capacity (0 for unbounded)" + @require !self.is_initialized() : "Queue was already initialized" +*> +fn LinkedBlockingQueue* LinkedBlockingQueue.init(&self, Allocator allocator, usz capacity = 0) +{ + self.allocator = allocator; + self.capacity = capacity; + self.count = 0; + self.head = null; + self.tail = null; + + self.lock.init()!!; + self.not_empty.init()!!; + self.not_full.init()!!; + return self; +} + +fn LinkedBlockingQueue* LinkedBlockingQueue.tinit(&self, usz capacity = 0) +{ + return self.init(tmem, capacity) @inline; +} + +<* + @require self.is_initialized() : "Queue must be initialized" +*> +fn void LinkedBlockingQueue.free(&self) +{ + self.lock.@in_lock() + { + // Free all remaining entries + QueueEntry* entry = self.head; + while (entry != null) + { + QueueEntry* next = entry.next; + allocator::free(self.allocator, entry); + entry = next; + } + }; + + (void)self.lock.destroy(); + (void)self.not_empty.destroy(); + (void)self.not_full.destroy(); +} + +fn void LinkedBlockingQueue.link_entry(&self, QueueEntry* entry) @private +{ + entry.next = null; + entry.prev = self.tail; + + if (self.tail == null) + { + // First element in queue + self.head = entry; + } + else + { + // Append to tail + self.tail.next = entry; + } + self.tail = entry; + self.count++; +} + + +fn QueueEntry* LinkedBlockingQueue.unlink_head(&self) @private +{ + if (self.head == null) return null; + + QueueEntry* entry = self.head; + self.head = entry.next; + + if (self.head != null) + { + self.head.prev = null; + } + else + { + // Queue is now empty + self.tail = null; + } + + self.count--; + return entry; +} + +<* + @param value : "Value to add to the queue" + @require self.is_initialized() : "Queue must be initialized" +*> +fn void LinkedBlockingQueue.push(&self, Value value) +{ + self.lock.@in_lock() + { + while (self.capacity > 0 && self.count >= self.capacity) + { + self.not_full.wait(&self.lock)!!; + } + + QueueEntry* entry = allocator::new(self.allocator, QueueEntry, { + .value = value, + .next = null, + .prev = null + }); + self.link_entry(entry); + + // Signal that queue is no longer empty + self.not_empty.signal()!!; + }; +} + +<* + Get a value from the queue, blocking if there is no element in the queue. + + @require self.is_initialized() : "Queue must be initialized" + @return "The removed value" +*> +fn Value LinkedBlockingQueue.poll(&self) +{ + self.lock.@in_lock() + { + while (self.count == 0) + { + self.not_empty.wait(&self.lock)!!; + } + + QueueEntry* entry = self.unlink_head(); + Value value = entry.value; + allocator::free(self.allocator, entry); + if (self.capacity > 0) + { + self.not_full.signal()!!; + } + return value; + }; +} + +<* + Pop an element from the queue, fail is it is empty. + + @require self.is_initialized() : "Queue must be initialized" + @return "The removed value" + @return? NO_MORE_ELEMENT : "If the queue is empty" +*> +fn Value? LinkedBlockingQueue.pop(&self) +{ + self.lock.@in_lock() + { + if (self.count == 0) return NO_MORE_ELEMENT?; + + QueueEntry* entry = self.unlink_head(); + Value value = entry.value; + allocator::free(self.allocator, entry); + + if (self.capacity > 0) + { + self.not_full.signal()!!; + } + return value; + }; +} + +<* + Poll with a timeout. + + @param timeout : "Timeout in microseconds" + @require self.is_initialized() : "Queue must be initialized" + @return "The removed value or null if timeout occurred" + @return? NO_MORE_ELEMENT : "If we reached the timeout" +*> +fn Value? LinkedBlockingQueue.poll_timeout(&self, Duration timeout) +{ + self.lock.@in_lock() + { + // Use while loop to handle spurious wakeups + if (!self.count) + { + Time start = time::now(); + Time end = start + timeout; + while (!self.count) + { + if (end <= time::now()) break; + if (catch self.not_empty.wait_until(&self.lock, end)) break; + } + if (!self.count) return NO_MORE_ELEMENT?; + } + + QueueEntry* entry = self.unlink_head(); + Value value = entry.value; + allocator::free(self.allocator, entry); + + // Must signal not_full after removing an item + if (self.capacity > 0) + { + self.not_full.signal()!!; + } + return value; + }; +} + + +<* + @require self.is_initialized() : "Queue must be initialized" + @return "Current size of the queue" +*> +fn usz LinkedBlockingQueue.size(&self) +{ + self.lock.@in_lock() + { + return self.count; + }; +} + +<* + @require self.is_initialized() : "Queue must be initialized" + @return "True if queue is empty" +*> +fn bool LinkedBlockingQueue.is_empty(&self) +{ + self.lock.@in_lock() + { + return self.count == 0; + }; +} + +<* + Try to push, return CAPACITY_EXCEEDED if the queue is full. + + @param value : "Value to add to the queue" + @require self.is_initialized() : "Queue must be initialized" + @return? CAPACITY_EXCEEDED : "If the queue is full" +*> +fn void? LinkedBlockingQueue.try_push(&self, Value value) +{ + self.lock.@in_lock() + { + if (self.capacity > 0 && self.count >= self.capacity) return CAPACITY_EXCEEDED?; + + QueueEntry* entry = allocator::new(self.allocator, QueueEntry, { + .value = value, + .next = null, + .prev = null + }); + self.link_entry(entry); + self.not_empty.signal()!!; + }; +} + +<* + Try to push, return CAPACITY_EXCEEDED if the queue is still full after timeout is reached. + + @param value : "Value to add to the queue" + @param timeout : "Timeout in microseconds" + @require self.is_initialized() : "Queue must be initialized" + @return? CAPACITY_EXCEEDED : "If the queue is full" +*> +fn void? LinkedBlockingQueue.push_timeout(&self, Value value, Duration timeout) +{ + self.lock.@in_lock() + { + if (self.capacity > 0 && self.count >= self.capacity) + { + Time start = time::now(); + Time end = start + timeout; + while (self.capacity > 0 && self.count >= self.capacity) + { + if (end <= time::now()) break; + if (catch self.not_empty.wait_until(&self.lock, end)) break; + } + if (self.capacity > 0 && self.count >= self.capacity) return CAPACITY_EXCEEDED?; + } + + QueueEntry* entry = allocator::new(self.allocator, QueueEntry, { + .value = value, + .next = null, + .prev = null + }); + self.link_entry(entry); + self.not_empty.signal()!!; + }; +} + +<* + @require self.is_initialized() : "Queue must be initialized" + @return "The head value or NO_MORE_ELEMENT? if queue is empty" +*> +fn Value? LinkedBlockingQueue.peek(&self) +{ + self.lock.@in_lock() + { + return (self.head != null) ? self.head.value : NO_MORE_ELEMENT?; + }; +} + +<* + @return "True if queue is initialized" +*> +fn bool LinkedBlockingQueue.is_initialized(&self) +{ + return self.allocator && self.lock.initialized; +} diff --git a/lib/std/core/builtin.c3 b/lib/std/core/builtin.c3 index 13c9fbee1..92cfc929b 100644 --- a/lib/std/core/builtin.c3 +++ b/lib/std/core/builtin.c3 @@ -39,20 +39,24 @@ macro @is_valid_macro_slot(#arg) @const @builtin => !@typeis(#arg, EmptySlot); macro @rnd() @const @builtin => $$rnd(); /* - Use `IteratorResult` when reading the end of an iterator, or accessing a result out of bounds. + Use `NO_MORE_ELEMENT` when reading the end of an iterator, or accessing a result out of bounds. */ faultdef NO_MORE_ELEMENT @builtin; /* - Use `SearchResult` when trying to return a value from some collection but the element is missing. + Use `NOT_FOUND` when trying to return a value from some collection but the element is missing. */ faultdef NOT_FOUND @builtin; /* - Use `CastResult` when an attempt at conversion fails. + Use `TYPE_MISMATCH` when an attempt at conversion fails. */ faultdef TYPE_MISMATCH @builtin; +/* + Use `CAPACITY_EXCEEDED` when trying to add to a bounded list or similar. +*/ +faultdef CAPACITY_EXCEEDED @builtin; alias VoidFn = fn void(); diff --git a/lib/std/threads/os/thread_posix.c3 b/lib/std/threads/os/thread_posix.c3 index b30740860..6d2fcbb28 100644 --- a/lib/std/threads/os/thread_posix.c3 +++ b/lib/std/threads/os/thread_posix.c3 @@ -139,6 +139,7 @@ fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx) <* @require mtx.is_initialized() + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) { @@ -148,6 +149,7 @@ fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) <* @require mtx.is_initialized() + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration) { @@ -158,6 +160,7 @@ fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, <* @require mtx.is_initialized() + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) { diff --git a/lib/std/threads/os/thread_win32.c3 b/lib/std/threads/os/thread_win32.c3 index cddaa0717..a3f0cfc05 100644 --- a/lib/std/threads/os/thread_win32.c3 +++ b/lib/std/threads/os/thread_win32.c3 @@ -311,6 +311,7 @@ fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx) @inline <* @require mtx.initialized : "Mutex was not initialized" + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) @inline { @@ -320,6 +321,7 @@ fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) <* @require mtx.initialized : "Mutex was not initialized" + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration) @inline { @@ -331,6 +333,7 @@ fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, <* @require mtx.initialized : "Mutex was not initialized" + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) @inline { diff --git a/lib/std/threads/thread.c3 b/lib/std/threads/thread.c3 index ce043d38a..5103f18ec 100644 --- a/lib/std/threads/thread.c3 +++ b/lib/std/threads/thread.c3 @@ -70,6 +70,7 @@ macro void? ConditionVariable.wait(&cond, Mutex* mutex) } <* @require @assignable_to(#ms_or_duration, Duration) || @assignable_to(#ms_or_duration, ulong) + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED *> macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration) @safemacro { @@ -80,6 +81,9 @@ macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration) $endif } +<* + @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED +*> macro void? ConditionVariable.wait_until(&cond, Mutex* mutex, Time time) { return NativeConditionVariable.wait_until((NativeConditionVariable*)cond, (NativeMutex*)mutex, time); diff --git a/test/unit/stdlib/collections/blocking_queue.c3 b/test/unit/stdlib/collections/blocking_queue.c3 new file mode 100644 index 000000000..ca796c739 --- /dev/null +++ b/test/unit/stdlib/collections/blocking_queue.c3 @@ -0,0 +1,255 @@ +module blocking_queue_test; +import std::collections::blockingqueue; +import std::thread; +import std::time; +import std::atomic; + +alias TestQueue = LinkedBlockingQueue {int}; + +// Basic functionality tests +fn void test_init_free() @test +{ + TestQueue q; + q.tinit(10); + assert(q.is_initialized()); + assert(q.size() == 0); + assert(q.is_empty()); + q.free(); +} + +fn void test_single_thread_operations() @test +{ + TestQueue q; + q.tinit(3); // Capacity of 3 + + // Test add and remove + q.push(1); + q.push(2); + assert(q.size() == 2); + assert(!q.is_empty()); + + assert(q.pop()!! == 1); + assert(q.pop()!! == 2); + assert(q.size() == 0); + assert(q.is_empty()); + + // Test try_add and try_remove + q.try_push(3)!!; + q.try_push(4)!!; + q.try_push(5)!!; + test::@error(q.try_push(6), CAPACITY_EXCEEDED); // Should fail - queue full + + assert(q.pop()!! == 3); + assert(q.pop()!! == 4); + assert(q.pop()!! == 5); + test::@error(q.pop(), NO_MORE_ELEMENT); + + // Test peek + q.push(7); + assert(q.peek()!! == 7); + assert(q.size() == 1); // Peek shouldn't remove + q.free(); +} + +fn void test_timeout_operations() @test +{ + TestQueue q; + q.tinit(2); + + // Test remove_timeout + q.push(1); + q.push(2); + + // Should fail to add within timeout + test::@error(q.push_timeout(3, time::MS * 10), CAPACITY_EXCEEDED); // 10ms timeout + + // Should succeed to remove + assert(q.poll_timeout(time::MS * 10)!! == 1); + assert(q.poll_timeout(time::MS * 10)!! == 2); + + // Should timeout on empty queue + assert(@catch(q.poll_timeout(time::MS * 10)) == NO_MORE_ELEMENT); + + q.free(); +} + +// Multi-threaded tests +const THREAD_COUNT = 4; +const ITEMS_PER_THREAD = 40; + +fn int producer(void* arg) +{ + TestQueue* q = (TestQueue*)arg; + for (int i = 0; i < ITEMS_PER_THREAD; i++) + { + q.push(i); + } + return 0; +} + +fn int consumer(void* arg) +{ + TestQueue* q = (TestQueue*)arg; + for (int i = 0; i < ITEMS_PER_THREAD; i++) + { + q.poll(); + } + return 0; +} + +fn void test_producer_consumer() @test +{ + TestQueue q; + q.tinit(0); // Unbounded queue + + Thread[THREAD_COUNT] producers; + Thread[THREAD_COUNT] consumers; + + // Create producer threads + foreach (i, &thread: producers) + { + assert(@ok(thread.create(&producer, &q))); + } + + // Create consumer threads + foreach (i, &thread: consumers) + { + assert(@ok(thread.create(&consumer, &q))); + } + + // Wait for all threads + foreach (i, &thread: producers) + { + assert((thread.join()??1) == 0); + } + + foreach (i, &thread: consumers) + { + assert((thread.join()??1) == 0); + } + + assert(q.is_empty()); + (void) q.free(); +} + +fn int bounded_producer(void* arg) +{ + TestQueue* q = (TestQueue*)arg; + for (int i = 0; i < ITEMS_PER_THREAD; i++) + { + while (catch q.try_push(i)) + { + thread::sleep_ms(10); // Brief sleep if queue is full + } + } + return 0; +} + +fn void test_bounded_queue() @test +{ + TestQueue q; + q.tinit(10); // Small bounded queue + + Thread[THREAD_COUNT] producers; + Thread[THREAD_COUNT] consumers; + + // Create producer threads + foreach (i, &thread: producers) + { + assert(@ok(thread.create(&bounded_producer, &q))); + } + + // Create consumer threads + foreach (i, &thread: consumers) + { + assert(@ok(thread.create(&consumer, &q))); + } + + // Wait for all threads + foreach (i, &thread: producers) + { + assert((thread.join()??1) == 0); + } + + foreach (i, &thread: consumers) + { + assert((thread.join()??1) == 0); + } + + // Queue should be empty + assert(q.is_empty()); + (void) q.free(); +} +// Track produced and consumed counts +int produced = 0; +int consumed = 0; + +fn void test_timeout_operations_threaded() @test +{ + TestQueue q; + q.tinit(5); // Small bounded queue + + Thread[THREAD_COUNT] producers; + Thread[THREAD_COUNT] consumers; + + // Create threads + foreach (i, &thread: producers) + { + thread.create(fn int(void* arg) { + TestQueue* q = (TestQueue*)arg; + for (int i = 0; i < ITEMS_PER_THREAD; i++) + { + while (@catch(q.poll_timeout(time::MS * 10))) + { + // Retry if timeout occurs + } + atomic::fetch_add(&consumed, 1); + } + return 0; + }, &q)!!; + } + foreach (i, &thread: consumers) + { + thread.create(fn int(void* arg) + { + TestQueue* q = (TestQueue*)arg; + for (int i = 0; i < ITEMS_PER_THREAD; i++) + { + while (catch q.push_timeout(i, time::MS * 10)) + { + // Retry if offer fails + } + atomic::fetch_add(&produced, 1); + } + return 0; + }, &q)!!; + } + + // Wait with timeout + bool all_ok = true; + Clock timer = clock::now(); + foreach (i, &thread: producers) + { + if (timer.to_now() > time::sec(5).to_nano()) + { + all_ok = false; + break; + } + all_ok &= (thread.join() ?? 1) == 0; + } + foreach (i, &thread: consumers) + { + if (timer.to_now() > time::sec(5).to_nano()) + { + all_ok = false; + break; + } + all_ok &= (thread.join() ?? 1) == 0; + } + + // Verify counts + assert(all_ok); + assert(produced == consumed + q.size(), "Production/consumption mismatch"); + assert(q.is_empty()); + q.free(); +} \ No newline at end of file