add ThreadPool (#926)

* lib/std/collections: fix tab indentation

Signed-off-by: Pierre Curto <pierre.curto@gmail.com>

* lib/std/threads: add ThreadPool

Signed-off-by: Pierre Curto <pierre.curto@gmail.com>

* lib/std/threads: add num_cpu()

Signed-off-by: Pierre Curto <pierre.curto@gmail.com>

---------

Signed-off-by: Pierre Curto <pierre.curto@gmail.com>
This commit is contained in:
Pierre Curto
2023-08-14 15:33:51 +02:00
committed by GitHub
parent f912e53038
commit 65bea1cb2d
11 changed files with 333 additions and 34 deletions

View File

@@ -2,9 +2,9 @@ module collections::ringbuffer(<Type, SIZE>);
struct RingBuffer
{
Type[SIZE] buf;
usz written;
usz head;
Type[SIZE] buf;
usz written;
usz head;
}
fn void RingBuffer.init(&self) @inline
@@ -14,16 +14,16 @@ fn void RingBuffer.init(&self) @inline
fn void RingBuffer.putc(&self, Type c)
{
if (self.written < SIZE)
{
self.buf[self.written] = c;
self.written++;
}
else
{
self.buf[self.head] = c;
self.head = (self.head + 1) % SIZE;
}
if (self.written < SIZE)
{
self.buf[self.written] = c;
self.written++;
}
else
{
self.buf[self.head] = c;
self.head = (self.head + 1) % SIZE;
}
}
fn Type RingBuffer.getc(&self, usz index)
@@ -32,15 +32,15 @@ fn Type RingBuffer.getc(&self, usz index)
usz avail = SIZE - self.head;
if (index < avail)
{
return self.buf[self.head + index];
return self.buf[self.head + index];
}
return self.buf[index - avail];
return self.buf[index - avail];
}
fn usz RingBuffer.get(&self, usz index, Type[] buffer)
{
index %= SIZE;
if (self.written < SIZE)
if (self.written < SIZE)
{
if (index >= self.written) return 0;
usz end = self.written - index;

View File

@@ -31,3 +31,5 @@ struct Stat
}
extern fn int stat(ZString str, Stat* stat) @extern("stat64");
extern fn CInt sysctl(CInt *name, CUInt namelen, void *oldp, usz *oldlenp, void *newp, usz newlen);

View File

@@ -57,3 +57,6 @@ struct Stat @if(!env::X86_64)
}
extern fn CInt stat(ZString path, Stat* stat);
extern fn CInt get_nprocs();
extern fn CInt get_nprocs_conf();

View File

@@ -21,6 +21,28 @@ extern fn CFile _wfreopen(WString, WString, CFile);
extern fn CInt _write(Fd fd, void* buffer, CUInt count);
extern fn CInt _wremove(WString);
// Mirrors the Win32 SYSTEM_INFO structure filled in by GetSystemInfo.
// Field names follow the Windows API (Hungarian) naming so they can be
// cross-referenced directly against the official documentation.
struct SystemInfo
{
union {
uint dwOemId; // obsolete; retained for compatibility
struct {
ushort wProcessorArchitecture;
ushort wReserved;
}
}
uint dwPageSize;
void* lpMinimumApplicationAddress;
void* lpMaximumApplicationAddress;
usz dwActiveProcessorMask;
uint dwNumberOfProcessors; // logical processors in the current processor group
uint dwProcessorType;
uint dwAllocationGranularity;
ushort wProcessorLevel;
ushort wProcessorRevision;
}
// NOTE(review): the Win32 GetSystemInfo has a void return; the CInt
// declared here is ignored by callers — confirm against windows.h.
extern fn CInt get_system_info(SystemInfo*) @extern("GetSystemInfo");
// Aliases to simplify libc use
macro Tm* localtime_r(Time_t* timer, Tm* buf) => _localtime64_s(buf, timer);
macro CInt setjmp(JmpBuf* buffer) => _setjmp($$frameaddress(), buffer);

69
lib/std/threads/os/cpu.c3 Normal file
View File

@@ -0,0 +1,69 @@
// https://www.cprogramming.com/snippets/source-code/find-the-number-of-cpu-cores-for-windows-mac-or-linux
//
// CPU-count detection for Darwin using the BSD sysctl(3) interface.
// The constants below mirror the CTL_* top-level and HW_* hardware
// management information base (MIB) identifiers from <sys/sysctl.h>;
// only CTL_HW / HW_NCPU are used here, the rest are kept for completeness.
module std::thread::cpu @if(env::DARWIN);
import libc;
const CTL_UNSPEC = 0; /* unused */
const CTL_KERN = 1; /* "high kernel": proc, limits */
const CTL_VM = 2; /* virtual memory */
const CTL_VFS = 3; /* file system, mount type is next */
const CTL_NET = 4; /* network, see socket.h */
const CTL_DEBUG = 5; /* debugging parameters */
const CTL_HW = 6; /* generic cpu/io */
const CTL_MACHDEP = 7; /* machine dependent */
const CTL_USER = 8; /* user-level */
const CTL_MAXID = 9; /* number of valid top-level ids */
const HW_MACHINE = 1; /* string: machine class */
const HW_MODEL = 2; /* string: specific machine model */
const HW_NCPU = 3; /* int: number of cpus */
const HW_BYTEORDER = 4; /* int: machine byte order */
const HW_PHYSMEM = 5; /* int: total memory */
const HW_USERMEM = 6; /* int: non-kernel memory */
const HW_PAGESIZE = 7; /* int: software page size */
const HW_DISKNAMES = 8; /* strings: disk drive names */
const HW_DISKSTATS = 9; /* struct: diskstats[] */
const HW_EPOCH = 10; /* int: 0 for Legacy, else NewWorld */
const HW_FLOATINGPT = 11; /* int: has HW floating point? */
const HW_MACHINE_ARCH = 12; /* string: machine architecture */
const HW_VECTORUNIT = 13; /* int: has HW vector unit? */
const HW_BUS_FREQ = 14; /* int: Bus Frequency */
const HW_CPU_FREQ = 15; /* int: CPU Frequency */
const HW_CACHELINE = 16; /* int: Cache Line Size in Bytes */
const HW_L1ICACHESIZE = 17; /* int: L1 I Cache Size in Bytes */
const HW_L1DCACHESIZE = 18; /* int: L1 D Cache Size in Bytes */
const HW_L2SETTINGS = 19; /* int: L2 Cache Settings */
const HW_L2CACHESIZE = 20; /* int: L2 Cache Size in Bytes */
const HW_L3SETTINGS = 21; /* int: L3 Cache Settings */
const HW_L3CACHESIZE = 22; /* int: L3 Cache Size in Bytes */
const HW_MAXID = 23; /* number of valid hw ids */
/**
 * Return the number of CPUs reported by the Darwin kernel via
 * sysctl with the { CTL_HW, HW_NCPU } MIB.
 * C3 zero-initializes locals, so if the sysctl call fails `count`
 * remains 0 and the function falls back to reporting a single CPU.
 **/
fn uint native_cpu()
{
    int[2] nm = { CTL_HW, HW_NCPU };
    uint count;
    // Size of the receiving buffer: derive it from the destination type
    // instead of hard-coding 4, so it cannot drift out of sync.
    usz len = uint.sizeof;
    libc::sysctl(&nm, 2, &count, &len, null, 0);
    if (count < 1) count = 1;
    return count;
}
module std::thread::cpu @if(env::LINUX);
import libc;
/**
 * Return the number of processors configured on the system (glibc).
 * NOTE(review): get_nprocs_conf() counts *configured* processors;
 * get_nprocs() would count the processors currently *online* —
 * confirm which semantic num_cpu() is meant to expose.
 **/
fn uint native_cpu()
{
return libc::get_nprocs_conf();
}
module std::thread::cpu @if(env::WIN32);
import libc;
/**
 * Return the number of logical processors as reported by
 * GetSystemInfo's dwNumberOfProcessors field (processors in the
 * current processor group).
 **/
fn uint native_cpu()
{
SystemInfo info;
libc::get_system_info(&info);
return info.dwNumberOfProcessors;
}

View File

@@ -150,7 +150,7 @@ fn void! NativeThread.create(&thread, ThreadFn thread_fn, void* arg)
fn void! NativeThread.detach(thread)
{
if (!posix::pthread_detach(thread)) return ThreadFault.DETACH_FAILED?;
if (posix::pthread_detach(thread)) return ThreadFault.DETACH_FAILED?;
}
fn void native_thread_exit(int result)

137
lib/std/threads/pool.c3 Normal file
View File

@@ -0,0 +1,137 @@
module std::thread::pool(<SIZE>);
import std::thread;
// A fixed-size pool of SIZE detached worker threads sharing a
// fixed-capacity job buffer (also SIZE entries), all protected by a
// single mutex and a single condition variable.
struct ThreadPool
{
Mutex mu; // guards every other field
QueueItem[SIZE] queue; // pending jobs; popped from the top, i.e. LIFO order
usz qindex; // number of queued jobs == index of the next free slot
usz num_threads; // live worker count; decremented as workers exit
bitstruct : char {
bool initialized; // set by init(), cleared at the end of @shutdown
bool stop; // drain remaining jobs, then let workers exit
bool stop_now; // let waiting workers exit immediately
}
Thread[SIZE] pool;
ConditionVariable notify; // signaled when work arrives or shutdown starts
}
// One unit of work: a thread function and its opaque argument.
struct QueueItem
{
ThreadFn func;
void* arg;
}
/**
 * Initialize the pool: set up the mutex and condition variable, then
 * spawn SIZE worker threads running process_work. The workers are
 * detached, so they cannot be joined — shutdown is instead tracked
 * through num_threads. If any step fails, the `defer catch` tears the
 * pool down again via destroy(), leaving no threads or sync objects.
 * @require !self.initialized "ThreadPool must not be already initialized"
 **/
fn void! ThreadPool.init(&self)
{
defer catch @ok(self.destroy());
*self = { .num_threads = SIZE, .initialized = true };
self.mu.init()!;
self.notify.init()!;
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
// The thread resources will be cleaned up when the thread exits.
thread.detach()!;
}
}
/*
 * Stop all the threads and cleanup the pool.
 * Any pending work will be dropped.
 * NOTE(review): only workers parked in wait() drop work immediately;
 * a worker that is mid-job re-enters the outer loop and keeps draining
 * the queue until empty, because stop_now is only checked after a
 * wait() — confirm whether "dropped" is the intended guarantee.
 */
fn void! ThreadPool.destroy(&self)
{
return self.@shutdown(stop_now);
}
/*
 * Stop all the threads and cleanup the pool.
 * Any pending work will be processed.
 * (Workers only observe the `stop` flag once the queue is empty, so
 * every queued job runs before the threads exit.)
 */
fn void! ThreadPool.stop_and_destroy(&self)
{
return self.@shutdown(stop);
}
// Common shutdown path. #stop names which flag to raise at compile
// time: `stop` (graceful, drain queue) or `stop_now` (immediate).
macro void! ThreadPool.@shutdown(&self, #stop) @private
{
if (self.initialized)
{
self.mu.lock()!;
self.#stop = true;
// Wake every worker so each one can observe the flag.
self.notify.broadcast()!;
self.mu.unlock()!;
// Wait for all threads to shutdown.
// NOTE(review): this is a busy spin — the loop re-locks and checks
// num_threads instead of blocking on a condition; consider having the
// last exiting worker signal an "all exited" condition instead.
while (true)
{
self.mu.lock()!;
defer self.mu.unlock()!!;
if (self.num_threads == 0)
{
break;
}
// Keep nudging workers that were busy during the broadcast and are
// now parked in wait() — without this they would never re-check.
self.notify.signal()!;
}
self.mu.destroy()!;
// NOTE(review): self.notify is never destroyed here, only the mutex —
// confirm whether ConditionVariable needs an explicit destroy.
self.initialized = false;
}
}
/*
 * Push a new job to the pool. Blocks until a queue slot is free.
 * NOTE(review): the original comment claimed a full queue rejects the
 * job — the code actually never rejects: it busy-waits (lock, check,
 * unlock, repeat) until qindex < SIZE, which burns CPU while full;
 * consider waiting on a condition instead of spinning.
 */
fn void! ThreadPool.push(&self, ThreadFn func, void* arg)
{
while (true)
{
self.mu.lock()!;
// Unlocks on every exit from this iteration, including the return.
defer self.mu.unlock()!!;
if (self.qindex < SIZE)
{
self.queue[self.qindex] = { .func = func, .arg = arg };
self.qindex++;
// Notify the threads that work is available.
self.notify.broadcast()!;
return;
}
}
}
// Worker loop run by every pool thread: take jobs until shutdown.
fn int process_work(void* arg) @private
{
ThreadPool* self = arg;
while (true)
{
self.mu.lock()!!;
// Wait for work.
while (self.qindex == 0)
{
if (self.stop)
{
// Graceful shutdown: the queue is already drained, so exit.
self.num_threads--;
self.mu.unlock()!!;
return 0;
}
self.notify.wait(&self.mu)!!;
if (self.stop_now)
{
// Immediate shutdown: exit without re-checking the queue.
self.num_threads--;
self.mu.unlock()!!;
return 0;
}
}
// Process the job.
// NOTE(review): jobs are popped from the top (qindex-1), so the
// "queue" behaves as a stack — latest-submitted work runs first.
self.qindex--;
QueueItem item = self.queue[self.qindex];
// Run the job outside the lock so other workers can dequeue.
self.mu.unlock()!!;
item.func(item.arg);
}
}

View File

@@ -1,4 +1,5 @@
module std::thread;
import std::thread::cpu;
import std::thread::os;
def MutexType = distinct int;
@@ -65,3 +66,4 @@ macro void! sleep(double s) @maydiscard => os::native_sleep(s);
macro void! sleep_ms(ulong ms) @maydiscard => os::native_sleep_ms(ms);
macro void! sleep_ns(ulong ns) @maydiscard => os::native_sleep_nano(ns);
macro uint num_cpu() => cpu::native_cpu();