summary refs log tree commit diff
path: root/src/Compute/Queue.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Compute/Queue.hpp')
-rw-r--r--src/Compute/Queue.hpp96
1 files changed, 96 insertions, 0 deletions
diff --git a/src/Compute/Queue.hpp b/src/Compute/Queue.hpp
new file mode 100644
index 0000000..6aba1ab
--- /dev/null
+++ b/src/Compute/Queue.hpp
@@ -0,0 +1,96 @@
+#pragma once
+
+#include <functional>
+#include <sys/types.h>
+#include <mutex>
+#include <utility>
+#include <vector>
+#include <iostream>
+#include <thread>
+
+namespace MC::Compute {
+
+template <typename X, typename I = uint>
+class Queue {
+public:
+    explicit Queue(uint workers) : m_control(std::make_shared<Control>()) {
+        for (uint w = 0; w < workers; w++) {
+            std::thread thread{[=]() { Queue::run_thread(m_control); }};
+            thread.detach();
+        }
+    };
+
+    void add(I id, std::function<X()> execute) {
+        std::scoped_lock work_lock(m_control->work.mutex);
+        m_control->work.jobs.push_back({id, std::move(execute)});
+    }
+
+    struct Result {
+        I id{};
+        X res;
+    };
+
+    std::vector<Result> done() {
+        std::vector<Result> done_results;
+
+        std::scoped_lock result_lock(m_control->results.mutex);
+        for (auto r : m_control->results.results) {
+            done_results.push_back(r);
+        }
+        m_control->results.results.clear();
+        return done_results;
+    }
+
+private:
+    struct Job {
+        I id;
+        std::function<X()> execute;
+    };
+
+    struct Work {
+        std::mutex mutex;
+        std::vector<Job> jobs;
+    };
+
+    struct Results {
+        std::mutex mutex;
+        std::vector<Result> results;
+    };
+
+    struct Control {
+        Work work;
+        Results results;
+    };
+
+    [[noreturn]] static void run_thread(std::shared_ptr<Control> control) {
+        using namespace std::chrono_literals;
+
+        while (true) {
+            bool nothing_to_do = true;
+            Job job;
+            {
+                std::scoped_lock work_lock(control->work.mutex);
+                if (!control->work.jobs.empty()) {
+                    job = control->work.jobs.back();
+                    control->work.jobs.pop_back();
+                    nothing_to_do = false;
+                }
+            }
+
+            if (nothing_to_do) {
+                std::this_thread::sleep_for(100ms);
+                continue;
+            }
+
+            auto res = job.execute();
+            {
+                std::scoped_lock result_lock(control->results.mutex);
+                control->results.results.push_back({job.id, res});
+            }
+        }
+    }
+
+    std::shared_ptr<Control> m_control;
+};
+
+}