Cppgres
Build Postgres extensions in C++
Loading...
Searching...
No Matches
threading.hpp
1#pragma once
2
3#include <future>
4#include <queue>
5#include <type_traits>
6
7#include "imports.h"
8
9#ifdef __linux__
10#include <sys/syscall.h>
11#ifndef _GNU_SOURCE
12#define _GNU_SOURCE 1
13#endif
14#include <unistd.h>
15#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 30))
16#include <sys/syscall.h>
17static inline pid_t gettid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
18#endif
19#elif __APPLE__
20#include <pthread.h>
21#endif
22
23namespace cppgres {
24
25#if defined(__linux__)
26static inline bool is_main_thread() { return gettid() == getpid(); }
27#elif defined(__APPLE__)
28static inline bool is_main_thread() { return pthread_main_np() != 0; }
29#else
30#warning "is_main_thread() not implemented"
31static inline bool is_main_thread() { return false; }
32#endif
33
41struct worker {
42 worker() : done(false), terminated(false) {}
43
44 ~worker() { terminate(); }
45
46 void terminate() {
47 {
48 std::scoped_lock lock(mutex);
49 if (terminated)
50 return;
51 done = true;
52 cv.notify_one();
53 }
54 }
55
56 template <typename F, typename... Args>
57 auto post(F &&f, Args &&...args) -> std::future<std::invoke_result_t<F, Args...>> {
58 using ReturnType = std::invoke_result_t<F, Args...>;
59
60 auto task = std::make_shared<std::packaged_task<ReturnType()>>(
61 [f = std::move(f), ... args = std::move(args)]() { return f(args...); });
62 std::future<ReturnType> result = task->get_future();
63
64 {
65 std::scoped_lock lock(mutex);
66 tasks.emplace([task]() { (*task)(); });
67 }
68 cv.notify_one();
69 return result;
70 }
71
77 void run() {
78 if (!is_main_thread()) {
79 throw std::runtime_error("Worker can only run on main thread");
80 }
81 while (true) {
82 std::function<void()> task;
83 {
84 std::unique_lock<std::mutex> lock(mutex);
85 cv.wait(lock, [&]() { return done.load() || !tasks.empty(); });
86 if (done.load() && tasks.empty())
87 break;
88 task = std::move(tasks.front());
89 tasks.pop();
90 }
91 task();
92 }
93 terminated = true;
94 }
95
96private:
97 std::mutex mutex;
98 std::condition_variable cv;
99 std::queue<std::function<void()>> tasks;
100 std::atomic<bool> done;
101 std::atomic<bool> terminated;
102};
103
104} // namespace cppgres
Single-threaded Postgres workload worker.
Definition: threading.hpp:41
void run()
Run the worker.
Definition: threading.hpp:77