mirror of
https://github.com/c3lang/c3c.git
synced 2026-02-27 12:01:16 +00:00
Add LinkedBlockingQueue (#2328)
* Add LinkedBlockingQueue --------- Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
This commit is contained in:
255
test/unit/stdlib/collections/blocking_queue.c3
Normal file
255
test/unit/stdlib/collections/blocking_queue.c3
Normal file
@@ -0,0 +1,255 @@
|
||||
module blocking_queue_test;
|
||||
import std::collections::blockingqueue;
|
||||
import std::thread;
|
||||
import std::time;
|
||||
import std::atomic;
|
||||
|
||||
alias TestQueue = LinkedBlockingQueue {int};
|
||||
|
||||
// Basic functionality tests
|
||||
fn void test_init_free() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(10);
|
||||
assert(q.is_initialized());
|
||||
assert(q.size() == 0);
|
||||
assert(q.is_empty());
|
||||
q.free();
|
||||
}
|
||||
|
||||
fn void test_single_thread_operations() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(3); // Capacity of 3
|
||||
|
||||
// Test add and remove
|
||||
q.push(1);
|
||||
q.push(2);
|
||||
assert(q.size() == 2);
|
||||
assert(!q.is_empty());
|
||||
|
||||
assert(q.pop()!! == 1);
|
||||
assert(q.pop()!! == 2);
|
||||
assert(q.size() == 0);
|
||||
assert(q.is_empty());
|
||||
|
||||
// Test try_add and try_remove
|
||||
q.try_push(3)!!;
|
||||
q.try_push(4)!!;
|
||||
q.try_push(5)!!;
|
||||
test::@error(q.try_push(6), CAPACITY_EXCEEDED); // Should fail - queue full
|
||||
|
||||
assert(q.pop()!! == 3);
|
||||
assert(q.pop()!! == 4);
|
||||
assert(q.pop()!! == 5);
|
||||
test::@error(q.pop(), NO_MORE_ELEMENT);
|
||||
|
||||
// Test peek
|
||||
q.push(7);
|
||||
assert(q.peek()!! == 7);
|
||||
assert(q.size() == 1); // Peek shouldn't remove
|
||||
q.free();
|
||||
}
|
||||
|
||||
fn void test_timeout_operations() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(2);
|
||||
|
||||
// Test remove_timeout
|
||||
q.push(1);
|
||||
q.push(2);
|
||||
|
||||
// Should fail to add within timeout
|
||||
test::@error(q.push_timeout(3, time::MS * 10), CAPACITY_EXCEEDED); // 10ms timeout
|
||||
|
||||
// Should succeed to remove
|
||||
assert(q.poll_timeout(time::MS * 10)!! == 1);
|
||||
assert(q.poll_timeout(time::MS * 10)!! == 2);
|
||||
|
||||
// Should timeout on empty queue
|
||||
assert(@catch(q.poll_timeout(time::MS * 10)) == NO_MORE_ELEMENT);
|
||||
|
||||
q.free();
|
||||
}
|
||||
|
||||
// Multi-threaded tests
|
||||
const THREAD_COUNT = 4;
|
||||
const ITEMS_PER_THREAD = 40;
|
||||
|
||||
fn int producer(void* arg)
|
||||
{
|
||||
TestQueue* q = (TestQueue*)arg;
|
||||
for (int i = 0; i < ITEMS_PER_THREAD; i++)
|
||||
{
|
||||
q.push(i);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
fn int consumer(void* arg)
|
||||
{
|
||||
TestQueue* q = (TestQueue*)arg;
|
||||
for (int i = 0; i < ITEMS_PER_THREAD; i++)
|
||||
{
|
||||
q.poll();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
fn void test_producer_consumer() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(0); // Unbounded queue
|
||||
|
||||
Thread[THREAD_COUNT] producers;
|
||||
Thread[THREAD_COUNT] consumers;
|
||||
|
||||
// Create producer threads
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
assert(@ok(thread.create(&producer, &q)));
|
||||
}
|
||||
|
||||
// Create consumer threads
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
assert(@ok(thread.create(&consumer, &q)));
|
||||
}
|
||||
|
||||
// Wait for all threads
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
assert((thread.join()??1) == 0);
|
||||
}
|
||||
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
assert((thread.join()??1) == 0);
|
||||
}
|
||||
|
||||
assert(q.is_empty());
|
||||
(void) q.free();
|
||||
}
|
||||
|
||||
fn int bounded_producer(void* arg)
|
||||
{
|
||||
TestQueue* q = (TestQueue*)arg;
|
||||
for (int i = 0; i < ITEMS_PER_THREAD; i++)
|
||||
{
|
||||
while (catch q.try_push(i))
|
||||
{
|
||||
thread::sleep_ms(10); // Brief sleep if queue is full
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
fn void test_bounded_queue() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(10); // Small bounded queue
|
||||
|
||||
Thread[THREAD_COUNT] producers;
|
||||
Thread[THREAD_COUNT] consumers;
|
||||
|
||||
// Create producer threads
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
assert(@ok(thread.create(&bounded_producer, &q)));
|
||||
}
|
||||
|
||||
// Create consumer threads
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
assert(@ok(thread.create(&consumer, &q)));
|
||||
}
|
||||
|
||||
// Wait for all threads
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
assert((thread.join()??1) == 0);
|
||||
}
|
||||
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
assert((thread.join()??1) == 0);
|
||||
}
|
||||
|
||||
// Queue should be empty
|
||||
assert(q.is_empty());
|
||||
(void) q.free();
|
||||
}
|
||||
// Track produced and consumed counts
|
||||
int produced = 0;
|
||||
int consumed = 0;
|
||||
|
||||
fn void test_timeout_operations_threaded() @test
|
||||
{
|
||||
TestQueue q;
|
||||
q.tinit(5); // Small bounded queue
|
||||
|
||||
Thread[THREAD_COUNT] producers;
|
||||
Thread[THREAD_COUNT] consumers;
|
||||
|
||||
// Create threads
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
thread.create(fn int(void* arg) {
|
||||
TestQueue* q = (TestQueue*)arg;
|
||||
for (int i = 0; i < ITEMS_PER_THREAD; i++)
|
||||
{
|
||||
while (@catch(q.poll_timeout(time::MS * 10)))
|
||||
{
|
||||
// Retry if timeout occurs
|
||||
}
|
||||
atomic::fetch_add(&consumed, 1);
|
||||
}
|
||||
return 0;
|
||||
}, &q)!!;
|
||||
}
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
thread.create(fn int(void* arg)
|
||||
{
|
||||
TestQueue* q = (TestQueue*)arg;
|
||||
for (int i = 0; i < ITEMS_PER_THREAD; i++)
|
||||
{
|
||||
while (catch q.push_timeout(i, time::MS * 10))
|
||||
{
|
||||
// Retry if offer fails
|
||||
}
|
||||
atomic::fetch_add(&produced, 1);
|
||||
}
|
||||
return 0;
|
||||
}, &q)!!;
|
||||
}
|
||||
|
||||
// Wait with timeout
|
||||
bool all_ok = true;
|
||||
Clock timer = clock::now();
|
||||
foreach (i, &thread: producers)
|
||||
{
|
||||
if (timer.to_now() > time::sec(5).to_nano())
|
||||
{
|
||||
all_ok = false;
|
||||
break;
|
||||
}
|
||||
all_ok &= (thread.join() ?? 1) == 0;
|
||||
}
|
||||
foreach (i, &thread: consumers)
|
||||
{
|
||||
if (timer.to_now() > time::sec(5).to_nano())
|
||||
{
|
||||
all_ok = false;
|
||||
break;
|
||||
}
|
||||
all_ok &= (thread.join() ?? 1) == 0;
|
||||
}
|
||||
|
||||
// Verify counts
|
||||
assert(all_ok);
|
||||
assert(produced == consumed + q.size(), "Production/consumption mismatch");
|
||||
assert(q.is_empty());
|
||||
q.free();
|
||||
}
|
||||
Reference in New Issue
Block a user