diff --git a/lib/std/collections/ringbuffer.c3 b/lib/std/collections/ringbuffer.c3 index e60c79179..497e2991c 100644 --- a/lib/std/collections/ringbuffer.c3 +++ b/lib/std/collections/ringbuffer.c3 @@ -2,9 +2,9 @@ module collections::ringbuffer(); struct RingBuffer { - Type[SIZE] buf; - usz written; - usz head; + Type[SIZE] buf; + usz written; + usz head; } fn void RingBuffer.init(&self) @inline @@ -14,16 +14,16 @@ fn void RingBuffer.init(&self) @inline fn void RingBuffer.putc(&self, Type c) { - if (self.written < SIZE) - { - self.buf[self.written] = c; - self.written++; - } - else - { - self.buf[self.head] = c; - self.head = (self.head + 1) % SIZE; - } + if (self.written < SIZE) + { + self.buf[self.written] = c; + self.written++; + } + else + { + self.buf[self.head] = c; + self.head = (self.head + 1) % SIZE; + } } fn Type RingBuffer.getc(&self, usz index) @@ -32,15 +32,15 @@ fn Type RingBuffer.getc(&self, usz index) usz avail = SIZE - self.head; if (index < avail) { - return self.buf[self.head + index]; + return self.buf[self.head + index]; } - return self.buf[index - avail]; + return self.buf[index - avail]; } fn usz RingBuffer.get(&self, usz index, Type[] buffer) { index %= SIZE; - if (self.written < SIZE) + if (self.written < SIZE) { if (index >= self.written) return 0; usz end = self.written - index; diff --git a/lib/std/libc/os/darwin.c3 b/lib/std/libc/os/darwin.c3 index 938c8014a..9c70a3ca7 100644 --- a/lib/std/libc/os/darwin.c3 +++ b/lib/std/libc/os/darwin.c3 @@ -31,3 +31,5 @@ struct Stat } extern fn int stat(ZString str, Stat* stat) @extern("stat64"); + +extern fn CInt sysctl(CInt *name, CUInt namelen, void *oldp, usz *oldlenp, void *newp, usz newlen); diff --git a/lib/std/libc/os/linux.c3 b/lib/std/libc/os/linux.c3 index 617e0cf59..8564233bc 100644 --- a/lib/std/libc/os/linux.c3 +++ b/lib/std/libc/os/linux.c3 @@ -57,3 +57,6 @@ struct Stat @if(!env::X86_64) } extern fn CInt stat(ZString path, Stat* stat); + +extern fn CInt get_nprocs(); +extern fn CInt get_nprocs_conf(); diff --git a/lib/std/libc/os/win32.c3 b/lib/std/libc/os/win32.c3 index a358efa59..643b60928 100644 --- a/lib/std/libc/os/win32.c3 +++ b/lib/std/libc/os/win32.c3 @@ -21,6 +21,28 @@ extern fn CFile _wfreopen(WString, WString, CFile); extern fn CInt _write(Fd fd, void* buffer, CUInt count); extern fn CInt _wremove(WString); +struct SystemInfo +{ + union { + uint dwOemId; + struct { + ushort wProcessorArchitecture; + ushort wReserved; + } + } + uint dwPageSize; + void* lpMinimumApplicationAddress; + void* lpMaximumApplicationAddress; + usz dwActiveProcessorMask; + uint dwNumberOfProcessors; + uint dwProcessorType; + uint dwAllocationGranularity; + ushort wProcessorLevel; + ushort wProcessorRevision; +} + +extern fn CInt get_system_info(SystemInfo*) @extern("GetSystemInfo"); + // Aliases to simplify libc use macro Tm* localtime_r(Time_t* timer, Tm* buf) => _localtime64_s(buf, timer); macro CInt setjmp(JmpBuf* buffer) => _setjmp($$frameaddress(), buffer); diff --git a/lib/std/threads/os/cpu.c3 b/lib/std/threads/os/cpu.c3 new file mode 100644 index 000000000..d803dc827 --- /dev/null +++ b/lib/std/threads/os/cpu.c3 @@ -0,0 +1,69 @@ +// https://www.cprogramming.com/snippets/source-code/find-the-number-of-cpu-cores-for-windows-mac-or-linux + +module std::thread::cpu @if(env::DARWIN); +import libc; + +const CTL_UNSPEC = 0; /* unused */ +const CTL_KERN = 1; /* "high kernel": proc, limits */ +const CTL_VM = 2; /* virtual memory */ +const CTL_VFS = 3; /* file system, mount type is next */ +const CTL_NET = 4; /* network, see socket.h */ +const CTL_DEBUG = 5; /* debugging parameters */ +const CTL_HW = 6; /* generic cpu/io */ +const CTL_MACHDEP = 7; /* machine dependent */ +const CTL_USER = 8; /* user-level */ +const CTL_MAXID = 9; /* number of valid top-level ids */ + +const HW_MACHINE = 1; /* string: machine class */ +const HW_MODEL = 2; /* string: specific machine model */ +const HW_NCPU = 3; /* int: number of cpus */ +const HW_BYTEORDER = 4; /* int: machine byte order */ +const HW_PHYSMEM = 5; /* int: total memory */ +const HW_USERMEM = 6; /* int: non-kernel memory */ +const HW_PAGESIZE = 7; /* int: software page size */ +const HW_DISKNAMES = 8; /* strings: disk drive names */ +const HW_DISKSTATS = 9; /* struct: diskstats[] */ +const HW_EPOCH = 10; /* int: 0 for Legacy, else NewWorld */ +const HW_FLOATINGPT = 11; /* int: has HW floating point? */ +const HW_MACHINE_ARCH = 12; /* string: machine architecture */ +const HW_VECTORUNIT = 13; /* int: has HW vector unit? */ +const HW_BUS_FREQ = 14; /* int: Bus Frequency */ +const HW_CPU_FREQ = 15; /* int: CPU Frequency */ +const HW_CACHELINE = 16; /* int: Cache Line Size in Bytes */ +const HW_L1ICACHESIZE = 17; /* int: L1 I Cache Size in Bytes */ +const HW_L1DCACHESIZE = 18; /* int: L1 D Cache Size in Bytes */ +const HW_L2SETTINGS = 19; /* int: L2 Cache Settings */ +const HW_L2CACHESIZE = 20; /* int: L2 Cache Size in Bytes */ +const HW_L3SETTINGS = 21; /* int: L3 Cache Settings */ +const HW_L3CACHESIZE = 22; /* int: L3 Cache Size in Bytes */ +const HW_MAXID = 23; /* number of valid hw ids */ + +fn uint native_cpu() +{ + int[2] nm; + usz len = 4; + uint count; + + nm = { CTL_HW, HW_NCPU }; + libc::sysctl(&nm, 2, &count, &len, null, 0); + if (count < 1) count = 1; + return count; +} + +module std::thread::cpu @if(env::LINUX); +import libc; + +fn uint native_cpu() +{ + return libc::get_nprocs_conf(); +} + +module std::thread::cpu @if(env::WIN32); +import libc; + +fn uint native_cpu() +{ + SystemInfo info; + libc::get_system_info(&info); + return info.dwNumberOfProcessors; +} \ No newline at end of file diff --git a/lib/std/threads/os/thread_posix.c3 b/lib/std/threads/os/thread_posix.c3 index 02d64c8ab..d780aecae 100644 --- a/lib/std/threads/os/thread_posix.c3 +++ b/lib/std/threads/os/thread_posix.c3 @@ -150,7 +150,7 @@ fn void! NativeThread.create(&thread, ThreadFn thread_fn, void* arg) fn void! NativeThread.detach(thread) { - if (!posix::pthread_detach(thread)) return ThreadFault.DETACH_FAILED?; + if (posix::pthread_detach(thread)) return ThreadFault.DETACH_FAILED?; } fn void native_thread_exit(int result) diff --git a/lib/std/threads/pool.c3 b/lib/std/threads/pool.c3 new file mode 100644 index 000000000..17b8b20cd --- /dev/null +++ b/lib/std/threads/pool.c3 @@ -0,0 +1,137 @@ +module std::thread::pool(); +import std::thread; + +struct ThreadPool +{ + Mutex mu; + QueueItem[SIZE] queue; + usz qindex; + usz num_threads; + bitstruct : char { + bool initialized; + bool stop; + bool stop_now; + } + + Thread[SIZE] pool; + ConditionVariable notify; +} + +struct QueueItem +{ + ThreadFn func; + void* arg; +} + +/** + * @require !self.initialized "ThreadPool must not be already initialized" + **/ +fn void! ThreadPool.init(&self) +{ + defer catch @ok(self.destroy()); + *self = { .num_threads = SIZE, .initialized = true }; + self.mu.init()!; + self.notify.init()!; + foreach (&thread : self.pool) + { + thread.create(&process_work, self)!; + // The thread resources will be cleaned up when the thread exits. + thread.detach()!; + } +} + +/* + * Stop all the threads and cleanup the pool. + * Any pending work will be dropped. + */ +fn void! ThreadPool.destroy(&self) +{ + return self.@shutdown(stop_now); +} + +/* + * Stop all the threads and cleanup the pool. + * Any pending work will be processed. + */ +fn void! ThreadPool.stop_and_destroy(&self) +{ + return self.@shutdown(stop); +} + +macro void! ThreadPool.@shutdown(&self, #stop) @private +{ + if (self.initialized) + { + self.mu.lock()!; + self.#stop = true; + self.notify.broadcast()!; + self.mu.unlock()!; + // Wait for all threads to shutdown. + while (true) + { + self.mu.lock()!; + defer self.mu.unlock()!!; + if (self.num_threads == 0) + { + break; + } + self.notify.signal()!; + } + self.mu.destroy()!; + self.initialized = false; + } +} + +/* + * Push a new job to the pool. + * Returns whether the queue is full, in which case the job is ignored. + */ +fn void! ThreadPool.push(&self, ThreadFn func, void* arg) +{ + while (true) + { + self.mu.lock()!; + defer self.mu.unlock()!!; + if (self.qindex < SIZE) + { + self.queue[self.qindex] = { .func = func, .arg = arg }; + self.qindex++; + // Notify the threads that work is available. + self.notify.broadcast()!; + return; + } + } +} + +fn int process_work(void* arg) @private +{ + ThreadPool* self = arg; + while (true) + { + self.mu.lock()!!; + // Wait for work. + while (self.qindex == 0) + { + if (self.stop) + { + // Shutdown requested. + self.num_threads--; + self.mu.unlock()!!; + return 0; + } + self.notify.wait(&self.mu)!!; + if (self.stop_now) + { + // Shutdown requested. + self.num_threads--; + self.mu.unlock()!!; + return 0; + } + } + // Process the job. + self.qindex--; + QueueItem item = self.queue[self.qindex]; + self.mu.unlock()!!; + item.func(item.arg); + } +} \ No newline at end of file diff --git a/lib/std/threads/thread.c3 b/lib/std/threads/thread.c3 index f1d6c194b..402658b58 100644 --- a/lib/std/threads/thread.c3 +++ b/lib/std/threads/thread.c3 @@ -1,4 +1,5 @@ module std::thread; +import std::thread::cpu; import std::thread::os; def MutexType = distinct int; @@ -65,3 +66,4 @@ macro void! sleep(double s) @maydiscard => os::native_sleep(s); macro void! sleep_ms(ulong ms) @maydiscard => os::native_sleep_ms(ms); macro void! sleep_ns(ulong ns) @maydiscard => os::native_sleep_nano(ns); +macro uint num_cpu() => cpu::native_cpu(); \ No newline at end of file diff --git a/test/unit/stdlib/collections/ringbuffer.c3 b/test/unit/stdlib/collections/ringbuffer.c3 index c14ba71ee..88ee3cec1 100644 --- a/test/unit/stdlib/collections/ringbuffer.c3 +++ b/test/unit/stdlib/collections/ringbuffer.c3 @@ -6,21 +6,21 @@ def Buffer = RingBuffer(); fn void putc_getc() { - Buffer rb; - rb.init(); - rb.putc(1); - rb.putc(2); - rb.putc(3); - rb.putc(4); + Buffer rb; + rb.init(); + rb.putc(1); + rb.putc(2); + rb.putc(3); + rb.putc(4); - assert(rb.getc(0) == 1); - assert(rb.getc(1) == 2); - assert(rb.getc(2) == 3); - assert(rb.getc(3) == 4); + assert(rb.getc(0) == 1); + assert(rb.getc(1) == 2); + assert(rb.getc(2) == 3); + assert(rb.getc(3) == 4); - rb.putc(5); - assert(rb.getc(0) == 2); - assert(rb.getc(1) == 3); + rb.putc(5); + assert(rb.getc(0) == 2); + assert(rb.getc(1) == 3); assert(rb.getc(2) == 4); assert(rb.getc(3) == 5); } \ No newline at end of file diff --git a/test/unit/stdlib/threads/pool.c3 b/test/unit/stdlib/threads/pool.c3 new file mode 100644 index 000000000..ab72d7bf3 --- /dev/null +++ b/test/unit/stdlib/threads/pool.c3 @@ -0,0 +1,67 @@ +module thread_pool_test; +import std::io; +import std::thread; +import std::thread::pool; + +def Pool = ThreadPool(<4>); + +fn void! init_destroy() @test +{ + for (usz i = 0; i < 20; i++) + { + Pool pool; + pool.init()!; + pool.destroy()!; + } +} + +fn void! push_destroy() @test +{ + for (usz i = 0; i < 20; i++) + { + x = 0; + int y = 20; + Pool pool; + pool.init()!; + defer pool.destroy()!!; + work_done.lock()!!; + pool.push(&do_work, &y)!; + work_done.lock()!!; + assert(x == y, "%d: %d != %d", i, x, y); + work_done.unlock()!!; + } +} + +fn void! push_stop() @test +{ + for (usz i = 0; i < 20; i++) + { + x = 0; + int y = 20; + Pool pool; + pool.init()!; + work_done.lock()!!; + pool.push(&do_work, &y)!; + pool.stop_and_destroy()!!; + assert(x == y, "%d: %d != %d", i, x, y); + } +} + +int x; + +Mutex work_done; + +static initialize { + work_done.init()!!; +} + +static finalize { + work_done.destroy()!!; +} + +fn int do_work(void* arg) +{ + x = *(int*)arg; + work_done.unlock()!!; + return 0; +} \ No newline at end of file diff --git a/test/unit/stdlib/threads/simple_thread.c3 b/test/unit/stdlib/threads/simple_thread.c3 index 2d541420b..721397048 100644 --- a/test/unit/stdlib/threads/simple_thread.c3 +++ b/test/unit/stdlib/threads/simple_thread.c3 @@ -63,10 +63,7 @@ fn void! testrun_mutex_timeout() @test Mutex m; m.init()!; m.lock()!; - if (catch m.lock_timeout(100)) - { - } - else + if (try m.lock_timeout(100)) { assert(false, "lock_timeout should fail"); }