From 3f20e5af1d170c0d78b3f587ce79977ab67a6533 Mon Sep 17 00:00:00 2001 From: Sander van den Bosch Date: Sat, 6 Dec 2025 23:54:04 +0100 Subject: [PATCH] add join for ThreadPool without destroying the threads (#2579) * add join for ThreadPool without destroying the threads * Make the main Thread block waiting for the worker threads to finish instead of buzy looping and do proper initialization and freeing of all variables. * Updated test to use `atomic_store` and take into account the maximum queue size of the threadpool. * - Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads. - Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0. - Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0. - Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0. - Return of Thread `detatch()` is now "@maydiscard". It will return void in 0.8.0. - Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will retunr void in 0.8.0. - Pthread bindings correctly return Errno instead of CInt. - Return of Thread `join()` is now "@maydiscard". --------- Co-authored-by: Christoffer Lerno --- lib/std/collections/linked_blockingqueue.c3 | 22 ++-- lib/std/core/logging.c3 | 5 +- lib/std/experimental/FrameScheduler.c3 | 2 +- lib/std/os/posix/threads.c3 | 116 ++++++++++---------- lib/std/threads/buffered_channel.c3 | 46 ++++---- lib/std/threads/fixed_pool.c3 | 76 +++++++++---- lib/std/threads/os/thread_none.c3 | 6 +- lib/std/threads/os/thread_posix.c3 | 70 +++++++----- lib/std/threads/os/thread_win32.c3 | 87 ++++++++------- lib/std/threads/pool.c3 | 71 ++++++++---- lib/std/threads/thread.c3 | 59 +++++----- lib/std/threads/unbuffered_channel.c3 | 65 ++++++----- releasenotes.md | 14 ++- test/unit/stdlib/threads/channel.c3 | 64 +++++------ test/unit/stdlib/threads/mutex.c3 | 64 +++-------- test/unit/stdlib/threads/pool.c3 | 32 +++++- test/unit/stdlib/threads/simple_thread.c3 | 41 ++++--- 17 files changed, 470 insertions(+), 370 deletions(-) diff --git a/lib/std/collections/linked_blockingqueue.c3 b/lib/std/collections/linked_blockingqueue.c3 index 1d4f49b93..082238fa8 100644 --- a/lib/std/collections/linked_blockingqueue.c3 +++ b/lib/std/collections/linked_blockingqueue.c3 @@ -70,9 +70,9 @@ fn void LinkedBlockingQueue.free(&self) } }; - (void)self.lock.destroy(); - (void)self.not_empty.destroy(); - (void)self.not_full.destroy(); + self.lock.destroy(); + self.not_empty.destroy(); + self.not_full.destroy(); } fn void LinkedBlockingQueue.link_entry(&self, QueueEntry* entry) @private @@ -126,7 +126,7 @@ fn void LinkedBlockingQueue.push(&self, Value value) { while (self.capacity > 0 && self.count >= self.capacity) { - self.not_full.wait(&self.lock)!!; + self.not_full.wait(&self.lock); } QueueEntry* entry = allocator::new(self.allocator, QueueEntry, { @@ -137,7 +137,7 @@ fn void LinkedBlockingQueue.push(&self, Value value) self.link_entry(entry); // Signal that queue is no longer empty - self.not_empty.signal()!!; + self.not_empty.signal(); }; } @@ -153,7 +153,7 @@ fn Value LinkedBlockingQueue.poll(&self) { while (self.count == 0) { - self.not_empty.wait(&self.lock)!!; + self.not_empty.wait(&self.lock); } QueueEntry* entry = self.unlink_head(); @@ -161,7 +161,7 @@ fn Value LinkedBlockingQueue.poll(&self) allocator::free(self.allocator, entry); if (self.capacity > 0) { - self.not_full.signal()!!; + self.not_full.signal(); } return value; }; @@ -186,7 +186,7 @@ fn Value? LinkedBlockingQueue.pop(&self) if (self.capacity > 0) { - self.not_full.signal()!!; + self.not_full.signal(); } return value; }; @@ -224,7 +224,7 @@ fn Value? LinkedBlockingQueue.poll_timeout(&self, Duration timeout) // Must signal not_full after removing an item if (self.capacity > 0) { - self.not_full.signal()!!; + self.not_full.signal(); } return value; }; @@ -274,7 +274,7 @@ fn void? LinkedBlockingQueue.try_push(&self, Value value) .prev = null }); self.link_entry(entry); - self.not_empty.signal()!!; + self.not_empty.signal(); }; } @@ -308,7 +308,7 @@ fn void? LinkedBlockingQueue.push_timeout(&self, Value value, Duration timeout) .prev = null }); self.link_entry(entry); - self.not_empty.signal()!!; + self.not_empty.signal(); }; } diff --git a/lib/std/core/logging.c3 b/lib/std/core/logging.c3 index abaa1cd97..cd95e06ae 100644 --- a/lib/std/core/logging.c3 +++ b/lib/std/core/logging.c3 @@ -148,10 +148,11 @@ fn void call_log_internal(LogPriority prio, LogCategory category, String file, S LogPriority priority = mem::@atomic_load(config_priorities[category], UNORDERED); if (priority > prio) return; init(); - bool locked = logger_mutex.is_initialized() && @ok(logger_mutex.lock()); + bool locked = logger_mutex.is_initialized(); + if (locked) logger_mutex.lock(); Logger logger = current_logger; LogFn logfn = current_logfn; - defer if (locked) (void)logger_mutex.unlock(); + defer if (locked) logger_mutex.unlock(); logfn(logger.ptr, prio, category, current_tag, file, func, line, fmt, args); } diff --git a/lib/std/experimental/FrameScheduler.c3 b/lib/std/experimental/FrameScheduler.c3 index 06d497d00..7d0ae856f 100644 --- a/lib/std/experimental/FrameScheduler.c3 +++ b/lib/std/experimental/FrameScheduler.c3 @@ -43,7 +43,7 @@ macro void FrameScheduler.@destroy(&self; @destruct(Event e)) self.events.free(); self.pending_events.free(); self.delayed_events.free(); - (void)self.mtx.destroy(); + self.mtx.destroy(); } fn void FrameScheduler.queue_delayed_event(&self, Event event, Duration delay) diff --git a/lib/std/os/posix/threads.c3 b/lib/std/os/posix/threads.c3 index 0ef156293..c12b12a40 100644 --- a/lib/std/os/posix/threads.c3 +++ b/lib/std/os/posix/threads.c3 @@ -9,80 +9,80 @@ const PTHREAD_MUTEX_RECURSIVE = env::LINUX ? 1 : 2; alias PosixThreadFn = fn void*(void*); typedef Pthread_t = void*; -extern fn CInt pthread_create(Pthread_t*, Pthread_attr_t*, PosixThreadFn, void*); -extern fn CInt pthread_cancel(Pthread_t*); -extern fn CInt pthread_detach(Pthread_t); -extern fn CInt pthread_equal(Pthread_t this, Pthread_t other); +extern fn Errno pthread_create(Pthread_t*, Pthread_attr_t*, PosixThreadFn, void*); +extern fn Errno pthread_cancel(Pthread_t*); +extern fn Errno pthread_detach(Pthread_t); +extern fn Errno pthread_equal(Pthread_t this, Pthread_t other); extern fn void pthread_exit(void* value_ptr) @noreturn; -extern fn CInt pthread_join(Pthread_t, void** value_ptr); -extern fn CInt pthread_kill(Pthread_t, CInt sig); +extern fn Errno pthread_join(Pthread_t, void** value_ptr); +extern fn Errno pthread_kill(Pthread_t, CInt sig); extern fn void pthread_once(Pthread_once_t*, OnceFn); extern fn Pthread_t pthread_self(); -extern fn CInt pthread_setcancelstate(CInt state, CInt* oldstate); -extern fn CInt pthread_setcanceltype(CInt type, CInt* oldtype); -extern fn CInt pthread_testcancel(); +extern fn Errno pthread_setcancelstate(CInt state, CInt* oldstate); +extern fn Errno pthread_setcanceltype(CInt type, CInt* oldtype); +extern fn Errno pthread_testcancel(); -extern fn CInt pthread_attr_destroy(Pthread_attr_t*); -extern fn CInt pthread_attr_getinheritsched(Pthread_attr_t*, CInt*); -extern fn CInt pthread_attr_getschedparam(Pthread_attr_t*, Pthread_sched_param*); -extern fn CInt pthread_attr_getschedpolicy(Pthread_attr_t*, CInt*); -extern fn CInt pthread_attr_getscope(Pthread_attr_t*, CInt*); -extern fn CInt pthread_attr_getstacksize(Pthread_attr_t*, usz*); -extern fn CInt pthread_attr_getstackaddr(Pthread_attr_t*, void**); -extern fn CInt pthread_attr_getdetachstate(Pthread_attr_t*, CInt*); -extern fn CInt pthread_attr_init(Pthread_attr_t*); -extern fn CInt pthread_attr_setinheritsched(Pthread_attr_t*, CInt); -extern fn CInt pthread_attr_setschedparam(Pthread_attr_t*, Pthread_sched_param*); -extern fn CInt pthread_attr_setschedpolicy(Pthread_attr_t*, CInt); -extern fn CInt pthread_attr_setscope(Pthread_attr_t*, CInt); -extern fn CInt pthread_attr_setstacksize(Pthread_attr_t*, usz); -extern fn CInt pthread_attr_setstackaddr(Pthread_attr_t*, void*); -extern fn CInt pthread_attr_setdetachstate(Pthread_attr_t*, CInt); +extern fn Errno pthread_attr_destroy(Pthread_attr_t*); +extern fn Errno pthread_attr_getinheritsched(Pthread_attr_t*, CInt*); +extern fn Errno pthread_attr_getschedparam(Pthread_attr_t*, Pthread_sched_param*); +extern fn Errno pthread_attr_getschedpolicy(Pthread_attr_t*, CInt*); +extern fn Errno pthread_attr_getscope(Pthread_attr_t*, CInt*); +extern fn Errno pthread_attr_getstacksize(Pthread_attr_t*, usz*); +extern fn Errno pthread_attr_getstackaddr(Pthread_attr_t*, void**); +extern fn Errno pthread_attr_getdetachstate(Pthread_attr_t*, CInt*); +extern fn Errno pthread_attr_init(Pthread_attr_t*); +extern fn Errno pthread_attr_setinheritsched(Pthread_attr_t*, CInt); +extern fn Errno pthread_attr_setschedparam(Pthread_attr_t*, Pthread_sched_param*); +extern fn Errno pthread_attr_setschedpolicy(Pthread_attr_t*, CInt); +extern fn Errno pthread_attr_setscope(Pthread_attr_t*, CInt); +extern fn Errno pthread_attr_setstacksize(Pthread_attr_t*, usz); +extern fn Errno pthread_attr_setstackaddr(Pthread_attr_t*, void*); +extern fn Errno pthread_attr_setdetachstate(Pthread_attr_t*, CInt); -extern fn CInt pthread_mutexattr_destroy(Pthread_mutexattr_t*); -extern fn CInt pthread_mutexattr_getprioceiling(Pthread_mutexattr_t*, CInt*); -extern fn CInt pthread_mutexattr_getprotocol(Pthread_mutexattr_t*, CInt*); -extern fn CInt pthread_mutexattr_gettype(Pthread_mutexattr_t*, CInt*); -extern fn CInt pthread_mutexattr_init(Pthread_mutexattr_t*); -extern fn CInt pthread_mutexattr_setprioceiling(Pthread_mutexattr_t*, CInt); -extern fn CInt pthread_mutexattr_setprotocol(Pthread_mutexattr_t*, CInt); -extern fn CInt pthread_mutexattr_settype(Pthread_mutexattr_t*, CInt); +extern fn Errno pthread_mutexattr_destroy(Pthread_mutexattr_t*); +extern fn Errno pthread_mutexattr_getprioceiling(Pthread_mutexattr_t*, CInt*); +extern fn Errno pthread_mutexattr_getprotocol(Pthread_mutexattr_t*, CInt*); +extern fn Errno pthread_mutexattr_gettype(Pthread_mutexattr_t*, CInt*); +extern fn Errno pthread_mutexattr_init(Pthread_mutexattr_t*); +extern fn Errno pthread_mutexattr_setprioceiling(Pthread_mutexattr_t*, CInt); +extern fn Errno pthread_mutexattr_setprotocol(Pthread_mutexattr_t*, CInt); +extern fn Errno pthread_mutexattr_settype(Pthread_mutexattr_t*, CInt); -extern fn CInt pthread_mutex_destroy(Pthread_mutex_t*); -extern fn CInt pthread_mutex_init(Pthread_mutex_t*, Pthread_mutexattr_t*); +extern fn Errno pthread_mutex_destroy(Pthread_mutex_t*); +extern fn Errno pthread_mutex_init(Pthread_mutex_t*, Pthread_mutexattr_t*); extern fn Errno pthread_mutex_lock(Pthread_mutex_t*); extern fn Errno pthread_mutex_trylock(Pthread_mutex_t*); extern fn Errno pthread_mutex_unlock(Pthread_mutex_t*); -extern fn CInt pthread_condattr_destroy(Pthread_condattr_t*); -extern fn CInt pthread_condattr_init(Pthread_condattr_t*); +extern fn Errno pthread_condattr_destroy(Pthread_condattr_t*); +extern fn Errno pthread_condattr_init(Pthread_condattr_t*); -extern fn CInt pthread_cond_broadcast(Pthread_cond_t*); -extern fn CInt pthread_cond_destroy(Pthread_cond_t*); -extern fn CInt pthread_cond_init(Pthread_cond_t*, Pthread_condattr_t*); -extern fn CInt pthread_cond_signal(Pthread_cond_t*); -extern fn CInt pthread_cond_timedwait(Pthread_cond_t*, Pthread_mutex_t*, TimeSpec*); -extern fn CInt pthread_cond_wait(Pthread_cond_t*, Pthread_mutex_t*); +extern fn Errno pthread_cond_broadcast(Pthread_cond_t*); +extern fn Errno pthread_cond_destroy(Pthread_cond_t*); +extern fn Errno pthread_cond_init(Pthread_cond_t*, Pthread_condattr_t*); +extern fn Errno pthread_cond_signal(Pthread_cond_t*); +extern fn Errno pthread_cond_timedwait(Pthread_cond_t*, Pthread_mutex_t*, TimeSpec*); +extern fn Errno pthread_cond_wait(Pthread_cond_t*, Pthread_mutex_t*); -extern fn CInt pthread_rwlock_destroy(Pthread_rwlock_t*); -extern fn CInt pthread_rwlock_init(Pthread_rwlock_t*, Pthread_rwlockattr_t*); -extern fn CInt pthread_rwlock_rdlock(Pthread_rwlock_t*); -extern fn CInt pthread_rwlock_tryrdlock(Pthread_rwlock_t*); -extern fn CInt pthread_rwlock_trywrlock(Pthread_rwlock_t*); -extern fn CInt pthread_rwlock_unlock(Pthread_rwlock_t*); -extern fn CInt pthread_rwlock_wrlock(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_destroy(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_init(Pthread_rwlock_t*, Pthread_rwlockattr_t*); +extern fn Errno pthread_rwlock_rdlock(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_tryrdlock(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_trywrlock(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_unlock(Pthread_rwlock_t*); +extern fn Errno pthread_rwlock_wrlock(Pthread_rwlock_t*); -extern fn CInt pthread_rwlockattr_destroy(Pthread_rwlockattr_t*); -extern fn CInt pthread_rwlockattr_getpshared(Pthread_rwlockattr_t*, CInt*); -extern fn CInt pthread_rwlockattr_init(Pthread_rwlockattr_t*); -extern fn CInt pthread_rwlockattr_setpshared(Pthread_rwlockattr_t*, CInt); +extern fn Errno pthread_rwlockattr_destroy(Pthread_rwlockattr_t*); +extern fn Errno pthread_rwlockattr_getpshared(Pthread_rwlockattr_t*, CInt*); +extern fn Errno pthread_rwlockattr_init(Pthread_rwlockattr_t*); +extern fn Errno pthread_rwlockattr_setpshared(Pthread_rwlockattr_t*, CInt); -extern fn CInt pthread_key_create(Pthread_key_t*, PosixThreadFn routine); -extern fn CInt pthread_key_delete(Pthread_key_t); +extern fn Errno pthread_key_create(Pthread_key_t*, PosixThreadFn routine); +extern fn Errno pthread_key_delete(Pthread_key_t); extern fn void* pthread_getspecific(Pthread_key_t); -extern fn CInt pthread_setspecific(Pthread_key_t, void* value_ptr); +extern fn Errno pthread_setspecific(Pthread_key_t, void* value_ptr); -extern fn CInt pthread_atfork(OnceFn prepare, OnceFn parent, OnceFn child); +extern fn Errno pthread_atfork(OnceFn prepare, OnceFn parent, OnceFn child); extern fn void pthread_cleanup_pop(CInt execute); extern fn void pthread_cleanup_push(PosixThreadFn routine, void* routine_arg); diff --git a/lib/std/threads/buffered_channel.c3 b/lib/std/threads/buffered_channel.c3 index 65602ba51..214edb47a 100644 --- a/lib/std/threads/buffered_channel.c3 +++ b/lib/std/threads/buffered_channel.c3 @@ -35,41 +35,40 @@ fn void? BufferedChannel.init(&self, Allocator allocator, usz size = 1) channel.read_waiting = 0; channel.mu.init()!; - defer catch (void)channel.mu.destroy(); + defer catch channel.mu.destroy(); channel.send_cond.init()!; - defer catch (void)channel.send_cond.destroy(); + defer catch channel.send_cond.destroy(); channel.read_cond.init()!; - defer catch (void)channel.read_cond.destroy(); + defer catch channel.read_cond.destroy(); *self = (BufferedChannel)channel; } -fn void? BufferedChannel.destroy(&self) +fn void? BufferedChannel.destroy(&self) @maydiscard // Remove optional in 0.8.0 { BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self); - fault err = @catch(channel.mu.destroy()); - err = @catch(channel.send_cond.destroy()) ?: err; - err = @catch(channel.read_cond.destroy()) ?: err; + channel.mu.destroy(); + channel.send_cond.destroy(); + channel.read_cond.destroy(); allocator::free(channel.allocator, channel); *self = null; - if (err) return err?; } fn void? BufferedChannel.push(self, Type val) { BufferedChannelImpl* channel = (BufferedChannelImpl*)self; - channel.mu.lock()!; - defer catch (void)channel.mu.unlock(); + channel.mu.lock(); + defer catch channel.mu.unlock(); // if channel is full -> wait while (channel.elems == channel.size && !channel.closed) { channel.send_waiting++; - channel.send_cond.wait(&channel.mu)!; + channel.send_cond.wait(&channel.mu); channel.send_waiting--; } @@ -92,24 +91,24 @@ fn void? BufferedChannel.push(self, Type val) // if someone is waiting -> awake him if (channel.read_waiting > 0) { - channel.read_cond.signal()!; + channel.read_cond.signal(); } - channel.mu.unlock()!; + channel.mu.unlock(); } fn Type? BufferedChannel.pop(self) { BufferedChannelImpl* channel = (BufferedChannelImpl*)self; - channel.mu.lock()!; - defer catch (void)channel.mu.unlock(); + channel.mu.lock(); + defer catch channel.mu.unlock(); // if chan is empty -> wait for sender while (channel.elems == 0 && !channel.closed) { channel.read_waiting++; - channel.read_cond.wait(&channel.mu)!; + channel.read_cond.wait(&channel.mu); channel.read_waiting--; } @@ -135,26 +134,25 @@ fn Type? BufferedChannel.pop(self) // if someone is waiting -> awake him if (channel.send_waiting > 0) { - channel.send_cond.signal()!; + channel.send_cond.signal(); } - channel.mu.unlock()!; + channel.mu.unlock(); return ret; } -fn void? BufferedChannel.close(self) +fn void? BufferedChannel.close(self) @maydiscard // Remove optional in 0.8.0 { BufferedChannelImpl* channel = (BufferedChannelImpl*)self; - fault err = @catch(channel.mu.lock()); + channel.mu.lock(); channel.closed = true; - err = @catch(channel.read_cond.broadcast()) ?: err; - err = @catch(channel.send_cond.broadcast()) ?: err; - err = @catch(channel.mu.unlock()) ?: err; + channel.read_cond.broadcast(); + channel.send_cond.broadcast(); + channel.mu.unlock(); - if (err) return err?; } diff --git a/lib/std/threads/fixed_pool.c3 b/lib/std/threads/fixed_pool.c3 index 17b4f7cc1..08fd34b55 100644 --- a/lib/std/threads/fixed_pool.c3 +++ b/lib/std/threads/fixed_pool.c3 @@ -13,14 +13,17 @@ struct FixedThreadPool Mutex mu; QueueItem[] queue; usz qindex; + usz qworking; usz num_threads; bitstruct : char { bool initialized; bool stop; bool stop_now; + bool joining; } Thread[] pool; ConditionVariable notify; + ConditionVariable collect; } struct QueueItem @private @@ -46,7 +49,11 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0) .pool = mem::new_array(Thread, threads) }; self.mu.init()!; + defer catch self.mu.destroy(); self.notify.init()!; + defer catch self.notify.destroy(); + self.collect.init()!; + defer catch self.collect.destroy(); foreach (&thread : self.pool) { thread.create(&process_work, self)!; @@ -55,6 +62,22 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0) } } +<* + Join all threads in the pool. +*> +fn void? FixedThreadPool.join(&self) @maydiscard // Remove optional in 0.8.0 +{ + if (self.initialized) + { + self.mu.lock(); + defer self.mu.unlock(); + self.joining = true; + self.notify.broadcast(); + self.collect.wait(&self.mu); + self.joining = false; + } +} + <* Stop all the threads and cleanup the pool. Any pending work will be dropped. @@ -68,37 +91,37 @@ fn void? FixedThreadPool.destroy(&self) Stop all the threads and cleanup the pool. Any pending work will be processed. *> -fn void? FixedThreadPool.stop_and_destroy(&self) +fn void? FixedThreadPool.stop_and_destroy(&self) @maydiscard // Remove optional in 0.8.0 { - return self.@shutdown(self.stop); + self.@shutdown(self.stop); } -macro void? FixedThreadPool.@shutdown(&self, #stop) @private +macro void FixedThreadPool.@shutdown(&self, #stop) @private { if (self.initialized) { - self.mu.lock()!; + self.mu.lock(); #stop = true; - self.notify.broadcast()!; - self.mu.unlock()!; + self.notify.broadcast(); + self.mu.unlock(); // Wait for all threads to shutdown. while (true) { - self.mu.lock()!; - defer self.mu.unlock()!!; - if (self.num_threads == 0) - { - break; - } - self.notify.signal()!; + self.mu.lock(); + defer self.mu.unlock(); + if (self.num_threads == 0) break; + self.notify.signal(); } - self.mu.destroy()!; + self.collect.destroy(); + self.notify.destroy(); + self.mu.destroy(); self.initialized = false; while (self.qindex) { free_qitem(self.queue[--self.qindex]); } free(self.queue); + free(self.pool); self.queue = {}; } } @@ -109,8 +132,8 @@ macro void? FixedThreadPool.@shutdown(&self, #stop) @private *> fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...) { - self.mu.lock()!; - defer self.mu.unlock()!!; + self.mu.lock(); + defer self.mu.unlock(); if (self.qindex == self.queue.len) return thread::THREAD_QUEUE_FULL?; any[] data; if (args.len) @@ -120,12 +143,13 @@ fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...) } self.queue[self.qindex] = { .func = func, .args = data }; self.qindex++; + self.qworking++; defer catch { free_qitem(self.queue[--self.qindex]); } // Notify the threads that work is available. - self.notify.broadcast()!; + self.notify.broadcast(); } fn int process_work(void* self_arg) @private @@ -133,39 +157,43 @@ fn int process_work(void* self_arg) @private FixedThreadPool* self = self_arg; while (true) { - self.mu.lock()!!; + self.mu.lock(); if (self.stop_now) { // Shutdown requested. self.num_threads--; - self.mu.unlock()!!; + self.mu.unlock(); return 0; } // Wait for work. while (self.qindex == 0) { + if (self.joining && self.qworking == 0) self.collect.signal(); if (self.stop) { // Shutdown requested. self.num_threads--; - self.mu.unlock()!!; + self.mu.unlock(); return 0; } - self.notify.wait(&self.mu)!!; + self.notify.wait(&self.mu); if (self.stop_now) { // Shutdown requested. self.num_threads--; - self.mu.unlock()!!; + self.mu.unlock(); return 0; } } // Process the job. self.qindex--; QueueItem item = self.queue[self.qindex]; - self.mu.unlock()!!; - defer free_qitem(item); + self.mu.unlock(); item.func(item.args); + defer free_qitem(item); + self.mu.lock(); + self.qworking--; + self.mu.unlock(); } } diff --git a/lib/std/threads/os/thread_none.c3 b/lib/std/threads/os/thread_none.c3 index 9148d58e5..ec2f3842f 100644 --- a/lib/std/threads/os/thread_none.c3 +++ b/lib/std/threads/os/thread_none.c3 @@ -22,6 +22,6 @@ fn bool NativeMutex.is_initialized(&self) return false; } -macro void? NativeMutex.lock(&mutex) => NOT_IMPLEMENTED?; -macro bool NativeMutex.try_lock(&mutex) => NOT_IMPLEMENTED?; -macro void? NativeMutex.unlock(&mutex) => NOT_IMPLEMENTED?; \ No newline at end of file +macro void NativeMutex.lock(&mutex) => NOT_IMPLEMENTED?!!; +macro bool NativeMutex.try_lock(&mutex) => NOT_IMPLEMENTED?!!; +macro void NativeMutex.unlock(&mutex) => NOT_IMPLEMENTED?!!; \ No newline at end of file diff --git a/lib/std/threads/os/thread_posix.c3 b/lib/std/threads/os/thread_posix.c3 index 6d2fcbb28..4a6a32e26 100644 --- a/lib/std/threads/os/thread_posix.c3 +++ b/lib/std/threads/os/thread_posix.c3 @@ -1,5 +1,9 @@ module std::thread::os @if(env::POSIX); import std::os::posix, std::time, libc; +import libc::errno; +import std::thread; + + struct NativeMutex { @@ -53,18 +57,24 @@ fn bool NativeMutex.is_initialized(&self) @require self.is_initialized() : "Mutex was not initialized" @ensure !self.is_initialized() *> -fn void? NativeMutex.destroy(&self) +fn void NativeMutex.destroy(&self) { - if (posix::pthread_mutex_destroy(&self.mutex)) return thread::DESTROY_FAILED?; + if (Errno err = posix::pthread_mutex_destroy(&self.mutex)) abort("Error destroying mutex: %d", err); *self = {}; } <* @require self.is_initialized() : "Mutex was not initialized" *> -fn void? NativeMutex.lock(&self) +fn void NativeMutex.lock(&self) { - if (posix::pthread_mutex_lock(&self.mutex)) return thread::LOCK_FAILED?; + switch (posix::pthread_mutex_lock(&self.mutex)) + { + case errno::EINVAL: unreachable("Mutex invalid"); + case errno::EDEADLK: abort("Mutex deadlock"); + case errno::OK: return; + default: unreachable("Unexpected error in lock"); + } } <* @@ -78,7 +88,7 @@ fn void? NativeMutex.lock_timeout(&self, ulong ms) { if (!ms) break; ulong sleep = min(5, ms); - if (!libc::nanosleep(&&time::ms(ms).to_timespec(), null)) return thread::LOCK_FAILED?; + if (!libc::nanosleep(&&time::ms(ms).to_timespec(), null)) return thread::LOCK_TIMEOUT?; ms -= sleep; } switch (result) @@ -89,7 +99,7 @@ fn void? NativeMutex.lock_timeout(&self, ulong ms) case errno::ETIMEDOUT: return thread::LOCK_TIMEOUT?; default: - return thread::LOCK_FAILED?; + return unreachable("Invalid lock %d", result); } } @@ -104,9 +114,9 @@ fn bool NativeMutex.try_lock(&self) <* @require self.is_initialized() : "Mutex was not initialized" *> -fn void? NativeMutex.unlock(&self) +fn void NativeMutex.unlock(&self) { - if (posix::pthread_mutex_unlock(&self.mutex)) return thread::UNLOCK_FAILED?; + if (Errno err = posix::pthread_mutex_unlock(&self.mutex)) abort("Failed to unlock mutex: %d", err); } fn void? NativeConditionVariable.init(&cond) @@ -114,32 +124,32 @@ fn void? NativeConditionVariable.init(&cond) if (posix::pthread_cond_init(cond, null)) return thread::INIT_FAILED?; } -fn void? NativeConditionVariable.destroy(&cond) +fn void NativeConditionVariable.destroy(&cond) { - if (posix::pthread_cond_destroy(cond)) return thread::DESTROY_FAILED?; + if (Errno err = posix::pthread_cond_destroy(cond)) abort("Failed to destroy pthread_cond %d", err); } -fn void? NativeConditionVariable.signal(&cond) +fn void NativeConditionVariable.signal(&cond) { - if (posix::pthread_cond_signal(cond)) return thread::SIGNAL_FAILED?; + if (Errno err = posix::pthread_cond_signal(cond)) abort("Failed to signal %d", err); } -fn void? NativeConditionVariable.broadcast(&cond) +fn void NativeConditionVariable.broadcast(&cond) { - if (posix::pthread_cond_broadcast(cond)) return thread::SIGNAL_FAILED?; + if (Errno err = posix::pthread_cond_broadcast(cond)) abort("Failed to broadcast %d", err); } <* @require mtx.is_initialized() *> -fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx) +fn void NativeConditionVariable.wait(&cond, NativeMutex* mtx) { - if (posix::pthread_cond_wait(cond, &mtx.mutex)) return thread::WAIT_FAILED?; + if (Errno err = posix::pthread_cond_wait(cond, &mtx.mutex)) abort("Failed to wait %d", err); } <* @require mtx.is_initialized() - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) { @@ -149,7 +159,7 @@ fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) <* @require mtx.is_initialized() - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration) { @@ -160,7 +170,7 @@ fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, <* @require mtx.is_initialized() - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) { @@ -171,7 +181,7 @@ fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) case errno::OK: return; default: - return thread::WAIT_FAILED?; + abort("pthread_cond_timedwait failed, invalid value"); } } @@ -194,9 +204,9 @@ fn void? NativeThread.create(&thread, ThreadFn thread_fn, void* arg) } } -fn void? NativeThread.detach(thread) +fn void NativeThread.detach(thread) { - if (posix::pthread_detach(thread.pthread)) return thread::DETACH_FAILED?; + if (Errno errno = posix::pthread_detach(thread.pthread)) abort("Thread detach failed: %d", errno); } fn void native_thread_exit(int result) @@ -214,11 +224,21 @@ fn bool NativeThread.equals(thread, NativeThread other) return (bool)posix::pthread_equal(thread.pthread, other.pthread); } -fn int? NativeThread.join(thread) +<* + @return "the return value of the thread" + @return? thread::THREAD_NOT_FOUND : "If the thread was not running" +*> +fn int? NativeThread.join(thread) @maydiscard { void *pres; - if (posix::pthread_join(thread.pthread, &pres)) return thread::JOIN_FAILED?; - return (int)(iptr)pres; + switch (posix::pthread_join(thread.pthread, &pres)) + { + case errno::OK: return (int)(iptr)pres; + case errno::EINVAL: unreachable("Thread is not joinable."); + case errno::EDEADLK: unreachable("Thread join from current thread."); + case errno::ESRCH: return thread::THREAD_NOT_FOUND~; + default: unreachable("Thread join returned unexpected result"); + } } fn void NativeOnceFlag.call_once(&flag, OnceFn func) diff --git a/lib/std/threads/os/thread_win32.c3 b/lib/std/threads/os/thread_win32.c3 index 45107ee2d..ea0f3b491 100644 --- a/lib/std/threads/os/thread_win32.c3 +++ b/lib/std/threads/os/thread_win32.c3 @@ -1,5 +1,7 @@ module std::thread::os @if(env::WIN32); import std::os::win32, std::time; +import std::thread; + typedef NativeThread = inline Win32_HANDLE; @@ -61,7 +63,7 @@ fn void? NativeMutex.init(&mtx, MutexType type) @require mtx.owner_thread != win32::getCurrentThreadId() : "Mutex was not unlocked before destroying it" @ensure !mtx.initialized *> -fn void? NativeMutex.destroy(&mtx) +fn void NativeMutex.destroy(&mtx) { *mtx = {}; } @@ -69,12 +71,12 @@ fn void? NativeMutex.destroy(&mtx) <* @require mtx.initialized : "Mutex was not initialized" *> -fn void? NativeMutex.lock(&mtx) +fn void NativeMutex.lock(&mtx) { Win32_DWORD current_thread = win32::getCurrentThreadId(); if (mtx.owner_thread == current_thread) { - if (!mtx.recursive) return thread::LOCK_FAILED?; + assert(mtx.recursive, "Recursive locking with non-recursive mutex"); mtx.locks++; return; } @@ -106,12 +108,10 @@ fn bool NativeMutex.try_lock(&mtx) <* @require mtx.initialized : "Mutex was not initialized" + @require mtx.owner_thread == win32::getCurrentThreadId() : "Mutex was not locked by the current thread" *> -fn void? NativeMutex.unlock(&mtx) +fn void NativeMutex.unlock(&mtx) { - $if env::COMPILER_SAFE_MODE: - if (mtx.owner_thread != win32::getCurrentThreadId()) return thread::UNLOCK_FAILED?; // Mutex was not locked by the current thread - $endif if (--mtx.locks == 0) { mtx.owner_thread = 0; @@ -137,7 +137,7 @@ fn void? NativeTimedMutex.init(&mtx, MutexType type) @require mtx.owner_thread != win32::getCurrentThreadId() : "Mutex was not unlocked before destroying it" @ensure !mtx.initialized *> -fn void? NativeTimedMutex.destroy(&mtx) +fn void NativeTimedMutex.destroy(&mtx) { *mtx = {}; } @@ -146,22 +146,20 @@ fn void? NativeTimedMutex.wait_cond_var(&mtx, uint ms) @local { if (!win32::sleepConditionVariableSRW(&mtx.cond_var, &mtx.srw_lock, ms, 0)) { - if (win32::getLastError() != win32::ERROR_TIMEOUT) - { - return thread::WAIT_FAILED?; - } + Win32_DWORD err = win32::getLastError(); + assert(err == win32::ERROR_TIMEOUT, "Unexpected error waiting for condvar %d", err); } } <* @require mtx.initialized : "Mutex was not initialized" *> -fn void? NativeTimedMutex.lock(&mtx) +fn void NativeTimedMutex.lock(&mtx) { Win32_DWORD current_thread = win32::getCurrentThreadId(); if (mtx.owner_thread == current_thread) { - if (!mtx.recursive) return thread::LOCK_FAILED?; + assert(mtx.recursive, "Tried to lock a non-recursive mutex"); mtx.locks++; return; } @@ -171,7 +169,7 @@ fn void? NativeTimedMutex.lock(&mtx) while (mtx.locks) { - mtx.wait_cond_var(win32::INFINITE)!; + mtx.wait_cond_var(win32::INFINITE)!!; } mtx.locks = 1; mtx.owner_thread = current_thread; @@ -179,13 +177,14 @@ fn void? NativeTimedMutex.lock(&mtx) <* @require mtx.initialized : "Mutex was not initialized" + @return? thread::WAIT_TIMEOUT *> fn void? NativeTimedMutex.lock_timeout(&mtx, ulong ms) { Win32_DWORD current_thread = win32::getCurrentThreadId(); if (mtx.owner_thread == current_thread) { - if (!mtx.recursive) return thread::LOCK_FAILED?; + assert(mtx.recursive, "The mutex was not recursive, but was locked in the same thread more than once."); mtx.locks++; return; } @@ -216,7 +215,7 @@ fn void? NativeTimedMutex.lock_timeout(&mtx, ulong ms) return; } } - return thread::WAIT_FAILED?; + return thread::WAIT_TIMEOUT~; } <* @@ -244,14 +243,14 @@ fn bool NativeTimedMutex.try_lock(&mtx) <* @require mtx.initialized : "Mutex was not initialized" *> -fn void? NativeTimedMutex.unlock(&mtx) +fn void NativeTimedMutex.unlock(&mtx) { win32::acquireSRWLockExclusive(&mtx.srw_lock); $if env::COMPILER_SAFE_MODE: if (mtx.owner_thread != win32::getCurrentThreadId()) { win32::releaseSRWLockExclusive(&mtx.srw_lock); - return thread::UNLOCK_FAILED?; // Mutex was not locked by the current thread + abort("Mutex was not locked by the current thread"); } $endif @@ -270,25 +269,29 @@ fn void? NativeConditionVariable.init(&cond) cond.cond_var = {}; } -fn void? NativeConditionVariable.destroy(&cond) @maydiscard +fn void NativeConditionVariable.destroy(&cond) { // Nothing to do } -fn void? NativeConditionVariable.signal(&cond) +fn void NativeConditionVariable.signal(&cond) { win32::wakeConditionVariable(&cond.cond_var); } -fn void? NativeConditionVariable.broadcast(&cond) +fn void NativeConditionVariable.broadcast(&cond) { win32::wakeAllConditionVariable(&cond.cond_var); } -fn void? timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout) @private +<* + @return "true if wait succeeded, false if it timed out" +*> +fn bool timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout) @private { Win32_DWORD owner_thread = mtx.owner_thread; - if (mtx.locks != 1 || owner_thread != win32::getCurrentThreadId()) return thread::WAIT_FAILED?; + assert(mtx.locks == 1, "Expected lock count = 1, was %d", mtx.locks); + assert(owner_thread == win32::getCurrentThreadId(), "Timed wait on other thread"); mtx.owner_thread = 0; mtx.locks = 0; defer @@ -300,45 +303,47 @@ fn void? timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout { if (win32::getLastError() == win32::ERROR_TIMEOUT) { - return thread::WAIT_TIMEOUT?; + return false; } - return thread::WAIT_FAILED?; + abort("Unexpected error sleepConditionVariableSRW: %d", win32::getLastError()); } + return true; } <* @require mtx.initialized : "Mutex was not initialized" *> -fn void? NativeConditionVariable.wait(&cond, NativeMutex* mtx) @inline +fn void NativeConditionVariable.wait(&cond, NativeMutex* mtx) @inline { - return timedwait(cond, mtx, win32::INFINITE) @inline; + bool res = timedwait(cond, mtx, win32::INFINITE) @inline; + assert(res, "Timed wait exited despite inifite time"); } <* @require mtx.initialized : "Mutex was not initialized" - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_timeout(&cond, NativeMutex* mtx, ulong ms) @inline { if (ms > uint.max) ms = uint.max; - return timedwait(cond, mtx, (uint)ms) @inline; + if (!timedwait(cond, mtx, (uint)ms) @inline) return thread::WAIT_TIMEOUT~; } <* @require mtx.initialized : "Mutex was not initialized" - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_timeout_duration(&cond, NativeMutex* mtx, Duration duration) @inline { - if (duration < time::DURATION_ZERO) return thread::WAIT_TIMEOUT?; + if (duration < time::DURATION_ZERO) return thread::WAIT_TIMEOUT~; long ms = duration.to_ms(); if (ms > uint.max) ms = uint.max; - return timedwait(cond, mtx, (uint)ms) @inline; + if (!timedwait(cond, mtx, (uint)ms) @inline) return thread::WAIT_TIMEOUT~; } <* @require mtx.initialized : "Mutex was not initialized" - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> fn void? NativeConditionVariable.wait_until(&cond, NativeMutex* mtx, Time time) @inline { @@ -351,12 +356,13 @@ fn void? NativeThread.create(&thread, ThreadFn func, void* args) if (!(*thread = (NativeThread)win32::createThread(null, 0, func, args, 0, null))) return thread::INIT_FAILED?; } -fn void? NativeThread.detach(thread) @inline +fn void NativeThread.detach(thread) @inline { - if (!win32::closeHandle(thread)) return thread::DETACH_FAILED?; + if (!win32::closeHandle(thread)) abort("Detaching thread failed"); } + fn void native_thread_exit(int result) @inline { win32::exitThread((uint)result); @@ -377,11 +383,14 @@ fn void NativeOnceFlag.call_once(&flag, OnceFn func) win32::initOnceExecuteOnce(&flag.init_once, callback, func, null); } -fn int? NativeThread.join(thread) +<* + @return "the exit value of the thread" +*> +fn int NativeThread.join(thread) { uint res; - if (win32::waitForSingleObject(thread, win32::INFINITE) == win32::WAIT_FAILED) return thread::JOIN_FAILED?; - if (!win32::getExitCodeThread(thread, &res)) return thread::JOIN_FAILED?; + if (win32::waitForSingleObject(thread, win32::INFINITE) == win32::WAIT_FAILED) unreachable("Failed to join thread, received wait failed."); + if (!win32::getExitCodeThread(thread, &res)) unreachable("Failed to retrieve exit code when joining."); defer win32::closeHandle(thread); return res; } diff --git a/lib/std/threads/pool.c3 b/lib/std/threads/pool.c3 index ccec23741..5070824ba 100644 --- a/lib/std/threads/pool.c3 +++ b/lib/std/threads/pool.c3 @@ -6,16 +6,19 @@ struct ThreadPool Mutex mu; QueueItem[SIZE] queue; usz qindex; + usz qworking; usz num_threads; bitstruct : char { bool initialized; bool stop; bool stop_now; + bool joining; } Thread[SIZE] pool; ConditionVariable notify; + ConditionVariable collect; } struct QueueItem @private @@ -29,10 +32,11 @@ struct QueueItem @private *> fn void? ThreadPool.init(&self) { - defer catch @ok(self.destroy()); + defer catch self.destroy(); *self = { .num_threads = SIZE, .initialized = true }; self.mu.init()!; self.notify.init()!; + self.collect.init()!; foreach (&thread : self.pool) { thread.create(&process_work, self)!; @@ -41,44 +45,62 @@ fn void? ThreadPool.init(&self) } } +<* + Join all threads in the pool. +*> +fn void? ThreadPool.join(&self) @maydiscard // Remove optional in 0.8.0 +{ + if (self.initialized) + { + self.mu.lock(); + self.joining = true; + self.notify.broadcast(); + self.collect.wait(&self.mu); + self.joining = false; + self.mu.unlock(); + } +} + <* Stop all the threads and cleanup the pool. Any pending work will be dropped. *> -fn void? ThreadPool.destroy(&self) +fn void? ThreadPool.destroy(&self) @maydiscard // Remove optional in 0.8.0 { - return self.@shutdown(self.stop_now); + self.@shutdown(self.stop_now); } <* Stop all the threads and cleanup the pool. Any pending work will be processed. *> -fn void? ThreadPool.stop_and_destroy(&self) +fn void? ThreadPool.stop_and_destroy(&self) @maydiscard // Remove optional in 0.8.0 { - return self.@shutdown(self.stop); + self.@shutdown(self.stop); } -macro void? ThreadPool.@shutdown(&self, #stop) @private +macro void ThreadPool.@shutdown(&self, #stop) @private { if (self.initialized) { - self.mu.lock()!; + self.mu.lock(); #stop = true; - self.notify.broadcast()!; - self.mu.unlock()!; + self.notify.broadcast(); + self.mu.unlock(); // Wait for all threads to shutdown. while (true) { - self.mu.lock()!; - defer self.mu.unlock()!!; + self.mu.lock(); + defer self.mu.unlock(); if (self.num_threads == 0) { break; } - self.notify.signal()!; + self.notify.signal(); } - self.mu.destroy()!; + self.mu.destroy(); + self.notify.destroy(); + self.collect.destroy(); self.initialized = false; } } @@ -87,18 +109,19 @@ macro void? ThreadPool.@shutdown(&self, #stop) @private Push a new job to the pool. Returns whether the queue is full, in which case the job is ignored. *> -fn void? ThreadPool.push(&self, ThreadFn func, void* arg) +fn void? ThreadPool.push(&self, ThreadFn func, void* arg) @maydiscard // Remove optional in 0.8.0 { while (true) { - self.mu.lock()!; - defer self.mu.unlock()!!; + self.mu.lock(); + defer self.mu.unlock(); if (self.qindex < SIZE) { self.queue[self.qindex] = { .func = func, .arg = arg }; self.qindex++; + self.qworking++; // Notify the threads that work is available. - self.notify.broadcast()!; + self.notify.broadcast(); return; } } @@ -109,30 +132,34 @@ fn int process_work(void* arg) @private ThreadPool* self = arg; while (true) { - self.mu.lock()!!; + self.mu.lock(); // Wait for work. while (self.qindex == 0) { + if (self.joining && self.qworking == 0) self.collect.broadcast(); if (self.stop) { // Shutdown requested. self.num_threads--; - self.mu.unlock()!!; + self.mu.unlock(); return 0; } - self.notify.wait(&self.mu)!!; + self.notify.wait(&self.mu); if (self.stop_now) { // Shutdown requested. self.num_threads--; - self.mu.unlock()!!; + self.mu.unlock(); return 0; } } // Process the job. self.qindex--; QueueItem item = self.queue[self.qindex]; - self.mu.unlock()!!; + self.mu.unlock(); item.func(item.arg); + self.mu.lock(); + self.qworking--; + self.mu.unlock(); } } \ No newline at end of file diff --git a/lib/std/threads/thread.c3 b/lib/std/threads/thread.c3 index fa375f776..5a671c0a8 100644 --- a/lib/std/threads/thread.c3 +++ b/lib/std/threads/thread.c3 @@ -21,33 +21,36 @@ alias ThreadFn = fn int(void* arg); faultdef INIT_FAILED, - DESTROY_FAILED, - LOCK_FAILED, LOCK_TIMEOUT, - UNLOCK_FAILED, - SIGNAL_FAILED, - WAIT_FAILED, WAIT_TIMEOUT, - DETACH_FAILED, - JOIN_FAILED, + THREAD_NOT_FOUND, INTERRUPTED, CHANNEL_CLOSED; +faultdef + DETACH_FAILED @deprecated, + UNLOCK_FAILED @deprecated, + DESTROY_FAILED @deprecated, + SIGNAL_FAILED @deprecated, + JOIN_FAILED @deprecated, + LOCK_FAILED @deprecated, + WAIT_FAILED @deprecated; + macro void? Mutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, {}); macro bool Mutex.is_initialized(mutex) => ((NativeMutex*)&mutex).is_initialized(); -macro void? RecursiveMutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, {.recursive}); -macro void? Mutex.destroy(&mutex) => NativeMutex.destroy((NativeMutex*)mutex); -macro void? Mutex.lock(&mutex) => NativeMutex.lock((NativeMutex*)mutex); -macro bool Mutex.try_lock(&mutex) => NativeMutex.try_lock((NativeMutex*)mutex); -macro void? Mutex.unlock(&mutex) => NativeMutex.unlock((NativeMutex*)mutex); +macro void? RecursiveMutex.init(&mutex) @maydiscard => NativeMutex.init((NativeMutex*)mutex, {.recursive}); +macro void? Mutex.destroy(&mutex) @maydiscard => NativeMutex.destroy((NativeMutex*)mutex); // Remove optional in 0.8.0 +macro void? Mutex.lock(&mutex) @maydiscard => NativeMutex.lock((NativeMutex*)mutex); +macro bool Mutex.try_lock(&mutex) => NativeMutex.try_lock((NativeMutex*)mutex); +macro void? Mutex.unlock(&mutex) @maydiscard => NativeMutex.unlock((NativeMutex*)mutex); // Remove optional in 0.8.0 macro void? TimedMutex.init(&mutex) => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed}); -macro void? TimedRecursiveMutex.init(&mutex) => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed, .recursive}); -macro void? TimedMutex.destroy(&mutex) => NativeTimedMutex.destroy((NativeTimedMutex*)mutex); -macro void? TimedMutex.lock(&mutex) => NativeTimedMutex.lock((NativeTimedMutex*)mutex); +macro void? TimedRecursiveMutex.init(&mutex) @maydiscard => NativeTimedMutex.init((NativeTimedMutex*)mutex, {.timed, .recursive}); +macro void? TimedMutex.destroy(&mutex) @maydiscard => NativeTimedMutex.destroy((NativeTimedMutex*)mutex); // Remove optional in 0.8.0 +macro void? TimedMutex.lock(&mutex) @maydiscard => NativeTimedMutex.lock((NativeTimedMutex*)mutex); macro void? TimedMutex.lock_timeout(&mutex, ulong ms) => NativeTimedMutex.lock_timeout((NativeTimedMutex*)mutex, ms); macro bool TimedMutex.try_lock(&mutex) => NativeTimedMutex.try_lock((NativeTimedMutex*)mutex); -macro void? TimedMutex.unlock(&mutex) => NativeTimedMutex.unlock((NativeTimedMutex*)mutex); +macro void? TimedMutex.unlock(&mutex) @maydiscard => NativeTimedMutex.unlock((NativeTimedMutex*)mutex); // Remove optional in 0.8.0 macro void fence(AtomicOrdering $ordering) @safemacro { @@ -56,22 +59,22 @@ macro void fence(AtomicOrdering $ordering) @safemacro macro void Mutex.@in_lock(&mutex; @body) { - (void)mutex.lock(); - defer (void)mutex.unlock(); + mutex.lock(); + defer mutex.unlock(); @body(); } macro void? ConditionVariable.init(&cond) => NativeConditionVariable.init((NativeConditionVariable*)cond); -macro void? ConditionVariable.destroy(&cond) => NativeConditionVariable.destroy((NativeConditionVariable*)cond); -macro void? ConditionVariable.signal(&cond) => NativeConditionVariable.signal((NativeConditionVariable*)cond); -macro void? ConditionVariable.broadcast(&cond) => NativeConditionVariable.broadcast((NativeConditionVariable*)cond); -macro void? ConditionVariable.wait(&cond, Mutex* mutex) +macro void? ConditionVariable.destroy(&cond) @maydiscard => NativeConditionVariable.destroy((NativeConditionVariable*)cond); +macro void? ConditionVariable.signal(&cond) @maydiscard => NativeConditionVariable.signal((NativeConditionVariable*)cond); +macro void? ConditionVariable.broadcast(&cond) @maydiscard => NativeConditionVariable.broadcast((NativeConditionVariable*)cond); +macro void? ConditionVariable.wait(&cond, Mutex* mutex) @maydiscard { return NativeConditionVariable.wait((NativeConditionVariable*)cond, (NativeMutex*)mutex); } <* @require $defined(Duration d = #ms_or_duration) ||| $defined(ulong l = #ms_or_duration) - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration) @safemacro { @@ -83,7 +86,7 @@ macro void? ConditionVariable.wait_timeout(&cond, Mutex* mutex, #ms_or_duration) } <* - @return? thread::WAIT_TIMEOUT, thread::WAIT_FAILED + @return? thread::WAIT_TIMEOUT *> macro void? ConditionVariable.wait_until(&cond, Mutex* mutex, Time time) { @@ -100,8 +103,12 @@ macro void? Thread.create(&thread, ThreadFn thread_fn, void* arg) return NativeThread.create(thread, thread_fn, arg); } -macro void? Thread.detach(thread) => NativeThread.detach(thread); -macro int? Thread.join(thread) => NativeThread.join(thread); +macro void? Thread.detach(thread) @maydiscard => NativeThread.detach(thread); + +<* + @return? THREAD_NOT_FOUND : "If the thread cannot be found." +*> +macro int? Thread.join(thread) @maydiscard => NativeThread.join(thread); macro bool Thread.equals(thread, Thread other) => NativeThread.equals(thread, other); macro void OnceFlag.call(&flag, OnceFn func) => NativeOnceFlag.call_once((NativeOnceFlag*)flag, func); diff --git a/lib/std/threads/unbuffered_channel.c3 b/lib/std/threads/unbuffered_channel.c3 index f7b4ffd7f..1bb429d86 100644 --- a/lib/std/threads/unbuffered_channel.c3 +++ b/lib/std/threads/unbuffered_channel.c3 @@ -28,31 +28,29 @@ fn void? UnbufferedChannel.init(&self, Allocator allocator) channel.read_waiting = 0; channel.mu.init()!; - defer catch (void)channel.mu.destroy(); + defer catch channel.mu.destroy(); channel.send_mu.init()!; - defer catch (void)channel.send_mu.destroy(); + defer catch channel.send_mu.destroy(); channel.send_cond.init()!; - defer catch (void)channel.send_cond.destroy(); + defer catch channel.send_cond.destroy(); channel.read_mu.init()!; - defer catch (void)channel.read_mu.destroy(); + defer catch channel.read_mu.destroy(); channel.read_cond.init()!; *self = (UnbufferedChannel)channel; } -fn void? UnbufferedChannel.destroy(&self) +fn void? UnbufferedChannel.destroy(&self) @maydiscard // Remove optional in 0.8.0 { UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self); - fault err = @catch(channel.mu.destroy()); - err = @catch(channel.send_mu.destroy()) ?: err; - err = @catch(channel.send_cond.destroy()) ?: err; - err = @catch(channel.read_mu.destroy()) ?: err; - err = @catch(channel.read_cond.destroy()) ?: err; + channel.mu.destroy(); + channel.send_mu.destroy(); + channel.send_cond.destroy(); + channel.read_mu.destroy(); + channel.read_cond.destroy(); allocator::free(channel.allocator, channel); - if (err) return err?; - *self = null; } @@ -60,10 +58,10 @@ fn void? UnbufferedChannel.push(self, Type val) { UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; - channel.mu.lock()!; - defer catch (void)channel.mu.unlock(); - channel.send_mu.lock()!; - defer catch (void)channel.send_mu.unlock(); + channel.mu.lock(); + defer catch channel.mu.unlock(); + channel.send_mu.lock(); + defer catch channel.send_mu.unlock(); if (channel.closed) { @@ -78,32 +76,32 @@ fn void? UnbufferedChannel.push(self, Type val) // if reader is already waiting for us -> awake him if (channel.read_waiting > 0) { - channel.read_cond.signal()!; + channel.read_cond.signal(); } // wait until reader takes value from buffer - channel.send_cond.wait(&channel.mu)!; + channel.send_cond.wait(&channel.mu); if (channel.closed) return thread::CHANNEL_CLOSED?; - channel.mu.unlock()!; - channel.send_mu.unlock()!; + channel.mu.unlock(); + channel.send_mu.unlock(); } fn Type? UnbufferedChannel.pop(self) { UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; - channel.mu.lock()!; - defer catch (void)channel.mu.unlock(); - channel.read_mu.lock()!; - defer catch (void)channel.read_mu.unlock(); + channel.mu.lock(); + defer catch channel.mu.unlock(); + channel.read_mu.lock(); + defer catch channel.read_mu.unlock(); // if no one is waiting, then there is nothing in the buffer while (!channel.closed && channel.send_waiting == 0) { channel.read_waiting++; - channel.read_cond.wait(&channel.mu)!; + channel.read_cond.wait(&channel.mu); channel.read_waiting--; } @@ -114,25 +112,24 @@ fn Type? UnbufferedChannel.pop(self) // awake sender channel.send_waiting--; - channel.send_cond.signal()!; + channel.send_cond.signal(); - channel.mu.unlock()!; - channel.read_mu.unlock()!; + channel.mu.unlock(); + channel.read_mu.unlock(); return ret; } -fn void? UnbufferedChannel.close(self) +fn void? UnbufferedChannel.close(self) @maydiscard // Remove optional in 0.8.0 { UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)self; - fault err = @catch(channel.mu.lock()); + channel.mu.lock(); channel.closed = true; - err = @catch(channel.read_cond.broadcast()) ?: err; - err = @catch(channel.send_cond.broadcast()) ?: err; - err = @catch(channel.mu.unlock()) ?: err; + channel.read_cond.broadcast(); + channel.send_cond.broadcast(); + channel.mu.unlock(); - if (err) return err?; } \ No newline at end of file diff --git a/releasenotes.md b/releasenotes.md index b52f27ceb..46d72e9bc 100644 --- a/releasenotes.md +++ b/releasenotes.md @@ -13,6 +13,14 @@ - Hex escapes like `"\x80"` would be incorrectly lowered. #2623 ### Stdlib changes +- Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads. +- Return of Thread/Mutex/CondVar `destroy()` is now "@maydiscard" and should be ignored. It will return void in 0.8.0. +- Return of Mutex `unlock()` and `lock()` is now "@maydiscard" and should be ignored. They will return void in 0.8.0. +- Return of ConditionVariable `signal()` `broadcast()` and `wait()` are now "@maydiscard". They will return void in 0.8.0. +- Return of Thread `detatch()` is now "@maydiscard". It will return void in 0.8.0. +- Buffered/UnbufferedChannel, and both ThreadPools have `@maydiscard` on a set of functions. They will retunr void in 0.8.0. +- Pthread bindings correctly return Errno instead of CInt. +- Return of Thread `join()` is now "@maydiscard". ## 0.7.8 Change list @@ -90,7 +98,7 @@ - Allow `..` ranges to use "a..a-1" in order to express zero length. - Disallow aliasing of `@local` symbols with a higher visibility in the alias. - Add `--max-macro-iterations` to set macro iteration limit. -- Improved generic inference in initializers #2541. +- Improved generic inference in initializers #2541. - "Maybe-deref" subscripting `foo.[i] += 1` #2540. - ABI change for vectors: store and pass them as arrays #2542. - Add @simd and @align attributes to typedef #2543. @@ -132,7 +140,7 @@ - Unify generic and regular module namespace. - `env::PROJECT_VERSION` now returns the version in project.json. - Comparing slices and arrays of user-defined types that implement == operator now works #2486. -- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491. +- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491. - Add exec timings to -vv output #2490. - Support #! as a comment on the first line only. - Add `+++=` operator. @@ -206,7 +214,7 @@ - `@assignable_to` is deprecated in favour of `$define` - Add `linklib-dir` to c3l-libraries to place their linked libraries in. Defaults to `linked-libs` - If the `os-arch` linked library doesn't exist, try with `os` for c3l libs. -- A file with an inferred module may not contain additional other modules. +- A file with an inferred module may not contain additional other modules. - Update error message for missing body after if/for/etc #2289. - `@is_const` is deprecated in favour of directly using `$defined`. - `@is_lvalue(#value)` is deprecated in favour of directly using `$defined`. diff --git a/test/unit/stdlib/threads/channel.c3 b/test/unit/stdlib/threads/channel.c3 index 139301dbd..df9ca2089 100644 --- a/test/unit/stdlib/threads/channel.c3 +++ b/test/unit/stdlib/threads/channel.c3 @@ -11,7 +11,7 @@ fn void init_destroy_buffered() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); } } @@ -21,7 +21,7 @@ fn void init_destroy_unbuffered() @test { UnbufferedChannel{int} c; c.init(mem)!!; - defer c.destroy()!!; + defer c.destroy(); } } @@ -29,7 +29,7 @@ fn void push_to_buffered_channel_no_lock() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); c.push(1)!!; } @@ -38,7 +38,7 @@ fn void push_pop_buffered_no_locks() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); c.push(123)!!; int got = c.pop()!!; @@ -49,10 +49,10 @@ fn void push_pop_unbuffered_with_locks() @test { UnbufferedChannel{int} c; c.init(mem)!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { @@ -72,9 +72,9 @@ fn void sending_to_closed_unbuffered_chan_is_forbidden() @test { UnbufferedChannel{int} c; c.init(mem, )!!; - defer c.destroy()!!; + defer c.destroy(); - c.close()!!; + c.close(); if (catch err = c.push(123)) { @@ -88,9 +88,9 @@ fn void sending_to_closed_buffered_chan_is_forbidden() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); - c.close()!!; + c.close(); if (catch err = c.push(123)) { @@ -104,9 +104,9 @@ fn void reading_from_empty_closed_unbuffered_chan_is_forbidden() @test { UnbufferedChannel{int} c; c.init(mem, )!!; - defer c.destroy()!!; + defer c.destroy(); - c.close()!!; + c.close(); if (catch err = c.pop()) { @@ -120,9 +120,9 @@ fn void reading_from_empty_closed_buffered_chan_is_forbidden() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); - c.close()!!; + c.close(); if (catch err = c.pop()) { @@ -136,13 +136,13 @@ fn void reading_from_non_empty_closed_buffered_chan_is_ok() @test { BufferedChannel{int} c; c.init(mem, 3)!!; - defer c.destroy()!!; + defer c.destroy(); c.push(1)!!; c.push(2)!!; c.push(3)!!; - c.close()!!; + c.close(); int got = c.pop()!!; assert(got == 1); @@ -165,15 +165,15 @@ fn void reading_from_empty_buffered_chan_aborted_by_close() @test { BufferedChannel{int} c; c.init(mem, 3)!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { BufferedChannel{int} c = (BufferedChannel{int})arg; - c.close()!!; + c.close(); return 0; }, (void*)c)!!; @@ -191,15 +191,15 @@ fn void reading_from_unbuffered_chan_aborted_by_close() @test { UnbufferedChannel{int} c; c.init(mem, )!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { UnbufferedChannel{int} c = (UnbufferedChannel{int})arg; - c.close()!!; + c.close(); return 0; }, (void*)c)!!; @@ -217,17 +217,17 @@ fn void sending_to_full_buffered_chan_aborted_by_close() @test { BufferedChannel{int} c; c.init(mem, 1)!!; - defer c.destroy()!!; + defer c.destroy(); c.push(1)!!; Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { BufferedChannel{int} c = (BufferedChannel{int})arg; - c.close()!!; + c.close(); return 0; }, (void*)c)!!; @@ -245,15 +245,15 @@ fn void sending_to_unbuffered_chan_aborted_by_close() @test { UnbufferedChannel{int} c; c.init(mem, )!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { UnbufferedChannel{int} c = (UnbufferedChannel{int})arg; - c.close()!!; + c.close(); return 0; }, (void*)c)!!; @@ -271,10 +271,10 @@ fn void multiple_actions_unbuffered() @test { UnbufferedChannel{int} c; c.init(mem, )!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { @@ -305,10 +305,10 @@ fn void multiple_actions_buffered() @test { BufferedChannel{int} c; c.init(mem, 10)!!; - defer c.destroy()!!; + defer c.destroy(); Thread thread; - defer thread.join()!!; + defer thread.join(); thread.create(fn int(void* arg) { diff --git a/test/unit/stdlib/threads/mutex.c3 b/test/unit/stdlib/threads/mutex.c3 index befe55872..fc305800d 100644 --- a/test/unit/stdlib/threads/mutex.c3 +++ b/test/unit/stdlib/threads/mutex.c3 @@ -5,35 +5,12 @@ import std::os; const TEST_MAGNITUDE = 10; -fn void lock_control_test() @test -{ - Mutex m; - m.init()!!; - m.lock()!!; - assert(@catch(m.lock())); -} -fn void unlock_control_test() @test -{ - Mutex m; - m.init()!!; - assert(@catch(m.unlock())); -} -fn void lock_with_double_unlock_test() @test +fn void own_mutex(Mutex* m) { - Mutex m; - m.init()!!; - - m.lock()!!; - m.unlock()!!; - assert(@catch(m.unlock())); -} - -fn void? own_mutex(Mutex* m) -{ - m.lock()!; - m.unlock()!; + m.lock(); + m.unlock(); } fn void ensure_owner_checks() @test @@ -48,12 +25,9 @@ fn void ensure_owner_checks() @test t.create((ThreadFn)&own_mutex, &m)!!; } - foreach(&t : threads) - { - t.join()!!; - } + foreach(&t : threads) t.join(); - own_mutex(&m)!!; + own_mutex(&m); } struct ArgsWrapper1 @@ -64,23 +38,23 @@ struct ArgsWrapper1 fn void shared_mutex_increment(ArgsWrapper1* args) { - args.m.lock()!!; + args.m.lock(); args.v++; - args.m.unlock()!!; + args.m.unlock(); } fn void shared_mutex_decrement(ArgsWrapper1* args) { - args.m.lock()!!; + args.m.lock(); args.v--; - args.m.unlock()!!; + args.m.unlock(); } fn void shared_mutex() @test { Mutex m; m.init()!!; - m.lock()!!; + m.lock(); ulong v; @@ -101,11 +75,8 @@ fn void shared_mutex() @test (&threads[i]).create((ThreadFn)&shared_mutex_decrement, &args)!!; } - m.unlock()!!; - foreach(&t : threads) - { - t.join()!!; - } + m.unlock(); + foreach(&t : threads) t.join(); assert(v == 0); } @@ -117,12 +88,12 @@ fn void acquire_recursively(RecursiveMutex* m) for (usz i = 0; i < 5 * TEST_MAGNITUDE; i++) { - ((Mutex*)m).lock()!!; + ((Mutex*)m).lock(); } for (usz i = 0; i < 5 * TEST_MAGNITUDE; i++) { - ((Mutex*)m).unlock()!!; + ((Mutex*)m).unlock(); } } @@ -130,7 +101,7 @@ fn void test_recursive_mutex() @test { RecursiveMutex m; m.init()!!; - defer m.destroy()!!; + defer m.destroy(); Thread[3 * TEST_MAGNITUDE] threads; foreach(&t : threads) @@ -138,10 +109,7 @@ fn void test_recursive_mutex() @test t.create((ThreadFn)&acquire_recursively, &m)!!; } - foreach(&t : threads) - { - t.join()!!; - } + foreach(&t : threads) t.join(); return acquire_recursively(&m); } diff --git a/test/unit/stdlib/threads/pool.c3 b/test/unit/stdlib/threads/pool.c3 index 82e67f7e8..cfb86a879 100644 --- a/test/unit/stdlib/threads/pool.c3 +++ b/test/unit/stdlib/threads/pool.c3 @@ -9,7 +9,7 @@ fn void init_destroy() @test { Pool pool; pool.init()!!; - pool.destroy()!!; + pool.destroy(); } } @@ -21,8 +21,8 @@ fn void push_destroy() @test int y = 20; Pool pool; pool.init()!!; - defer pool.destroy()!!; - pool.push(&do_work, &y)!!; + defer pool.destroy(); + pool.push(&do_work, &y); thread::sleep(time::ms(50)); test::eq(@atomic_load(x), @atomic_load(y)); } @@ -36,16 +36,38 @@ fn void push_stop() @test int y = 20; Pool pool; pool.init()!!; - pool.push(&do_work, &y)!!; - pool.stop_and_destroy()!!; + pool.push(&do_work, &y); + pool.stop_and_destroy(); test::eq(@atomic_load(x), @atomic_load(y)); } } +fn void join() @test +{ + @atomic_store(x, 0); + Pool pool; + pool.init()!!; + defer pool.stop_and_destroy(); + for (usz i = 0; i < 4; i++) + { + pool.push(&do_wait, (void*)i); + } + pool.join(); + test::eq(x, 6); +} + int x; fn int do_work(void* arg) { @atomic_store(x, @atomic_load(*(int*)arg)); return 0; +} + +fn int do_wait(void* arg) +{ + usz value = (iptr)arg; + for (usz i = 0; i < value; i++) thread::sleep(time::ms(50)); + @atomic_store(x, @atomic_load(x) + (int)value); + return 0; } \ No newline at end of file diff --git a/test/unit/stdlib/threads/simple_thread.c3 b/test/unit/stdlib/threads/simple_thread.c3 index 211599209..c5a7ab0b4 100644 --- a/test/unit/stdlib/threads/simple_thread.c3 +++ b/test/unit/stdlib/threads/simple_thread.c3 @@ -26,8 +26,8 @@ fn void testrun_mutex() foreach (&t : ts) { t.create(fn int(void* arg) { - m_global.lock()!!; - defer m_global.unlock()!!; + m_global.lock(); + defer m_global.unlock(); a += 10; thread::sleep_ms(5); a *= 10; @@ -45,32 +45,47 @@ fn void testrun_mutex() assert(t.join()!! == 0); } assert(a == ts.len); - m_global.destroy()!!; + m_global.destroy(); } fn void testrun_mutex_try() @test { Mutex m; m.init()!!; - m.lock()!!; + m.lock(); assert(m.try_lock() == false); - m.unlock()!!; + m.unlock(); assert(m.try_lock() == true); - m.unlock()!!; + m.unlock(); } + +int val_mutex = 0; + fn void testrun_mutex_timeout() @test { TimedMutex m; m.init()!!; - m.lock()!!; - if (try m.lock_timeout(20)) - { - unreachable("lock_timeout should fail"); - } - m.unlock()!!; + m.lock(); + Thread t; + val_mutex = 0; + t.create(fn int(void* arg) { + TimedMutex* m = arg; + if (try m.lock_timeout(20)) + { + val_mutex = 1; + } + else + { + val_mutex = 2; + } + return 0; + }, &m)!!; + t.join(); + assert(val_mutex == 2); + m.unlock(); m.lock_timeout(20)!!; - m.unlock()!!; + m.unlock(); } int x_once = 100;