diff options
Diffstat (limited to 'src/Compute/Queue.hpp')
| -rw-r--r-- | src/Compute/Queue.hpp | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/src/Compute/Queue.hpp b/src/Compute/Queue.hpp new file mode 100644 index 0000000..6aba1ab --- /dev/null +++ b/src/Compute/Queue.hpp @@ -0,0 +1,96 @@ +#pragma once + +#include <functional> +#include <sys/types.h> +#include <mutex> +#include <utility> +#include <vector> +#include <iostream> +#include <thread> + +namespace MC::Compute { + +template <typename X, typename I = uint> +class Queue { +public: + explicit Queue(uint workers) : m_control(std::make_shared<Control>()) { + for (uint w = 0; w < workers; w++) { + std::thread thread{[=]() { Queue::run_thread(m_control); }}; + thread.detach(); + } + }; + + void add(I id, std::function<X()> execute) { + std::scoped_lock work_lock(m_control->work.mutex); + m_control->work.jobs.push_back({id, std::move(execute)}); + } + + struct Result { + I id{}; + X res; + }; + + std::vector<Result> done() { + std::vector<Result> done_results; + + std::scoped_lock result_lock(m_control->results.mutex); + for (auto r : m_control->results.results) { + done_results.push_back(r); + } + m_control->results.results.clear(); + return done_results; + } + +private: + struct Job { + I id; + std::function<X()> execute; + }; + + struct Work { + std::mutex mutex; + std::vector<Job> jobs; + }; + + struct Results { + std::mutex mutex; + std::vector<Result> results; + }; + + struct Control { + Work work; + Results results; + }; + + [[noreturn]] static void run_thread(std::shared_ptr<Control> control) { + using namespace std::chrono_literals; + + while (true) { + bool nothing_to_do = true; + Job job; + { + std::scoped_lock work_lock(control->work.mutex); + if (!control->work.jobs.empty()) { + job = control->work.jobs.back(); + control->work.jobs.pop_back(); + nothing_to_do = false; + } + } + + if (nothing_to_do) { + std::this_thread::sleep_for(100ms); + continue; + } + + auto res = job.execute(); + { + std::scoped_lock result_lock(control->results.mutex); + control->results.results.push_back({job.id, res}); + } + } + } + + std::shared_ptr<Control> m_control; +}; + +} |
