Add compare_exchange. Rudimentary threads (subject to change)

This commit is contained in:
Christoffer Lerno
2023-01-10 18:54:42 +01:00
committed by Christoffer Lerno
parent c9e40cfa37
commit da65de2d01
14 changed files with 814 additions and 7 deletions

View File

@@ -0,0 +1,239 @@
module std::thread::os;
import libc;
$if (thread::THREAD_MODEL == ThreadModel.POSIX):
const PTHREAD_MUTEX_NORMAL = 0;
const PTHREAD_MUTEX_ERRORCHECK = 1;
const PTHREAD_MUTEX_RECURSIVE = 2;
define NativeMutex = PthreadMutex;
define NativeConditionVariable = PthreadCond;
define NativeThread = Pthread;
define NativeOnceFlag = PthreadOnce;
define Pthread = distinct void*;
$if (env::OS_TYPE == OsType.LINUX):
define PthreadMutex = distinct ulong[5];
define PthreadAttribute = distinct ulong[7];
define PthreadMutexAttribute = distinct uint;
define PthreadCondAttribute = distinct uint;
define PthreadCond = distinct ulong[6];
define PthreadOnce = distinct uint;
$else:
define PthreadMutex = distinct ulong[8];
define PthreadMutexAttribute = distinct ulong[2];
define PthreadAttribute = distinct ulong[8];
define PthreadCond = distinct ulong[6];
define PthreadCondAttribute = distinct ulong[8];
define PthreadOnce = distinct ulong[2];
$endif;
define PosixThreadFn = fn void*(void*);
extern fn int pthread_attr_destroy(PthreadAttribute*);
extern fn int pthread_attr_getdetachstate(PthreadAttribute*, int*);
extern fn int pthread_attr_init(PthreadAttribute*);
extern fn int pthread_mutex_init(PthreadMutex*, PthreadMutexAttribute*);
extern fn int pthread_mutex_destroy(PthreadMutex*);
extern fn int pthread_mutex_lock(PthreadMutex*);
extern fn int pthread_mutexattr_init(PthreadMutexAttribute*);
extern fn int pthread_mutexattr_destroy(PthreadMutexAttribute*);
extern fn int pthread_mutexattr_settype(PthreadMutexAttribute*, int);
extern fn int pthread_cond_init(PthreadCond*, PthreadCondAttribute*);
extern fn int pthread_cond_destroy(PthreadCond*);
extern fn int pthread_detach(Pthread);
extern fn int pthread_join(Pthread, void**);
extern fn int pthread_create(Pthread*, PthreadAttribute*, PosixThreadFn, void*);
extern fn Pthread pthread_self();
extern fn Errno pthread_mutex_trylock(PthreadMutex*);
extern fn Errno pthread_mutex_unlock(PthreadMutex*);
extern fn Errno pthread_cond_signal(PthreadCond*);
extern fn Errno pthread_cond_broadcast(PthreadCond*);
extern fn Errno pthread_cond_wait(PthreadCond*, PthreadMutex*);
extern fn Errno pthread_cond_timedwait(PthreadCond*, PthreadMutex*, TimeSpec*);
extern fn void pthread_exit(void*);
extern fn void pthread_once(PthreadOnce*, OnceFn);
extern fn int pthread_equal(Pthread this, Pthread other);
extern fn int sched_yield();
fn void! NativeMutex.init(NativeMutex* mutex, MutexType type)
{
PthreadMutexAttribute attr;
if (pthread_mutexattr_init(&attr)) return ThreadFault.INIT_FAILED!;
defer pthread_mutexattr_destroy(&attr);
if (type & thread::MUTEX_RECURSIVE)
{
if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)) return ThreadFault.INIT_FAILED!;
}
if (pthread_mutex_init(mutex, &attr)) return ThreadFault.INIT_FAILED!;
}
fn void! NativeMutex.destroy(NativeMutex* mtx)
{
if (pthread_mutex_destroy(mtx)) return ThreadFault.DESTROY_FAILED!;
}
fn void! NativeMutex.lock(NativeMutex* mtx)
{
if (pthread_mutex_lock(mtx)) return ThreadFault.LOCK_FAILED!;
}
fn void! NativeMutex.lock_timeoutout(NativeMutex* mtx, ulong ms)
{
/* Try to acquire the lock and, if we fail, sleep for 5ms. */
Errno result;
while ((result = pthread_mutex_trylock(mtx)) == errno::EBUSY)
{
if (!ms) break;
ulong sleep = min(5, ms);
if (!libc::nanosleep(&& TimeSpec { .s = 0, .ns = sleep * 1000_000 }, null)) return ThreadFault.LOCK_FAILED!;
ms -= sleep;
}
switch (result)
{
case errno::OK:
return;
case errno::EBUSY:
case errno::ETIMEDOUT:
return ThreadFault.LOCK_TIMEOUT!;
default:
return ThreadFault.LOCK_FAILED!;
}
}
fn bool NativeMutex.try_lock(NativeMutex* mtx)
{
return !pthread_mutex_trylock(mtx);
}
fn void! NativeMutex.unlock(NativeMutex* mtx)
{
if (pthread_mutex_unlock(mtx)) return ThreadFault.UNLOCK_FAILED!;
}
fn void! NativeConditionVariable.init(NativeConditionVariable* cond)
{
if (pthread_cond_init(cond, null)) return ThreadFault.INIT_FAILED!;
}
fn void! NativeConditionVariable.destroy(NativeConditionVariable* cond)
{
if (pthread_cond_destroy(cond)) return ThreadFault.DESTROY_FAILED!;
}
fn void! NativeConditionVariable.signal(NativeConditionVariable* cond)
{
if (pthread_cond_signal(cond)) return ThreadFault.SIGNAL_FAILED!;
}
fn void! NativeConditionVariable.broadcast(NativeConditionVariable* cond)
{
if (pthread_cond_broadcast(cond)) return ThreadFault.SIGNAL_FAILED!;
}
fn void! NativeConditionVariable.wait(NativeConditionVariable* cond, NativeMutex* mtx)
{
if (pthread_cond_wait(cond, mtx)) return ThreadFault.WAIT_FAILED!;
}
fn void! NativeConditionVariable.wait_timeout(NativeConditionVariable* cond, NativeMutex* mtx, ulong ms)
{
TimeSpec now;
if (libc::timespec_get(&now, libc::TIME_UTC) != libc::TIME_UTC) return ThreadFault.WAIT_FAILED!;
now.s += ms / 1000;
now.ns += (ms % 1000) * 1000_000;
switch (pthread_cond_timedwait(cond, mtx, &now))
{
case errno::ETIMEDOUT:
return ThreadFault.WAIT_TIMEOUT!;
case errno::OK:
return;
default:
return ThreadFault.WAIT_FAILED!;
}
}
private fn void* callback(void* arg)
{
PosixThreadData *data = arg;
return (void*)(iptr)data.thread_fn(data.arg);
}
fn void! NativeThread.create(NativeThread* thread, ThreadFn thread_fn, void* arg)
{
PosixThreadData *thread_data = mem::alloc(PosixThreadData);
*thread_data = { .thread_fn = thread_fn, .arg = arg };
if (pthread_create(thread, null, &callback, thread_data) != 0)
{
*thread = null;
free(thread_data);
return ThreadFault.INIT_FAILED!;
}
}
fn void! NativeThread.detach(NativeThread thread)
{
if (!pthread_detach(thread)) return ThreadFault.DETACH_FAILED!;
}
fn void native_thread_exit(int result)
{
pthread_exit((void*)(iptr)result);
}
fn NativeThread native_thread_current()
{
return pthread_self();
}
fn bool NativeThread.equals(NativeThread this, NativeThread other)
{
return (bool)pthread_equal(this, other);
}
fn int! NativeThread.join(NativeThread thread)
{
void *pres;
if (pthread_join(thread, &pres)) return ThreadFault.JOIN_FAILED!;
return (int)(iptr)pres;
}
fn void NativeOnceFlag.call_once(NativeOnceFlag* flag, OnceFn func)
{
pthread_once(flag, func);
}
fn void native_thread_yield()
{
sched_yield();
}
private struct PosixThreadData
{
ThreadFn thread_fn;
void* arg;
}
fn void! native_sleep_nano(ulong nano)
{
TimeSpec to = { .s = 0, .ns = nano };
if (libc::nanosleep(&to, null)) return ThreadFault.INTERRUPTED!;
}
fn void! native_sleep_ms(ulong ms)
{
TimeSpec to = { .s = ms / 1000, .ns = (ms % 1000) * 1000_000 };
if (libc::nanosleep(&to, null)) return ThreadFault.INTERRUPTED!;
}
fn void! native_sleep(double s)
{
ulong si = (ulong)s;
TimeSpec to = { .s = si, .ns = (ulong)((s - si) * 1000_000_000) };
if (libc::nanosleep(&to, null)) return ThreadFault.INTERRUPTED!;
}
$endif;

View File

@@ -0,0 +1,351 @@
module std::thread::os;
$if (thread::THREAD_MODEL == ThreadModel.WIN32):
define NativeThread = Win32_HANDLE;
define Win32_CRITICAL_SECTION = distinct ulong[5];
define Win32_HANDLE = distinct ulong;
struct NativeMutex
{
union
{
Win32_CRITICAL_SECTION critical_section;
Win32_HANDLE handle;
}
bool already_locked;
bool recursive;
bool timed;
}
struct NativeOnceFlag
{
int status;
Win32_CRITICAL_SECTION lock;
}
struct NativeConditionVariable
{
union
{
struct
{
Win32_HANDLE event_one;
Win32_HANDLE event_all;
}
Win32_HANDLE[2] events;
}
uint waiters_count;
Win32_CRITICAL_SECTION waiters_count_lock;
}
extern fn void win32_InitializeCriticalSection(Win32_CRITICAL_SECTION* section) @extname("InitializeCriticalSection");
extern fn void win32_DeleteCriticalSection(Win32_CRITICAL_SECTION* section) @extname("DeleteCriticalSection");
extern fn Win32_HANDLE win32_CreateMutex(void*, bool, void*) @extname("CreateMutexA");
extern fn bool win32_CloseHandle(Win32_HANDLE) @extname("CloseHandle");
extern fn bool win32_ReleaseMutex(Win32_HANDLE) @extname("ReleaseMutex");
extern fn void win32_EnterCriticalSection(Win32_CRITICAL_SECTION* section) @extname("EnterCriticalSection");
extern fn void win32_LeaveCriticalSection(Win32_CRITICAL_SECTION* section) @extname("LeaveCriticalSection");
extern fn bool win32_TryEnterCriticalSection(Win32_CRITICAL_SECTION* section) @extname("TryEnterCriticalSection");
extern fn uint win32_WaitForSingleObject(Win32_HANDLE, uint milliseconds) @extname("WaitForSingleObject");
extern fn void win32_Sleep(uint ms) @extname("Sleep");
extern fn uint win32_WaitForMultipleObjects(uint count, Win32_HANDLE* handles, bool wait_all, uint ms) @extname("WaitForMultipleObjects");
extern fn Win32_HANDLE win32_CreateEventA(void* attributes, bool manual_reset, bool initial_state, char* name) @extname("CreateEventA");
extern fn bool win32_ResetEvent(Win32_HANDLE event) @extname("ResetEvent");
extern fn bool win32_SetEvent(Win32_HANDLE handle) @extname("SetEvent");
extern fn long win32_InterlockedCompareExchange(int* dest, int exchange, int comperand) @extname("InterlockedCompareExchange");
extern fn uint win32_SleepEx(uint ms, bool alertable) @extname("SleepEx");
extern fn Win32_HANDLE win32_CreateThread(void* attributes, usz stack, ThreadFn func, void* arg, uint flags, uint* thread_id) @extname("CreateThread");
extern fn bool win32_GetExitCodeThread(Win32_HANDLE handle, uint* exit_code) @extname("GetExitCodeThread");
extern fn uint win32_GetThreadId(Win32_HANDLE) @extname("GetThreadId");
extern fn void win32_ExitThread(uint res) @noreturn @extname("ExitThread");
extern fn Win32_HANDLE win32_GetCurrentThread() @extname("GetCurrentThread");
const uint WAIT_OBJECT_0 = 0;
const uint WAIT_ABANDONED = 128;
const uint WAIT_TIMEOUT = 258;
const uint WAIT_IO_COMPLETION = 192;
const uint WAIT_FAILED = (uint)-1;
const uint INFINITE = (uint)-1;
fn void! NativeMutex.init(NativeMutex* mtx, MutexType type)
{
mtx.already_locked = false;
mtx.recursive = (bool)(type & thread::MUTEX_RECURSIVE);
mtx.timed = (bool)(type & thread::MUTEX_TIMED);
if (!mtx.timed)
{
win32_InitializeCriticalSection(&(mtx.critical_section));
return;
}
if (!(mtx.handle = win32_CreateMutex(null, false, null))) return ThreadFault.INIT_FAILED!;
}
fn void! NativeMutex.destroy(NativeMutex* mtx)
{
if (!mtx.timed)
{
win32_DeleteCriticalSection(&mtx.critical_section);
return;
}
if (!win32_CloseHandle(mtx.handle)) return ThreadFault.DESTROY_FAILED!;
}
fn void! NativeMutex.lock(NativeMutex* mtx)
{
if (!mtx.timed)
{
win32_EnterCriticalSection(&mtx.critical_section);
}
else
{
switch (win32_WaitForSingleObject(mtx.handle, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_ABANDONED:
default:
return ThreadFault.LOCK_FAILED!;
}
}
if (!mtx.recursive)
{
while (mtx.already_locked) win32_Sleep(1);
}
mtx.already_locked = true;
}
/**
* @require mtx.timed "Only available for timed locks"
**/
fn void! NativeMutex.lock_timeout(NativeMutex* mtx, uint ms)
{
switch (win32_WaitForSingleObject(mtx.handle, ms))
{
case WAIT_OBJECT_0:
break;
case WAIT_TIMEOUT:
return ThreadFault.LOCK_TIMEOUT!;
case WAIT_ABANDONED:
default:
return ThreadFault.LOCK_FAILED!;
}
if (!mtx.recursive)
{
while (mtx.already_locked) win32_Sleep(1);
mtx.already_locked = true;
}
}
fn bool NativeMutex.try_lock(NativeMutex* mtx)
{
bool success = mtx.timed
? win32_WaitForSingleObject(mtx.handle, 0) == WAIT_OBJECT_0
: win32_TryEnterCriticalSection(&mtx.critical_section);
if (!success) return false;
if (!mtx.recursive)
{
if (mtx.already_locked)
{
assert(!mtx.timed);
win32_LeaveCriticalSection(&mtx.critical_section);
return false;
}
mtx.already_locked = true;
}
return true;
}
fn void! NativeMutex.unlock(NativeMutex* mtx)
{
mtx.already_locked = false;
if (!mtx.timed)
{
win32_LeaveCriticalSection(&mtx.critical_section);
return;
}
if (!win32_ReleaseMutex(mtx.handle)) return ThreadFault.UNLOCK_FAILED!;
}
const int CONDITION_EVENT_ONE = 0;
const int CONDITION_EVENT_ALL = 1;
fn void! NativeConditionVariable.init(NativeConditionVariable* cond)
{
cond.waiters_count = 0;
win32_InitializeCriticalSection(&cond.waiters_count_lock);
cond.event_one = win32_CreateEventA(null, false, false, null);
if (!cond.event_one)
{
cond.event_all = (Win32_HANDLE)0;
return ThreadFault.INIT_FAILED!;
}
cond.event_all = win32_CreateEventA(null, true, false, null);
if (!cond.event_all)
{
win32_CloseHandle(cond.event_one);
cond.event_one = (Win32_HANDLE)0;
return ThreadFault.INIT_FAILED!;
}
}
fn void! NativeConditionVariable.destroy(NativeConditionVariable* cond) @maydiscard
{
if (cond.event_one) win32_CloseHandle(cond.event_one);
if (cond.event_all) win32_CloseHandle(cond.event_all);
win32_DeleteCriticalSection(&cond.waiters_count_lock);
}
fn void! NativeConditionVariable.signal(NativeConditionVariable* cond)
{
win32_EnterCriticalSection(&cond.waiters_count_lock);
bool have_waiters = cond.waiters_count > 0;
win32_LeaveCriticalSection(&cond.waiters_count_lock);
if (have_waiters && !win32_SetEvent(cond.event_one)) return ThreadFault.SIGNAL_FAILED!;
}
fn void! NativeConditionVariable.broadcast(NativeConditionVariable* cond)
{
win32_EnterCriticalSection(&cond.waiters_count_lock);
bool have_waiters = cond.waiters_count > 0;
win32_LeaveCriticalSection(&cond.waiters_count_lock);
if (have_waiters && !win32_SetEvent(cond.event_all)) return ThreadFault.SIGNAL_FAILED!;
}
private fn void! timedwait(NativeConditionVariable* cond, NativeMutex* mtx, uint timeout)
{
win32_EnterCriticalSection(&cond.waiters_count_lock);
cond.waiters_count++;
win32_LeaveCriticalSection(&cond.waiters_count_lock);
mtx.unlock()?;
uint result = win32_WaitForMultipleObjects(2, &cond.events, false, timeout);
switch (result)
{
case WAIT_TIMEOUT:
mtx.lock()?;
return ThreadFault.WAIT_TIMEOUT!;
case WAIT_FAILED:
mtx.lock()?;
return ThreadFault.WAIT_FAILED!;
default:
break;
}
win32_EnterCriticalSection(&cond.waiters_count_lock);
cond.waiters_count--;
// If event all && no waiters
bool last_waiter = result == 1 && !cond.waiters_count;
win32_LeaveCriticalSection(&cond.waiters_count_lock);
if (last_waiter)
{
if (!win32_ResetEvent(cond.event_all))
{
mtx.lock()?;
return ThreadFault.WAIT_FAILED!;
}
}
mtx.lock()?;
}
fn void! NativeConditionVariable.wait(NativeConditionVariable* cond, NativeMutex* mtx) @inline
{
return timedwait(cond, mtx, INFINITE) @inline;
}
fn void! NativeConditionVariable.wait_timeout(NativeConditionVariable* cond, NativeMutex* mtx, uint time) @inline
{
return timedwait(cond, mtx, time) @inline;
}
fn void! NativeThread.create(NativeThread* thread, ThreadFn func, void* args)
{
if (!(*thread = win32_CreateThread(null, 0, func, args, 0, null))) return ThreadFault.INIT_FAILED!;
}
fn void! NativeThread.detach(NativeThread thread) @inline
{
if (!win32_CloseHandle(thread)) return ThreadFault.DETACH_FAILED!;
}
fn void native_thread_exit(int result) @inline
{
win32_ExitThread((uint)result);
}
fn void native_thread_yield()
{
win32_Sleep(0);
}
fn void NativeOnceFlag.call_once(NativeOnceFlag* flag, OnceFn func)
{
while (@volatile_load(flag.status) < 3)
{
switch (@volatile_load(flag.status))
{
case 0:
if (mem::compare_exchange_volatile(&flag.status, 1, 0, AtomicOrdering.SEQ_CONSISTENT, AtomicOrdering.SEQ_CONSISTENT) == 0)
{
win32_InitializeCriticalSection(&flag.lock);
win32_EnterCriticalSection(&flag.lock);
@volatile_store(flag.status, 2);
func();
@volatile_store(flag.status, 3);
win32_LeaveCriticalSection(&flag.lock);
return;
}
break;
case 1:
break;
case 2:
win32_EnterCriticalSection(&flag.lock);
win32_LeaveCriticalSection(&flag.lock);
break;
}
}
}
fn void! NativeThread.join(NativeThread thread, int *res)
{
if (win32_WaitForSingleObject(thread, INFINITE) == WAIT_FAILED) return ThreadFault.JOIN_FAILED!;
if (!win32_GetExitCodeThread(thread, (uint*)res)) return ThreadFault.JOIN_FAILED!;
defer win32_CloseHandle(thread);
}
fn NativeThread native_thread_current()
{
return win32_GetCurrentThread();
}
fn bool NativeThread.equals(NativeThread this, NativeThread other)
{
return win32_GetThreadId(this) == win32_GetThreadId(other);
}
/**
* @require ms < uint.max "Too long sleep"
**/
fn void! native_sleep_ms(ulong ms)
{
if (win32_SleepEx((uint)ms, true) == WAIT_IO_COMPLETION) return ThreadFault.INTERRUPTED!;
}
fn void! native_sleep(double s)
{
return native_sleep_ms((uint)s * 1000);
}
fn void! native_sleep_nano(ulong ns)
{
return native_sleep_ms(ns < 1000_000 ? 1 : ns / 1000_000);
}
$endif;

73
lib/std/threads/thread.c3 Normal file
View File

@@ -0,0 +1,73 @@
module std::thread;
enum ThreadModel
{
WIN32,
POSIX
}
const ThreadModel THREAD_MODEL = env::OS_TYPE == OsType.WIN32 ? ThreadModel.WIN32 : ThreadModel.POSIX;
define MutexType = distinct int;
const MutexType MUTEX_PLAIN = 0;
const MutexType MUTEX_TIMED = 1;
const MutexType MUTEX_RECURSIVE = 2;
define Mutex = distinct NativeMutex;
define ConditionVariable = distinct NativeConditionVariable;
define Thread = distinct NativeThread;
define OnceFlag = distinct NativeOnceFlag;
define OnceFn = fn void();
define ThreadFn = fn int(void* arg);
fault ThreadFault
{
INIT_FAILED,
DESTROY_FAILED,
LOCK_FAILED,
LOCK_TIMEOUT,
UNLOCK_FAILED,
SIGNAL_FAILED,
WAIT_FAILED,
WAIT_TIMEOUT,
DETACH_FAILED,
JOIN_FAILED,
INTERRUPTED,
}
macro void! Mutex.init(Mutex* mutex, MutexType type) = NativeMutex.init((NativeMutex*)mutex, type);
macro void! Mutex.destroy(Mutex* mutex) = NativeMutex.destroy((NativeMutex*)mutex);
macro void! Mutex.lock(Mutex* mutex) = NativeMutex.lock((NativeMutex*)mutex);
macro void! Mutex.lock_timeout(Mutex* mutex, ulong ms) = NativeMutex.lock((NativeMutex*)mutex, ms);
macro bool Mutex.try_lock(Mutex* mutex) = NativeMutex.try_lock((NativeMutex*)mutex);
macro bool Mutex.unlock(Mutex* mutex) = NativeMutex.unlock((NativeMutex*)mutex);
macro void! ConditionVariable.init(ConditionVariable* cond) = NativeConditionVariable.init((NativeConditionVariable*)cond);
macro void! ConditionVariable.destroy(ConditionVariable* cond) = NativeConditionVariable.destroy((NativeConditionVariable*)cond);
macro void! ConditionVariable.signal(ConditionVariable* cond) = NativeConditionVariable.signal((NativeConditionVariable*)cond);
macro void! ConditionVariable.broadcast(ConditionVariable* cond) = NativeConditionVariable.broadcast((NativeConditionVariable*)cond);
macro void! ConditionVariable.wait(ConditionVariable* cond, Mutex* mutex)
{
return NativeConditionVariable.wait((NativeConditionVariable*)cond, (NativeMutex*)mutex);
}
macro void! ConditionVariable.wait_timeout(ConditionVariable* cond, Mutex* mutex, ulong timeout)
{
return NativeConditionVariable.wait_timeout((NativeConditionVariable*)cond, (NativeMutex*)mutex, timeout);
}
macro void! Thread.create(Thread* thread, ThreadFn thread_fn, void* arg) = NativeThread.create((NativeThread*)thread, thread_fn, arg);
macro void! Thread.detach(Thread thread) = NativeThread.detach((NativeThread)thread);
macro int! Thread.join(Thread thread) = NativeThread.join((NativeThread)thread);
macro bool Thread.equals(Thread thread, Thread other) = NativeThread.equals((NativeThread)this, (NativeThread)other);
macro void OnceFlag.call_once(OnceFlag* flag, OnceFn func) = NativeOnceFlag.call_once((NativeOnceFlag*)flag, func);
macro void yield() = os::native_thread_yield();
macro Thread current() = os::native_thread_current();
macro void exit(int result) = os::native_thread_exit(result);
macro void! sleep(double s) @maydiscard = os::native_sleep(s);
macro void! sleep_ms(ulong ms) @maydiscard = os::native_sleep_ms(ms);
macro void! sleep_ns(ulong ns) @maydiscard = os::native_sleep_nano(ns);