Files
c3c/lib/std/experimental/FrameScheduler.c3
Sander van den Bosch 3f20e5af1d add join for ThreadPool without destroying the threads (#2579)
* add join for ThreadPool without destroying the threads
* Make the main Thread block waiting for the worker threads to finish instead of busy looping and do proper initialization and freeing of all variables.
* Updated test to use `atomic_store` and take into account the maximum queue size of the threadpool.
* - Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads.
- Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0.
- Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0.
- Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0.
- Return of Thread `detatch()` is now "@maydiscard". It will return void in 0.8.0.
- Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will return void in 0.8.0.
- Pthread bindings correctly return Errno instead of CInt.
- Return of Thread `join()` is now "@maydiscard".

---------

Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
2025-12-06 23:54:04 +01:00

95 lines
2.2 KiB
Plaintext

module std::experimental::scheduler{Event};
import std::collections, std::thread, std::time;
// An Event paired with the clock time at which it becomes eligible to run.
// Module-local: only the scheduler creates these (see queue_delayed_event).
struct DelayedSchedulerEvent @local
{
// The wrapped event, embedded inline.
inline Event event;
// Clock value at or after which pop_event may hand the event out.
Clock execution_time;
}
// Three-way comparison of two delayed events by execution time only.
// Returns -1 when `self` is scheduled earlier than `other`, 1 when it is
// scheduled later, and 0 when both share the same execution time.
fn int DelayedSchedulerEvent.compare_to(self, DelayedSchedulerEvent other) @local
{
	if (self.execution_time < other.execution_time) return -1;
	if (self.execution_time > other.execution_time) return 1;
	return 0;
}
// Event scheduler state: events can be queued for immediate or delayed
// delivery and are handed out one at a time through pop_event.
struct FrameScheduler
{
// Events waiting for their execution time; filled by queue_delayed_event
// and drained by pop_event once they are due.
PriorityQueue{DelayedSchedulerEvent} delayed_events;
// Events ready to be returned by pop_event.
List{Event} events;
// Events added by queue_event that have not yet been moved into `events`.
List{Event} pending_events;
// Accessed with @atomic_load/@atomic_store: set when something is queued,
// and reset by pop_event to whether delayed events remain.
bool pending;
// Held while `pending_events` and `delayed_events` are touched.
Mutex mtx;
}
// Initializes the scheduler: sets up the three event containers with the
// `mem` allocator, initializes the mutex and clears the pending flag.
// The result of the mutex initialization is explicitly discarded.
fn void FrameScheduler.init(&self)
{
	self.events.init(mem);
	self.pending_events.init(mem);
	self.delayed_events.init(mem);
	(void)self.mtx.init();
	// Nothing has been queued yet. The flag must be written on the struct
	// itself: a bare `bool pending;` here would only declare an unused local
	// and leave the member unset when the scheduler was not zeroed first.
	self.pending = false;
}
// Tears the scheduler down. Every event still stored in the scheduler
// (ready, pending or delayed) is passed to the caller-supplied `@destruct`
// body, after which the containers are freed and the mutex is destroyed.
macro void FrameScheduler.@destroy(&self; @destruct(Event e))
{
	// Let the caller clean up all events that were never popped.
	foreach (ready : self.events)
	{
		@destruct(ready);
	}
	foreach (queued : self.pending_events)
	{
		@destruct(queued);
	}
	foreach (delayed : self.delayed_events.heap)
	{
		@destruct(delayed.event);
	}

	// Release the container storage, then the mutex.
	self.events.free();
	self.pending_events.free();
	self.delayed_events.free();
	self.mtx.destroy();
}
// Queues `event` so that it becomes available from pop_event once `delay`
// has elapsed, measured from the moment this call holds the scheduler mutex.
// Also raises the atomic pending flag so pop_event knows there is work to
// look at.
fn void FrameScheduler.queue_delayed_event(&self, Event event, Duration delay)
{
	self.mtx.@in_lock()
	{
		// The due time is taken while the lock is held.
		Clock due = clock::now().add_duration(delay);
		DelayedSchedulerEvent entry = { .event = event, .execution_time = due };
		self.delayed_events.push(entry);
		@atomic_store(self.pending, true);
	};
}
// Reports whether at least one delayed event is still queued.
// The delayed queue is inspected while holding the scheduler mutex.
fn bool FrameScheduler.has_delayed(&self)
{
	bool any_delayed;
	self.mtx.@in_lock()
	{
		// first() only succeeds when the queue has an element.
		any_delayed = @ok(self.delayed_events.first());
	};
	return any_delayed;
}
// Queues `event` for delivery. The event is appended to the pending list
// under the scheduler mutex and the atomic pending flag is raised so that
// pop_event will pick it up.
fn void FrameScheduler.queue_event(&self, Event event)
{
	self.mtx.@in_lock()
	{
		List{Event}* inbox = &self.pending_events;
		inbox.push(event);
		@atomic_store(self.pending, true);
	};
}
// Returns the next event that is ready to run, or NO_MORE_ELEMENT when
// nothing is currently available.
//
// Events already in `events` are returned without taking the mutex. When
// `events` is empty and the pending flag is set, the pending list and all
// delayed events that are due are moved into `events` under the mutex and
// the pop is retried.
// NOTE(review): `events` is read and modified here without the lock, which
// presumably assumes a single consumer thread - confirm against callers.
// NOTE(review): the order in which events come out depends on List.pop();
// confirm that order is the intended one.
fn Event? FrameScheduler.pop_event(&self)
{
while (true)
{
// Fast path: something is already ready.
if (try event = self.events.pop()) return event;
// Nothing ready and nothing was queued since the last refill.
if (!@atomic_load(self.pending)) return NO_MORE_ELEMENT?;
self.mtx.@in_lock()
{
// Take over everything queued by queue_event.
self.events.add_all(&self.pending_events);
self.pending_events.clear();
// Move over delayed events whose execution time is not after `c`,
// stopping at the first queue head that is still in the future.
Clock c = clock::now();
while (try top = self.delayed_events.first())
{
if (top.execution_time > c) break;
self.events.push(self.delayed_events.pop()!!);
}
// Stay "pending" only while delayed events remain, so later calls
// keep re-checking them against the clock.
@atomic_store(self.pending, self.delayed_events.len() > 0);
// Nothing became ready: report empty instead of looping again.
if (!self.events.len()) return NO_MORE_ELEMENT?;
};
}
}