Files
c3c/test/unit/stdlib/collections/blocking_queue.c3
Velikiy Kirill 8358af2240 Add LinkedBlockingQueue (#2328)
* Add LinkedBlockingQueue

---------

Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
2025-08-03 22:47:21 +02:00

255 lines
4.7 KiB
Plaintext

module blocking_queue_test;
import std::collections::blockingqueue;
import std::thread;
import std::time;
import std::atomic;
alias TestQueue = LinkedBlockingQueue {int};
// Basic functionality tests
// Smoke test: a queue set up with tinit() reports itself as initialized and
// empty, and can be released again without ever having held an element.
fn void test_init_free() @test
{
	TestQueue queue;
	queue.tinit(10);
	// Released when the function returns, after the checks below.
	defer queue.free();

	assert(queue.is_initialized());
	assert(queue.size() == 0);
	assert(queue.is_empty());
}
// Exercises the queue from a single thread on an instance created with
// tinit(3): FIFO order of push/pop, the fault reported by try_push on a full
// queue and by pop on an empty one, and peek leaving the element in place.
fn void test_single_thread_operations() @test
{
	TestQueue queue;
	queue.tinit(3); // Capacity of 3
	// Released when the function returns, after the checks below.
	defer queue.free();

	// push followed by pop hands the values back in insertion order.
	queue.push(1);
	queue.push(2);
	assert(queue.size() == 2);
	assert(!queue.is_empty());
	assert(queue.pop()!! == 1);
	assert(queue.pop()!! == 2);
	assert(queue.size() == 0);
	assert(queue.is_empty());

	// Fill the queue with try_push; one element more than the capacity must be rejected.
	for (int value = 3; value <= 5; value++)
	{
		queue.try_push(value)!!;
	}
	test::@error(queue.try_push(6), CAPACITY_EXCEEDED);

	// Drain it again; pop on the now-empty queue must report a fault.
	for (int expected = 3; expected <= 5; expected++)
	{
		assert(queue.pop()!! == expected);
	}
	test::@error(queue.pop(), NO_MORE_ELEMENT);

	// peek returns the head element without removing it.
	queue.push(7);
	assert(queue.peek()!! == 7);
	assert(queue.size() == 1);
}
// Covers the timed variants on a queue created with tinit(2): push_timeout on
// a full queue and poll_timeout on an empty queue must both give up with a
// fault, while poll_timeout on a non-empty queue returns elements in FIFO order.
fn void test_timeout_operations() @test
{
	TestQueue q;
	q.tinit(2);
	Duration timeout = time::MS * 10; // 10ms, shared by every timed call below

	// Fill the queue to its capacity.
	q.push(1);
	q.push(2);
	// A timed push into the full queue must fail with CAPACITY_EXCEEDED.
	test::@error(q.push_timeout(3, timeout), CAPACITY_EXCEEDED);
	// Timed polls succeed while elements are available.
	assert(q.poll_timeout(timeout)!! == 1);
	assert(q.poll_timeout(timeout)!! == 2);
	// A timed poll on the empty queue must fail with NO_MORE_ELEMENT; checked
	// with test::@error like the other expected faults in this file.
	test::@error(q.poll_timeout(timeout), NO_MORE_ELEMENT);
	q.free();
}
// Multi-threaded tests
// Size of each Thread array in the multi-threaded tests: every test starts
// this many producer threads and this many consumer threads.
const THREAD_COUNT = 4;
// Number of queue operations each worker thread performs before it returns.
const ITEMS_PER_THREAD = 40;
// Thread entry point: pushes the values 0 .. ITEMS_PER_THREAD - 1 onto the
// queue passed through `arg` (a TestQueue*), then returns 0.
fn int producer(void* arg)
{
	TestQueue* queue = (TestQueue*)arg;
	int next = 0;
	while (next < ITEMS_PER_THREAD)
	{
		queue.push(next);
		next++;
	}
	return 0;
}
// Thread entry point: calls poll() ITEMS_PER_THREAD times on the queue passed
// through `arg` (a TestQueue*), discarding the polled values, then returns 0.
fn int consumer(void* arg)
{
	TestQueue* queue = (TestQueue*)arg;
	for (int remaining = ITEMS_PER_THREAD; remaining > 0; remaining--)
	{
		queue.poll();
	}
	return 0;
}
// Runs THREAD_COUNT `producer` threads and THREAD_COUNT `consumer` threads
// against one shared queue created with tinit(0). Every worker must return 0
// and, since pushes and polls are balanced, the queue must end up empty.
fn void test_producer_consumer() @test
{
	TestQueue queue;
	queue.tinit(0); // Unbounded queue
	// Released when the function returns, after all workers were joined.
	defer (void)queue.free();

	Thread[THREAD_COUNT] producers;
	Thread[THREAD_COUNT] consumers;

	// Start all producers first, then all consumers.
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert(@ok(producers[n].create(&producer, &queue)));
	}
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert(@ok(consumers[n].create(&consumer, &queue)));
	}

	// Join in the same order; a failed join counts as exit code 1.
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert((producers[n].join() ?? 1) == 0);
	}
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert((consumers[n].join() ?? 1) == 0);
	}

	assert(queue.is_empty());
}
// Thread entry point for the bounded-queue test: pushes the values
// 0 .. ITEMS_PER_THREAD - 1 with try_push onto the queue passed through `arg`
// (a TestQueue*), retrying each value until it is accepted, then returns 0.
fn int bounded_producer(void* arg)
{
	TestQueue* queue = (TestQueue*)arg;
	int value = 0;
	while (value < ITEMS_PER_THREAD)
	{
		// Back off briefly while try_push reports a fault (queue full), then retry.
		while (catch queue.try_push(value))
		{
			thread::sleep_ms(10);
		}
		value++;
	}
	return 0;
}
// Same scenario as the unbounded producer/consumer test, but on a queue
// created with tinit(10) and with `bounded_producer` workers that retry
// try_push. Every worker must return 0 and the queue must end up empty.
fn void test_bounded_queue() @test
{
	TestQueue queue;
	queue.tinit(10); // Small bounded queue
	// Released when the function returns, after all workers were joined.
	defer (void)queue.free();

	Thread[THREAD_COUNT] producers;
	Thread[THREAD_COUNT] consumers;

	// Start all producers first, then all consumers.
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert(@ok(producers[n].create(&bounded_producer, &queue)));
	}
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert(@ok(consumers[n].create(&consumer, &queue)));
	}

	// Join in the same order; a failed join counts as exit code 1.
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert((producers[n].join() ?? 1) == 0);
	}
	for (usz n = 0; n < THREAD_COUNT; n++)
	{
		assert((consumers[n].join() ?? 1) == 0);
	}

	// Pushes and polls are balanced, so nothing may be left behind.
	assert(queue.is_empty());
}
// Counters for test_timeout_operations_threaded. Worker threads bump
// `produced` after each successful push_timeout and `consumed` after each
// successful poll_timeout, both through atomic::fetch_add; the test compares
// the two (plus the remaining queue size) at its end.
int produced = 0;
int consumed = 0;
// Stress test for the timed API on a small bounded queue (tinit(5)).
// THREAD_COUNT consumer threads drain the queue with poll_timeout and
// THREAD_COUNT producer threads fill it with push_timeout; each side retries
// until its call succeeds and then bumps the matching global counter.
// The test fails if a worker reports a non-zero result, if a join is reached
// more than 5 seconds after joining began, or if the counters and the queue
// size disagree at the end.
fn void test_timeout_operations_threaded() @test
{
	// Reset the shared counters so the final comparison reflects this run only.
	produced = 0;
	consumed = 0;

	TestQueue q;
	q.tinit(5); // Small bounded queue
	Thread[THREAD_COUNT] consumers;
	Thread[THREAD_COUNT] producers;

	// Consumers are started first; they spin on poll_timeout until the
	// producers below begin to deliver elements.
	foreach (&worker : consumers)
	{
		worker.create(fn int(void* arg)
		{
			TestQueue* queue = (TestQueue*)arg;
			for (int i = 0; i < ITEMS_PER_THREAD; i++)
			{
				while (@catch(queue.poll_timeout(time::MS * 10)))
				{
					// Retry if timeout occurs
				}
				atomic::fetch_add(&consumed, 1);
			}
			return 0;
		}, &q)!!;
	}
	foreach (&worker : producers)
	{
		worker.create(fn int(void* arg)
		{
			TestQueue* queue = (TestQueue*)arg;
			for (int i = 0; i < ITEMS_PER_THREAD; i++)
			{
				while (catch queue.push_timeout(i, time::MS * 10))
				{
					// Retry if the timed push fails
				}
				atomic::fetch_add(&produced, 1);
			}
			return 0;
		}, &q)!!;
	}

	// Join every worker, even once the deadline has passed: the workers hold a
	// pointer to `q`, so they must have finished before the queue is freed and
	// this stack frame is left. A missed deadline is still recorded as failure.
	bool all_ok = true;
	Clock timer = clock::now();
	foreach (&worker : consumers)
	{
		if (timer.to_now() > time::sec(5).to_nano()) all_ok = false;
		all_ok &= (worker.join() ?? 1) == 0;
	}
	foreach (&worker : producers)
	{
		if (timer.to_now() > time::sec(5).to_nano()) all_ok = false;
		all_ok &= (worker.join() ?? 1) == 0;
	}

	// Verify counts
	assert(all_ok);
	assert(produced == consumed + q.size(), "Production/consumption mismatch");
	assert(q.is_empty());
	q.free();
}