add join for ThreadPool without destroying the threads (#2579)

* add join for ThreadPool without destroying the threads
* Make the main Thread block waiting for the worker threads to finish instead of buzy looping and do proper initialization and freeing of all variables.
* Updated test to use `atomic_store` and  take into account the maximum queue size of the threadpool.
* - Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads.
- Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0.
- Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0.
- Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0.
- Return of Thread `detatch()` is now "@maydiscard". It will return void in 0.8.0.
- Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will retunr void in 0.8.0.
- Pthread bindings correctly return Errno instead of CInt.
- Return of Thread `join()` is now "@maydiscard".

---------

Co-authored-by: Christoffer Lerno <christoffer@aegik.com>
This commit is contained in:
Sander van den Bosch
2025-12-06 23:54:04 +01:00
committed by GitHub
parent 18e2838772
commit 3f20e5af1d
17 changed files with 470 additions and 370 deletions

View File

@@ -70,9 +70,9 @@ fn void LinkedBlockingQueue.free(&self)
}
};
(void)self.lock.destroy();
(void)self.not_empty.destroy();
(void)self.not_full.destroy();
self.lock.destroy();
self.not_empty.destroy();
self.not_full.destroy();
}
fn void LinkedBlockingQueue.link_entry(&self, QueueEntry* entry) @private
@@ -126,7 +126,7 @@ fn void LinkedBlockingQueue.push(&self, Value value)
{
while (self.capacity > 0 && self.count >= self.capacity)
{
self.not_full.wait(&self.lock)!!;
self.not_full.wait(&self.lock);
}
QueueEntry* entry = allocator::new(self.allocator, QueueEntry, {
@@ -137,7 +137,7 @@ fn void LinkedBlockingQueue.push(&self, Value value)
self.link_entry(entry);
// Signal that queue is no longer empty
self.not_empty.signal()!!;
self.not_empty.signal();
};
}
@@ -153,7 +153,7 @@ fn Value LinkedBlockingQueue.poll(&self)
{
while (self.count == 0)
{
self.not_empty.wait(&self.lock)!!;
self.not_empty.wait(&self.lock);
}
QueueEntry* entry = self.unlink_head();
@@ -161,7 +161,7 @@ fn Value LinkedBlockingQueue.poll(&self)
allocator::free(self.allocator, entry);
if (self.capacity > 0)
{
self.not_full.signal()!!;
self.not_full.signal();
}
return value;
};
@@ -186,7 +186,7 @@ fn Value? LinkedBlockingQueue.pop(&self)
if (self.capacity > 0)
{
self.not_full.signal()!!;
self.not_full.signal();
}
return value;
};
@@ -224,7 +224,7 @@ fn Value? LinkedBlockingQueue.poll_timeout(&self, Duration timeout)
// Must signal not_full after removing an item
if (self.capacity > 0)
{
self.not_full.signal()!!;
self.not_full.signal();
}
return value;
};
@@ -274,7 +274,7 @@ fn void? LinkedBlockingQueue.try_push(&self, Value value)
.prev = null
});
self.link_entry(entry);
self.not_empty.signal()!!;
self.not_empty.signal();
};
}
@@ -308,7 +308,7 @@ fn void? LinkedBlockingQueue.push_timeout(&self, Value value, Duration timeout)
.prev = null
});
self.link_entry(entry);
self.not_empty.signal()!!;
self.not_empty.signal();
};
}

View File

@@ -148,10 +148,11 @@ fn void call_log_internal(LogPriority prio, LogCategory category, String file, S
LogPriority priority = mem::@atomic_load(config_priorities[category], UNORDERED);
if (priority > prio) return;
init();
bool locked = logger_mutex.is_initialized() && @ok(logger_mutex.lock());
bool locked = logger_mutex.is_initialized();
if (locked) logger_mutex.lock();
Logger logger = current_logger;
LogFn logfn = current_logfn;
defer if (locked) (void)logger_mutex.unlock();
defer if (locked) logger_mutex.unlock();
logfn(logger.ptr, prio, category, current_tag, file, func, line, fmt, args);
}

View File

@@ -43,7 +43,7 @@ macro void FrameScheduler.@destroy(&self; @destruct(Event e))
self.events.free();
self.pending_events.free();
self.delayed_events.free();
(void)self.mtx.destroy();
self.mtx.destroy();
}
fn void FrameScheduler.queue_delayed_event(&self, Event event, Duration delay)

View File

@@ -9,80 +9,80 @@ const PTHREAD_MUTEX_RECURSIVE = env::LINUX ? 1 : 2;
alias PosixThreadFn = fn void*(void*);
typedef Pthread_t = void*;
extern fn CInt pthread_create(Pthread_t*, Pthread_attr_t*, PosixThreadFn, void*);
extern fn CInt pthread_cancel(Pthread_t*);
extern fn CInt pthread_detach(Pthread_t);
extern fn CInt pthread_equal(Pthread_t this, Pthread_t other);
extern fn Errno pthread_create(Pthread_t*, Pthread_attr_t*, PosixThreadFn, void*);
extern fn Errno pthread_cancel(Pthread_t*);
extern fn Errno pthread_detach(Pthread_t);
extern fn Errno pthread_equal(Pthread_t this, Pthread_t other);
extern fn void pthread_exit(void* value_ptr) @noreturn;
extern fn CInt pthread_join(Pthread_t, void** value_ptr);
extern fn CInt pthread_kill(Pthread_t, CInt sig);
extern fn Errno pthread_join(Pthread_t, void** value_ptr);
extern fn Errno pthread_kill(Pthread_t, CInt sig);
extern fn void pthread_once(Pthread_once_t*, OnceFn);
extern fn Pthread_t pthread_self();
extern fn CInt pthread_setcancelstate(CInt state, CInt* oldstate);
extern fn CInt pthread_setcanceltype(CInt type, CInt* oldtype);
extern fn CInt pthread_testcancel();
extern fn Errno pthread_setcancelstate(CInt state, CInt* oldstate);
extern fn Errno pthread_setcanceltype(CInt type, CInt* oldtype);
extern fn Errno pthread_testcancel();
extern fn CInt pthread_attr_destroy(Pthread_attr_t*);
extern fn CInt pthread_attr_getinheritsched(Pthread_attr_t*, CInt*);
extern fn CInt pthread_attr_getschedparam(Pthread_attr_t*, Pthread_sched_param*);
extern fn CInt pthread_attr_getschedpolicy(Pthread_attr_t*, CInt*);
extern fn CInt pthread_attr_getscope(Pthread_attr_t*, CInt*);
extern fn CInt pthread_attr_getstacksize(Pthread_attr_t*, usz*);
extern fn CInt pthread_attr_getstackaddr(Pthread_attr_t*, void**);
extern fn CInt pthread_attr_getdetachstate(Pthread_attr_t*, CInt*);
extern fn CInt pthread_attr_init(Pthread_attr_t*);
extern fn CInt pthread_attr_setinheritsched(Pthread_attr_t*, CInt);
extern fn CInt pthread_attr_setschedparam(Pthread_attr_t*, Pthread_sched_param*);
extern fn CInt pthread_attr_setschedpolicy(Pthread_attr_t*, CInt);
extern fn CInt pthread_attr_setscope(Pthread_attr_t*, CInt);
extern fn CInt pthread_attr_setstacksize(Pthread_attr_t*, usz);
extern fn CInt pthread_attr_setstackaddr(Pthread_attr_t*, void*);
extern fn CInt pthread_attr_setdetachstate(Pthread_attr_t*, CInt);
extern fn Errno pthread_attr_destroy(Pthread_attr_t*);
extern fn Errno pthread_attr_getinheritsched(Pthread_attr_t*, CInt*);
extern fn Errno pthread_attr_getschedparam(Pthread_attr_t*, Pthread_sched_param*);
extern fn Errno pthread_attr_getschedpolicy(Pthread_attr_t*, CInt*);
extern fn Errno pthread_attr_getscope(Pthread_attr_t*, CInt*);
extern fn Errno pthread_attr_getstacksize(Pthread_attr_t*, usz*);
extern fn Errno pthread_attr_getstackaddr(Pthread_attr_t*, void**);
extern fn Errno pthread_attr_getdetachstate(Pthread_attr_t*, CInt*);
extern fn Errno pthread_attr_init(Pthread_attr_t*);
extern fn Errno pthread_attr_setinheritsched(Pthread_attr_t*, CInt);
extern fn Errno pthread_attr_setschedparam(Pthread_attr_t*, Pthread_sched_param*);
extern fn Errno pthread_attr_setschedpolicy(Pthread_attr_t*, CInt);
extern fn Errno pthread_attr_setscope(Pthread_attr_t*, CInt);
extern fn Errno pthread_attr_setstacksize(Pthread_attr_t*, usz);
extern fn Errno pthread_attr_setstackaddr(Pthread_attr_t*, void*);
extern fn Errno pthread_attr_setdetachstate(Pthread_attr_t*, CInt);
extern fn CInt pthread_mutexattr_destroy(Pthread_mutexattr_t*);
extern fn CInt pthread_mutexattr_getprioceiling(Pthread_mutexattr_t*, CInt*);
extern fn CInt pthread_mutexattr_getprotocol(Pthread_mutexattr_t*, CInt*);
extern fn CInt pthread_mutexattr_gettype(Pthread_mutexattr_t*, CInt*);
extern fn CInt pthread_mutexattr_init(Pthread_mutexattr_t*);
extern fn CInt pthread_mutexattr_setprioceiling(Pthread_mutexattr_t*, CInt);
extern fn CInt pthread_mutexattr_setprotocol(Pthread_mutexattr_t*, CInt);
extern fn CInt pthread_mutexattr_settype(Pthread_mutexattr_t*, CInt);
extern fn Errno pthread_mutexattr_destroy(Pthread_mutexattr_t*);
extern fn Errno pthread_mutexattr_getprioceiling(Pthread_mutexattr_t*, CInt*);
extern fn Errno pthread_mutexattr_getprotocol(Pthread_mutexattr_t*, CInt*);
extern fn Errno pthread_mutexattr_gettype(Pthread_mutexattr_t*, CInt*);
extern fn Errno pthread_mutexattr_init(Pthread_mutexattr_t*);
extern fn Errno pthread_mutexattr_setprioceiling(Pthread_mutexattr_t*, CInt);
extern fn Errno pthread_mutexattr_setprotocol(Pthread_mutexattr_t*, CInt);
extern fn Errno pthread_mutexattr_settype(Pthread_mutexattr_t*, CInt);
extern fn CInt pthread_mutex_destroy(Pthread_mutex_t*);
extern fn CInt pthread_mutex_init(Pthread_mutex_t*, Pthread_mutexattr_t*);
extern fn Errno pthread_mutex_destroy(Pthread_mutex_t*);
extern fn Errno pthread_mutex_init(Pthread_mutex_t*, Pthread_mutexattr_t*);
extern fn Errno pthread_mutex_lock(Pthread_mutex_t*);
extern fn Errno pthread_mutex_trylock(Pthread_mutex_t*);
extern fn Errno pthread_mutex_unlock(Pthread_mutex_t*);
extern fn CInt pthread_condattr_destroy(Pthread_condattr_t*);
extern fn CInt pthread_condattr_init(Pthread_condattr_t*);
extern fn Errno pthread_condattr_destroy(Pthread_condattr_t*);
extern fn Errno pthread_condattr_init(Pthread_condattr_t*);
extern fn CInt pthread_cond_broadcast(Pthread_cond_t*);
extern fn CInt pthread_cond_destroy(Pthread_cond_t*);
extern fn CInt pthread_cond_init(Pthread_cond_t*, Pthread_condattr_t*);
extern fn CInt pthread_cond_signal(Pthread_cond_t*);
extern fn CInt pthread_cond_timedwait(Pthread_cond_t*, Pthread_mutex_t*, TimeSpec*);
extern fn CInt pthread_cond_wait(Pthread_cond_t*, Pthread_mutex_t*);
extern fn Errno pthread_cond_broadcast(Pthread_cond_t*);
extern fn Errno pthread_cond_destroy(Pthread_cond_t*);
extern fn Errno pthread_cond_init(Pthread_cond_t*, Pthread_condattr_t*);
extern fn Errno pthread_cond_signal(Pthread_cond_t*);
extern fn Errno pthread_cond_timedwait(Pthread_cond_t*, Pthread_mutex_t*, TimeSpec*);
extern fn Errno pthread_cond_wait(Pthread_cond_t*, Pthread_mutex_t*);
extern fn CInt pthread_rwlock_destroy(Pthread_rwlock_t*);
extern fn CInt pthread_rwlock_init(Pthread_rwlock_t*, Pthread_rwlockattr_t*);
extern fn CInt pthread_rwlock_rdlock(Pthread_rwlock_t*);
extern fn CInt pthread_rwlock_tryrdlock(Pthread_rwlock_t*);
extern fn CInt pthread_rwlock_trywrlock(Pthread_rwlock_t*);
extern fn CInt pthread_rwlock_unlock(Pthread_rwlock_t*);
extern fn CInt pthread_rwlock_wrlock(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_destroy(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_init(Pthread_rwlock_t*, Pthread_rwlockattr_t*);
extern fn Errno pthread_rwlock_rdlock(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_tryrdlock(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_trywrlock(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_unlock(Pthread_rwlock_t*);
extern fn Errno pthread_rwlock_wrlock(Pthread_rwlock_t*);
extern fn CInt pthread_rwlockattr_destroy(Pthread_rwlockattr_t*);
extern fn CInt pthread_rwlockattr_getpshared(Pthread_rwlockattr_t*, CInt*);
extern fn CInt pthread_rwlockattr_init(Pthread_rwlockattr_t*);
extern fn CInt pthread_rwlockattr_setpshared(Pthread_rwlockattr_t*, CInt);
extern fn Errno pthread_rwlockattr_destroy(Pthread_rwlockattr_t*);
extern fn Errno pthread_rwlockattr_getpshared(Pthread_rwlockattr_t*, CInt*);
extern fn Errno pthread_rwlockattr_init(Pthread_rwlockattr_t*);
extern fn Errno pthread_rwlockattr_setpshared(Pthread_rwlockattr_t*, CInt);
extern fn CInt pthread_key_create(Pthread_key_t*, PosixThreadFn routine);
extern fn CInt pthread_key_delete(Pthread_key_t);
extern fn Errno pthread_key_create(Pthread_key_t*, PosixThreadFn routine);
extern fn Errno pthread_key_delete(Pthread_key_t);
extern fn void* pthread_getspecific(Pthread_key_t);
extern fn CInt pthread_setspecific(Pthread_key_t, void* value_ptr);
extern fn Errno pthread_setspecific(Pthread_key_t, void* value_ptr);
extern fn CInt pthread_atfork(OnceFn prepare, OnceFn parent, OnceFn child);
extern fn Errno pthread_atfork(OnceFn prepare, OnceFn parent, OnceFn child);
extern fn void pthread_cleanup_pop(CInt execute);
extern fn void pthread_cleanup_push(PosixThreadFn routine, void* routine_arg);

View File

@@ -35,41 +35,40 @@ fn void? BufferedChannel.init(&self, Allocator allocator, usz size = 1)
channel.read_waiting = 0;
channel.mu.init()!;
defer catch (void)channel.mu.destroy();
defer catch channel.mu.destroy();
channel.send_cond.init()!;
defer catch (void)channel.send_cond.destroy();
defer catch channel.send_cond.destroy();
channel.read_cond.init()!;
defer catch (void)channel.read_cond.destroy();
defer catch channel.read_cond.destroy();
*self = (BufferedChannel)channel;
}
fn void? BufferedChannel.destroy(&self)
fn void? BufferedChannel.destroy(&self) @maydiscard // Remove optional in 0.8.0
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
fault err = @catch(channel.mu.destroy());
err = @catch(channel.send_cond.destroy()) ?: err;
err = @catch(channel.read_cond.destroy()) ?: err;
channel.mu.destroy();
channel.send_cond.destroy();
channel.read_cond.destroy();
allocator::free(channel.allocator, channel);
*self = null;
if (err) return err?;
}
fn void? BufferedChannel.push(self, Type val)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
channel.mu.lock()!;
defer catch (void)channel.mu.unlock();
channel.mu.lock();
defer catch channel.mu.unlock();
// if channel is full -> wait
while (channel.elems == channel.size && !channel.closed)
{
channel.send_waiting++;
channel.send_cond.wait(&channel.mu)!;
channel.send_cond.wait(&channel.mu);
channel.send_waiting--;
}
@@ -92,24 +91,24 @@ fn void? BufferedChannel.push(self, Type val)
// if someone is waiting -> awake him
if (channel.read_waiting > 0)
{
channel.read_cond.signal()!;
channel.read_cond.signal();
}
channel.mu.unlock()!;
channel.mu.unlock();
}
fn Type? BufferedChannel.pop(self)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
channel.mu.lock()!;
defer catch (void)channel.mu.unlock();
channel.mu.lock();
defer catch channel.mu.unlock();
// if chan is empty -> wait for sender
while (channel.elems == 0 && !channel.closed)
{
channel.read_waiting++;
channel.read_cond.wait(&channel.mu)!;
channel.read_cond.wait(&channel.mu);
channel.read_waiting--;
}
@@ -135,26 +134,25 @@ fn Type? BufferedChannel.pop(self)
// if someone is waiting -> awake him
if (channel.send_waiting > 0)
{
channel.send_cond.signal()!;
channel.send_cond.signal();
}
channel.mu.unlock()!;
channel.mu.unlock();
return ret;
}
fn void? BufferedChannel.close(self)
fn void? BufferedChannel.close(self) @maydiscard // Remove optional in 0.8.0
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;
fault err = @catch(channel.mu.lock());
channel.mu.lock();
channel.closed = true;
err = @catch(channel.read_cond.broadcast()) ?: err;
err = @catch(channel.send_cond.broadcast()) ?: err;
err = @catch(channel.mu.unlock()) ?: err;
channel.read_cond.broadcast();
channel.send_cond.broadcast();
channel.mu.unlock();
if (err) return err?;
}

View File

@@ -13,14 +13,17 @@ struct FixedThreadPool
Mutex mu;
QueueItem[] queue;
usz qindex;
usz qworking;
usz num_threads;
bitstruct : char {
bool initialized;
bool stop;
bool stop_now;
bool joining;
}
Thread[] pool;
ConditionVariable notify;
ConditionVariable collect;
}
struct QueueItem @private
@@ -46,7 +49,11 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0)
.pool = mem::new_array(Thread, threads)
};
self.mu.init()!;
defer catch self.mu.destroy();
self.notify.init()!;
defer catch self.notify.destroy();
self.collect.init()!;
defer catch self.collect.destroy();
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
@@ -55,6 +62,22 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0)
}
}
<*
Join all threads in the pool.
*>
fn void? FixedThreadPool.join(&self) @maydiscard // Remove optional in 0.8.0
{
if (self.initialized)
{
self.mu.lock();
defer self.mu.unlock();
self.joining = true;
self.notify.broadcast();
self.collect.wait(&self.mu);
self.joining = false;
}
}
<*
Stop all the threads and cleanup the pool.
Any pending work will be dropped.
@@ -68,37 +91,37 @@ fn void? FixedThreadPool.destroy(&self)
Stop all the threads and cleanup the pool.
Any pending work will be processed.
*>
fn void? FixedThreadPool.stop_and_destroy(&self)
fn void? FixedThreadPool.stop_and_destroy(&self) @maydiscard // Remove optional in 0.8.0
{
return self.@shutdown(self.stop);
self.@shutdown(self.stop);
}
macro void? FixedThreadPool.@shutdown(&self, #stop) @private
macro void FixedThreadPool.@shutdown(&self, #stop) @private
{
if (self.initialized)
{
self.mu.lock()!;
self.mu.lock();
#stop = true;
self.notify.broadcast()!;
self.mu.unlock()!;
self.notify.broadcast();
self.mu.unlock();
// Wait for all threads to shutdown.
while (true)
{
self.mu.lock()!;
defer self.mu.unlock()!!;
if (self.num_threads == 0)
{
break;
}
self.notify.signal()!;
self.mu.lock();
defer self.mu.unlock();
if (self.num_threads == 0) break;
self.notify.signal();
}
self.mu.destroy()!;
self.collect.destroy();
self.notify.destroy();
self.mu.destroy();
self.initialized = false;
while (self.qindex)
{
free_qitem(self.queue[--self.qindex]);
}
free(self.queue);
free(self.pool);
self.queue = {};
}
}
@@ -109,8 +132,8 @@ macro void? FixedThreadPool.@shutdown(&self, #stop) @private
*>
fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...)
{
self.mu.lock()!;
defer self.mu.unlock()!!;
self.mu.lock();
defer self.mu.unlock();
if (self.qindex == self.queue.len) return thread::THREAD_QUEUE_FULL?;
any[] data;
if (args.len)
@@ -120,12 +143,13 @@ fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...)
}
self.queue[self.qindex] = { .func = func, .args = data };
self.qindex++;
self.qworking++;
defer catch
{
free_qitem(self.queue[--self.qindex]);
}
// Notify the threads that work is available.
self.notify.broadcast()!;
self.notify.broadcast();
}
fn int process_work(void* self_arg) @private
@@ -133,39 +157,43 @@ fn int process_work(void* self_arg) @private
FixedThreadPool* self = self_arg;
while (true)
{
self.mu.lock()!!;
self.mu.lock();
if (self.stop_now)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock()!!;
self.mu.unlock();
return 0;
}
// Wait for work.
while (self.qindex == 0)
{
if (self.joining && self.qworking == 0) self.collect.signal();
if (self.stop)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock()!!;
self.mu.unlock();
return 0;
}
self.notify.wait(&self.mu)!!;
self.notify.wait(&self.mu);
if (self.stop_now)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock()!!;
self.mu.unlock();
return 0;
}
}
// Process the job.
self.qindex--;
QueueItem item = self.queue[self.qindex];
self.mu.unlock()!!;
defer free_qitem(item);
self.mu.unlock();
item.func(item.args);
defer free_qitem(item);
self.mu.lock();
self.qworking--;
self.mu.unlock();
}
}

View File

@@ -22,6 +22,6 @@ fn bool NativeMutex.is_initialized(&self)
return false;
}
macro void? NativeMutex.lock(&mutex) => NOT_IMPLEMENTED?;
macro bool NativeMutex.try_lock(&mutex) => NOT_IMPLEMENTED?;
macro void? NativeMutex.unlock(&mutex) => NOT_IMPLEMENTED?;
macro void NativeMutex.lock(&mutex) => NOT_IMPLEMENTED?!!;
macro bool NativeMutex.try_lock(&mutex) => NOT_IMPLEMENTED?!!;
macro void NativeMutex.unlock(&mutex) => NOT_IMPLEMENTED?!!;

View File

@@ -1,5 +1,9 @@
module std::thread::os @if(env::POSIX);
import std::os::posix, std::time, libc;
import libc::errno;
import std::thread;
struct NativeMutex
{
@@ -53,18 +57,24 @@ fn bool NativeMutex.is_initialized(&self)
@require self.is_initialized() : "Mutex was not initialized"
@ensure !self.is_initialized()
*>
fn void? NativeMutex.destroy(&self)
fn void NativeMutex.destroy(&self)
{
if (posix::pthread_mutex_destroy(&self.mutex)) return thread::DESTROY_FAILED?;
if (Errno err = posix::pthread_mutex_destroy(&self.mutex)) abort("Error destroying mutex: %d", err);
*self = {};
}
<*
@require self.is_initialized() : "Mutex was not initialized"
*>
fn void? NativeMutex.lock(&self)
fn void NativeMutex.lock(&self)
{
if (posix::pthread_mutex_lock(&self.mutex)) return thread::LOCK_FAILED?;
switch (posix::pthread_mutex_lock(&self.mutex))
{
case errno::EINVAL: unreachable("Mutex invalid");
case errno::EDEADLK: abort("Mutex deadlock");
case errno::OK: return;
default: unreachable("Unexpected error in lock");
}
}
<*
@@ -78,7 +88,7 @@ fn void? NativeMutex.lock_timeout(&self, ulong ms)
{
if (!ms) break;
ulong sleep = min(5, ms);
if (!libc::nanosleep(&&time::ms(ms).to_timespec(), null)) return thread::LOCK_FAILED?;
if (!libc::nanosleep(&&time::ms(ms).to_timespec(), null)) return thread::LOCK_TIMEOUT?;
ms -= sleep;
}
switch (result)
@@ -89,7 +99,7 @@ fn void? NativeMutex.lock_timeout(&self, ulong ms)
case errno::ETIMEDOUT:
return thread::LOCK_TIMEOUT?;
default:
return thread::LOCK_FAILED?;
return unreachable("Invalid lock %d", result);
}
}
@@ -104,9 +114,9 @@ fn bool NativeMutex.try_lock(&self)
<*
@require self.is_initialized() : "Mutex was not initialized"
*>
fn void? NativeMutex.unlock(&self)
fn void NativeMutex.unlock(&self)
{
if (posix::pthread_mutex_unlock(&self.mutex)) return thread::UNLOCK_FAILED?;
if (Errno err = posix::pthread_mutex_unlock(&self.mutex)) abort("Failed to unlock mutex: %d", err);
}
fn void? NativeConditionVariable.init(&cond)
@@ -114,32 +124,32 @@ fn void? NativeConditionVariable.init(&cond)
if (posix::pthread_cond_init(cond, null)) return thread::INIT_FAILED?;
}
fn void? NativeConditionVariable.destroy(&cond)
fn void NativeConditionVariable.destroy(&cond)
{
if (posix::pthread_cond_destroy(cond)) return thread::DESTROY_FAILED?;
if (Errno err = posix::pthread_cond_destroy(cond)) abort("Failed to destroy pthread_cond %d", err);
}
fn void? NativeConditionVariable.signal(&cond)
fn void NativeConditionVariable.signal(&cond)
{
if (posix::pthread_cond_signal(cond)) return thread::SIGNAL_FAILED?;
if (Errno err = posix::pthread_cond_signal(cond)) abort("Failed to signal %d", err);
}
fn void? NativeConditionVariable.broadcast(&cond)
fn void NativeConditionVariable.broadcast(&cond)
{
if (posix::pthread_cond_broadcast(cond)) return thread::SIGNAL_FAILED?;
if (Errno err = posix::pthread_cond_broadcast(cond)) abort("Failed to broadcast %d", err);
}
<*
@require mtx.is_initialized()
*>
fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx)
fn void NativeConditionVariable.wait(&cond, NativeMutex* mtx)
{
if (posix::pthread_cond_wait(cond, &mtx.mutex)) return thread::WAIT_FAILED?;
if (Errno err = posix::pthread_cond_wait(cond, &mtx.mutex)) abort("Failed to wait %d", err);
}
<*
@require mtx.is_initialized()
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms)
{
@@ -149,7 +159,7 @@ fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms)
<*
@require mtx.is_initialized()
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration)
{
@@ -160,7 +170,7 @@ fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx,
<*
@require mtx.is_initialized()
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time)
{
@@ -171,7 +181,7 @@ fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time)
case errno::OK:
return;
default:
return thread::WAIT_FAILED?;
abort("pthread_cond_timedwait failed, invalid value");
}
}
@@ -194,9 +204,9 @@ fn void? NativeThread.create(&thread, ThreadFn thread_fn, void* arg)
}
}
fn void? NativeThread.detach(thread)
fn void NativeThread.detach(thread)
{
if (posix::pthread_detach(thread.pthread)) return thread::DETACH_FAILED?;
if (Errno errno = posix::pthread_detach(thread.pthread)) abort("Thread detach failed: %d", errno);
}
fn void native_thread_exit(int result)
@@ -214,11 +224,21 @@ fn bool NativeThread.equals(thread, NativeThread other)
return (bool)posix::pthread_equal(thread.pthread, other.pthread);
}
fn int? NativeThread.join(thread)
<*
@return "the return value of the thread"
@return? thread::THREAD_NOT_FOUND : "If the thread was not running"
*>
fn int? NativeThread.join(thread) @maydiscard
{
void *pres;
if (posix::pthread_join(thread.pthread, &pres)) return thread::JOIN_FAILED?;
return (int)(iptr)pres;
switch (posix::pthread_join(thread.pthread, &pres))
{
case errno::OK: return (int)(iptr)pres;
case errno::EINVAL: unreachable("Thread is not joinable.");
case errno::EDEADLK: unreachable("Thread join from current thread.");
case errno::ESRCH: return thread::THREAD_NOT_FOUND~;
default: unreachable("Thread join returned unexpected result");
}
}
fn void NativeOnceFlag.call_once(&flag, OnceFn func)

View File

@@ -1,5 +1,7 @@
module std::thread::os @if(env::WIN32);
import std::os::win32, std::time;
import std::thread;
typedef NativeThread = inline Win32_HANDLE;
@@ -61,7 +63,7 @@ fn void? NativeMutex.init(&mtx, MutexType type)
@require mtx.owner_thread != win32::getCurrentThreadId() : "Mutex was not unlocked before destroying it"
@ensure !mtx.initialized
*>
fn void? NativeMutex.destroy(&mtx)
fn void NativeMutex.destroy(&mtx)
{
*mtx = {};
}
@@ -69,12 +71,12 @@ fn void? NativeMutex.destroy(&mtx)
<*
@require mtx.initialized : "Mutex was not initialized"
*>
fn void? NativeMutex.lock(&mtx)
fn void NativeMutex.lock(&mtx)
{
Win32_DWORD current_thread = win32::getCurrentThreadId();
if (mtx.owner_thread == current_thread)
{
if (!mtx.recursive) return thread::LOCK_FAILED?;
assert(mtx.recursive, "Recursive locking with non-recursive mutex");
mtx.locks++;
return;
}
@@ -106,12 +108,10 @@ fn bool NativeMutex.try_lock(&mtx)
<*
@require mtx.initialized : "Mutex was not initialized"
@require mtx.owner_thread == win32::getCurrentThreadId() : "Mutex was not locked by the current thread"
*>
fn void? NativeMutex.unlock(&mtx)
fn void NativeMutex.unlock(&mtx)
{
$if env::COMPILER_SAFE_MODE:
if (mtx.owner_thread != win32::getCurrentThreadId()) return thread::UNLOCK_FAILED?; // Mutex was not locked by the current thread
$endif
if (--mtx.locks == 0)
{
mtx.owner_thread = 0;
@@ -137,7 +137,7 @@ fn void? NativeTimedMutex.init(&mtx, MutexType type)
@require mtx.owner_thread != win32::getCurrentThreadId() : "Mutex was not unlocked before destroying it"
@ensure !mtx.initialized
*>
fn void? NativeTimedMutex.destroy(&mtx)
fn void NativeTimedMutex.destroy(&mtx)
{
*mtx = {};
}
@@ -146,22 +146,20 @@ fn void? NativeTimedMutex.wait_cond_var(&mtx, uint ms) @local
{
if (!win32::sleepConditionVariableSRW(&mtx.cond_var, &mtx.srw_lock, ms, 0))
{
if (win32::getLastError() != win32::ERROR_TIMEOUT)
{
return thread::WAIT_FAILED?;
}
Win32_DWORD err = win32::getLastError();
assert(err == win32::ERROR_TIMEOUT, "Unexpected error waiting for condvar %d", err);
}
}
<*
@require mtx.initialized : "Mutex was not initialized"
*>
fn void? NativeTimedMutex.lock(&mtx)
fn void NativeTimedMutex.lock(&mtx)
{
Win32_DWORD current_thread = win32::getCurrentThreadId();
if (mtx.owner_thread == current_thread)
{
if (!mtx.recursive) return thread::LOCK_FAILED?;
assert(mtx.recursive, "Tried to lock a non-recursive mutex");
mtx.locks++;
return;
}
@@ -171,7 +169,7 @@ fn void? NativeTimedMutex.lock(&mtx)
while (mtx.locks)
{
mtx.wait_cond_var(win32::INFINITE)!;
mtx.wait_cond_var(win32::INFINITE)!!;
}
mtx.locks = 1;
mtx.owner_thread = current_thread;
@@ -179,13 +177,14 @@ fn void? NativeTimedMutex.lock(&mtx)
<*
@require mtx.initialized : "Mutex was not initialized"
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeTimedMutex.lock_timeout(&mtx, ulong ms)
{
Win32_DWORD current_thread = win32::getCurrentThreadId();
if (mtx.owner_thread == current_thread)
{
if (!mtx.recursive) return thread::LOCK_FAILED?;
assert(mtx.recursive, "The mutex was not recursive, but was locked in the same thread more than once.");
mtx.locks++;
return;
}
@@ -216,7 +215,7 @@ fn void? NativeTimedMutex.lock_timeout(&mtx, ulong ms)
return;
}
}
return thread::WAIT_FAILED?;
return thread::WAIT_TIMEOUT~;
}
<*
@@ -244,14 +243,14 @@ fn bool NativeTimedMutex.try_lock(&mtx)
<*
@require mtx.initialized : "Mutex was not initialized"
*>
fn void? NativeTimedMutex.unlock(&mtx)
fn void NativeTimedMutex.unlock(&mtx)
{
win32::acquireSRWLockExclusive(&mtx.srw_lock);
$if env::COMPILER_SAFE_MODE:
if (mtx.owner_thread != win32::getCurrentThreadId())
{
win32::releaseSRWLockExclusive(&mtx.srw_lock);
return thread::UNLOCK_FAILED?; // Mutex was not locked by the current thread
abort("Mutex was not locked by the current thread");
}
$endif
@@ -270,25 +269,29 @@ fn void? NativeConditionVariable.init(&cond)
cond.cond_var = {};
}
fn void? NativeConditionVariable.destroy(&cond) @maydiscard
fn void NativeConditionVariable.destroy(&cond)
{
// Nothing to do
}
fn void? NativeConditionVariable.signal(&cond)
fn void NativeConditionVariable.signal(&cond)
{
win32::wakeConditionVariable(&cond.cond_var);
}
fn void? NativeConditionVariable.broadcast(&cond)
fn void NativeConditionVariable.broadcast(&cond)
{
win32::wakeAllConditionVariable(&cond.cond_var);
}
fn void? timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout) @private
<*
@return "true if wait succeeded, false if it timed out"
*>
fn bool timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout) @private
{
Win32_DWORD owner_thread = mtx.owner_thread;
if (mtx.locks != 1 || owner_thread != win32::getCurrentThreadId()) return thread::WAIT_FAILED?;
assert(mtx.locks == 1, "Expected lock count = 1, was %d", mtx.locks);
assert(owner_thread == win32::getCurrentThreadId(), "Timed wait on other thread");
mtx.owner_thread = 0;
mtx.locks = 0;
defer
@@ -300,45 +303,47 @@ fn void? timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout
{
if (win32::getLastError() == win32::ERROR_TIMEOUT)
{
return thread::WAIT_TIMEOUT?;
return false;
}
return thread::WAIT_FAILED?;
abort("Unexpected error sleepConditionVariableSRW: %d", win32::getLastError());
}
return true;
}
<*
@require mtx.initialized : "Mutex was not initialized"
*>
fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx) @inline
fn void NativeConditionVariable.wait(&cond, NativeMutex* mtx) @inline
{
return timedwait(cond, mtx, win32::INFINITE) @inline;
bool res = timedwait(cond, mtx, win32::INFINITE) @inline;
assert(res, "Timed wait exited despite inifite time");
}
<*
@require mtx.initialized : "Mutex was not initialized"
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) @inline
{
if (ms > uint.max) ms = uint.max;
return timedwait(cond, mtx, (uint)ms) @inline;
if (!timedwait(cond, mtx, (uint)ms) @inline) return thread::WAIT_TIMEOUT~;
}
<*
@require mtx.initialized : "Mutex was not initialized"
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration) @inline
{
if (duration < time::DURATION_ZERO) return thread::WAIT_TIMEOUT?;
if (duration < time::DURATION_ZERO) return thread::WAIT_TIMEOUT~;
long ms = duration.to_ms();
if (ms > uint.max) ms = uint.max;
return timedwait(cond, mtx, (uint)ms) @inline;
if (!timedwait(cond, mtx, (uint)ms) @inline) return thread::WAIT_TIMEOUT~;
}
<*
@require mtx.initialized : "Mutex was not initialized"
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) @inline
{
@@ -351,12 +356,13 @@ fn void? NativeThread.create(&thread, ThreadFn func, void* args)
if (!(*thread = (NativeThread)win32::createThread(null, 0, func, args, 0, null))) return thread::INIT_FAILED?;
}
fn void? NativeThread.detach(thread) @inline
fn void NativeThread.detach(thread) @inline
{
if (!win32::closeHandle(thread)) return thread::DETACH_FAILED?;
if (!win32::closeHandle(thread)) abort("Detaching thread failed");
}
fn void native_thread_exit(int result) @inline
{
win32::exitThread((uint)result);
@@ -377,11 +383,14 @@ fn void NativeOnceFlag.call_once(&flag, OnceFn func)
win32::initOnceExecuteOnce(&flag.init_once, callback, func, null);
}
fn int? NativeThread.join(thread)
<*
@return "the exit value of the thread"
*>
fn int NativeThread.join(thread)
{
uint res;
if (win32::waitForSingleObject(thread, win32::INFINITE) == win32::WAIT_FAILED) return thread::JOIN_FAILED?;
if (!win32::getExitCodeThread(thread, &res)) return thread::JOIN_FAILED?;
if (win32::waitForSingleObject(thread, win32::INFINITE) == win32::WAIT_FAILED) unreachable("Failed to join thread, received wait failed.");
if (!win32::getExitCodeThread(thread, &res)) unreachable("Failed to retrieve exit code when joining.");
defer win32::closeHandle(thread);
return res;
}

View File

@@ -6,16 +6,19 @@ struct ThreadPool
Mutex mu;
QueueItem[SIZE] queue;
usz qindex;
usz qworking;
usz num_threads;
bitstruct : char
{
bool initialized;
bool stop;
bool stop_now;
bool joining;
}
Thread[SIZE] pool;
ConditionVariable notify;
ConditionVariable collect;
}
struct QueueItem @private
@@ -29,10 +32,11 @@ struct QueueItem @private
*>
fn void? ThreadPool.init(&self)
{
defer catch @ok(self.destroy());
defer catch self.destroy();
*self = { .num_threads = SIZE, .initialized = true };
self.mu.init()!;
self.notify.init()!;
self.collect.init()!;
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
@@ -41,44 +45,62 @@ fn void? ThreadPool.init(&self)
}
}
<*
Join all threads in the pool.
*>
fn void? ThreadPool.join(&self) @maydiscard // Remove optional in 0.8.0
{
if (self.initialized)
{
self.mu.lock();
self.joining = true;
self.notify.broadcast();
self.collect.wait(&self.mu);
self.joining = false;
self.mu.unlock();
}
}
<*
Stop all the threads and cleanup the pool.
Any pending work will be dropped.
*>
fn void? ThreadPool.destroy(&self)
fn void? ThreadPool.destroy(&self) @maydiscard // Remove optional in 0.8.0
{
return self.@shutdown(self.stop_now);
self.@shutdown(self.stop_now);
}
<*
Stop all the threads and cleanup the pool.
Any pending work will be processed.
*>
fn void? ThreadPool.stop_and_destroy(&self)
fn void? ThreadPool.stop_and_destroy(&self) @maydiscard // Remove optional in 0.8.0
{
return self.@shutdown(self.stop);
self.@shutdown(self.stop);
}
macro void? ThreadPool.@shutdown(&self, #stop) @private
macro void ThreadPool.@shutdown(&self, #stop) @private
{
if (self.initialized)
{
self.mu.lock()!;
self.mu.lock();
#stop = true;
self.notify.broadcast()!;
self.mu.unlock()!;
self.notify.broadcast();
self.mu.unlock();
// Wait for all threads to shutdown.
while (true)
{
self.mu.lock()!;
defer self.mu.unlock()!!;
self.mu.lock();
defer self.mu.unlock();
if (self.num_threads == 0)
{
break;
}
self.notify.signal()!;
self.notify.signal();
}
self.mu.destroy()!;
self.mu.destroy();
self.notify.destroy();
self.collect.destroy();
self.initialized = false;
}
}
@@ -87,18 +109,19 @@ macro void? ThreadPool.@shutdown(&self, #stop) @private
Push a new job to the pool.
Returns whether the queue is full, in which case the job is ignored.
*>
fn void? ThreadPool.push(&self, ThreadFn func, void* arg)
fn void? ThreadPool.push(&self, ThreadFn func, void* arg) @maydiscard // Remove optional in 0.8.0
{
while (true)
{
self.mu.lock()!;
defer self.mu.unlock()!!;
self.mu.lock();
defer self.mu.unlock();
if (self.qindex < SIZE)
{
self.queue[self.qindex] = { .func = func, .arg = arg };
self.qindex++;
self.qworking++;
// Notify the threads that work is available.
self.notify.broadcast()!;
self.notify.broadcast();
return;
}
}
@@ -109,30 +132,34 @@ fn int process_work(void* arg) @private
ThreadPool* self = arg;
while (true)
{
self.mu.lock()!!;
self.mu.lock();
// Wait for work.
while (self.qindex == 0)
{
if (self.joining && self.qworking == 0) self.collect.broadcast();
if (self.stop)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock()!!;
self.mu.unlock();
return 0;
}
self.notify.wait(&self.mu)!!;
self.notify.wait(&self.mu);
if (self.stop_now)
{
// Shutdown requested.
self.num_threads--;
self.mu.unlock()!!;
self.mu.unlock();
return 0;
}
}
// Process the job.
self.qindex--;
QueueItem item = self.queue[self.qindex];
self.mu.unlock()!!;
self.mu.unlock();
item.func(item.arg);
self.mu.lock();
self.qworking--;
self.mu.unlock();
}
}

View File

@@ -21,33 +21,36 @@ alias ThreadFn = fn int(void* arg);
faultdef
INIT_FAILED,
DESTROY_FAILED,
LOCK_FAILED,
LOCK_TIMEOUT,
UNLOCK_FAILED,
SIGNAL_FAILED,
WAIT_FAILED,
WAIT_TIMEOUT,
DETACH_FAILED,
JOIN_FAILED,
THREAD_NOT_FOUND,
INTERRUPTED,
CHANNEL_CLOSED;
faultdef
DETACH_FAILED @deprecated,
UNLOCK_FAILED @deprecated,
DESTROY_FAILED @deprecated,
SIGNAL_FAILED @deprecated,
JOIN_FAILED @deprecated,
LOCK_FAILED @deprecated,
WAIT_FAILED @deprecated;
macro void? Mutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, {});
macro bool Mutex.is_initialized(mutex) => ((NativeMutex*)&mutex).is_initialized();
macro void? RecursiveMutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, {.recursive});
macro void? Mutex.destroy(&mutex) => NativeMutex.destroy((NativeMutex*)mutex);
macro void? Mutex.lock(&mutex) => NativeMutex.lock((NativeMutex*)mutex);
macro bool Mutex.try_lock(&mutex) => NativeMutex.try_lock((NativeMutex*)mutex);
macro void? Mutex.unlock(&mutex) => NativeMutex.unlock((NativeMutex*)mutex);
macro void? RecursiveMutex.init(&mutex) @maydiscard => NativeMutex.init((NativeMutex*)mutex, {.recursive});
macro void? Mutex.destroy(&mutex) @maydiscard => NativeMutex.destroy((NativeMutex*)mutex); // Remove optional in 0.8.0
macro void? Mutex.lock(&mutex) @maydiscard => NativeMutex.lock((NativeMutex*)mutex);
macro bool Mutex.try_lock(&mutex) => NativeMutex.try_lock((NativeMutex*)mutex);
macro void? Mutex.unlock(&mutex) @maydiscard => NativeMutex.unlock((NativeMutex*)mutex); // Remove optional in 0.8.0
macro void? TimedMutex.init(&mutex) => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed});
macro void? TimedRecursiveMutex.init(&mutex) => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed, .recursive});
macro void? TimedMutex.destroy(&mutex) => NativeTimedMutex.destroy((NativeTimedMutex*)mutex);
macro void? TimedMutex.lock(&mutex) => NativeTimedMutex.lock((NativeTimedMutex*)mutex);
macro void? TimedRecursiveMutex.init(&mutex) @maydiscard => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed, .recursive});
macro void? TimedMutex.destroy(&mutex) @maydiscard => NativeTimedMutex.destroy((NativeTimedMutex*)mutex); // Remove optional in 0.8.0
macro void? TimedMutex.lock(&mutex) @maydiscard => NativeTimedMutex.lock((NativeTimedMutex*)mutex);
macro void? TimedMutex.lock_timeout(&mutex, ulong ms) => NativeTimedMutex.lock_timeout((NativeTimedMutex*)mutex, ms);
macro bool TimedMutex.try_lock(&mutex) => NativeTimedMutex.try_lock((NativeTimedMutex*)mutex);
macro void? TimedMutex.unlock(&mutex) => NativeTimedMutex.unlock((NativeTimedMutex*)mutex);
macro void? TimedMutex.unlock(&mutex) @maydiscard => NativeTimedMutex.unlock((NativeTimedMutex*)mutex); // Remove optional in 0.8.0
macro void fence(AtomicOrdering $ordering) @safemacro
{
@@ -56,22 +59,22 @@ macro void fence(AtomicOrdering $ordering) @safemacro
macro void Mutex.@in_lock(&mutex; @body)
{
(void)mutex.lock();
defer (void)mutex.unlock();
mutex.lock();
defer mutex.unlock();
@body();
}
macro void? ConditionVariable.init(&cond) => NativeConditionVariable.init((NativeConditionVariable*)cond);
macro void? ConditionVariable.destroy(&cond) => NativeConditionVariable.destroy((NativeConditionVariable*)cond);
macro void? ConditionVariable.signal(&cond) => NativeConditionVariable.signal((NativeConditionVariable*)cond);
macro void? ConditionVariable.broadcast(&cond) => NativeConditionVariable.broadcast((NativeConditionVariable*)cond);
macro void? ConditionVariable.wait(&cond, Mutex* mutex)
macro void? ConditionVariable.destroy(&cond) @maydiscard => NativeConditionVariable.destroy((NativeConditionVariable*)cond);
macro void? ConditionVariable.signal(&cond) @maydiscard => NativeConditionVariable.signal((NativeConditionVariable*)cond);
macro void? ConditionVariable.broadcast(&cond) @maydiscard => NativeConditionVariable.broadcast((NativeConditionVariable*)cond);
macro void? ConditionVariable.wait(&cond, Mutex* mutex) @maydiscard
{
return NativeConditionVariable.wait((NativeConditionVariable*)cond, (NativeMutex*)mutex);
}
<*
@require $defined(Duration d = #ms_or_duration) ||| $defined(ulong l = #ms_or_duration)
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration) @safemacro
{
@@ -83,7 +86,7 @@ macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration)
}
<*
@return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED
@return? thread::WAIT_TIMEOUT
*>
macro void? ConditionVariable.wait_until(&cond, Mutex* mutex, Time time)
{
@@ -100,8 +103,12 @@ macro void? Thread.create(&thread, ThreadFn thread_fn, void* arg)
return NativeThread.create(thread, thread_fn, arg);
}
macro void? Thread.detach(thread) => NativeThread.detach(thread);
macro int? Thread.join(thread) => NativeThread.join(thread);
macro void? Thread.detach(thread) @maydiscard => NativeThread.detach(thread);
<*
@return? THREAD_NOT_FOUND : "If the thread cannot be found."
*>
macro int? Thread.join(thread) @maydiscard => NativeThread.join(thread);
macro bool Thread.equals(thread, Thread other) => NativeThread.equals(thread, other);
macro void OnceFlag.call(&flag, OnceFn func) => NativeOnceFlag.call_once((NativeOnceFlag*)flag, func);

View File

@@ -28,31 +28,29 @@ fn void? UnbufferedChannel.init(&self, Allocator allocator)
channel.read_waiting = 0;
channel.mu.init()!;
defer catch (void)channel.mu.destroy();
defer catch channel.mu.destroy();
channel.send_mu.init()!;
defer catch (void)channel.send_mu.destroy();
defer catch channel.send_mu.destroy();
channel.send_cond.init()!;
defer catch (void)channel.send_cond.destroy();
defer catch channel.send_cond.destroy();
channel.read_mu.init()!;
defer catch (void)channel.read_mu.destroy();
defer catch channel.read_mu.destroy();
channel.read_cond.init()!;
*self = (UnbufferedChannel)channel;
}
fn void? UnbufferedChannel.destroy(&self)
fn void? UnbufferedChannel.destroy(&self) @maydiscard // Remove optional in 0.8.0
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
fault err = @catch(channel.mu.destroy());
err = @catch(channel.send_mu.destroy()) ?: err;
err = @catch(channel.send_cond.destroy()) ?: err;
err = @catch(channel.read_mu.destroy()) ?: err;
err = @catch(channel.read_cond.destroy()) ?: err;
channel.mu.destroy();
channel.send_mu.destroy();
channel.send_cond.destroy();
channel.read_mu.destroy();
channel.read_cond.destroy();
allocator::free(channel.allocator, channel);
if (err) return err?;
*self = null;
}
@@ -60,10 +58,10 @@ fn void? UnbufferedChannel.push(self, Type val)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
channel.mu.lock()!;
defer catch (void)channel.mu.unlock();
channel.send_mu.lock()!;
defer catch (void)channel.send_mu.unlock();
channel.mu.lock();
defer catch channel.mu.unlock();
channel.send_mu.lock();
defer catch channel.send_mu.unlock();
if (channel.closed)
{
@@ -78,32 +76,32 @@ fn void? UnbufferedChannel.push(self, Type val)
// if reader is already waiting for us -> awake him
if (channel.read_waiting > 0)
{
channel.read_cond.signal()!;
channel.read_cond.signal();
}
// wait until reader takes value from buffer
channel.send_cond.wait(&channel.mu)!;
channel.send_cond.wait(&channel.mu);
if (channel.closed) return thread::CHANNEL_CLOSED?;
channel.mu.unlock()!;
channel.send_mu.unlock()!;
channel.mu.unlock();
channel.send_mu.unlock();
}
fn Type? UnbufferedChannel.pop(self)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
channel.mu.lock()!;
defer catch (void)channel.mu.unlock();
channel.read_mu.lock()!;
defer catch (void)channel.read_mu.unlock();
channel.mu.lock();
defer catch channel.mu.unlock();
channel.read_mu.lock();
defer catch channel.read_mu.unlock();
// if no one is waiting, then there is nothing in the buffer
while (!channel.closed && channel.send_waiting == 0)
{
channel.read_waiting++;
channel.read_cond.wait(&channel.mu)!;
channel.read_cond.wait(&channel.mu);
channel.read_waiting--;
}
@@ -114,25 +112,24 @@ fn Type? UnbufferedChannel.pop(self)
// awake sender
channel.send_waiting--;
channel.send_cond.signal()!;
channel.send_cond.signal();
channel.mu.unlock()!;
channel.read_mu.unlock()!;
channel.mu.unlock();
channel.read_mu.unlock();
return ret;
}
fn void? UnbufferedChannel.close(self)
fn void? UnbufferedChannel.close(self) @maydiscard // Remove optional in 0.8.0
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self;
fault err = @catch(channel.mu.lock());
channel.mu.lock();
channel.closed = true;
err = @catch(channel.read_cond.broadcast()) ?: err;
err = @catch(channel.send_cond.broadcast()) ?: err;
err = @catch(channel.mu.unlock()) ?: err;
channel.read_cond.broadcast();
channel.send_cond.broadcast();
channel.mu.unlock();
if (err) return err?;
}