#pragma once #include "../Common/Sizes.hpp" #include "../ThreadRole.hpp" #include #include #include #include #include #include namespace MC::Compute { template class Queue { public: explicit Queue(UInt workers) : m_control(std::make_shared()) { for (UInt w = 0; w < workers; w++) { std::thread thread{[=]{ Queue::run_thread(m_control); }}; thread.detach(); } } void add(I id, Real priority, std::function execute) { std::scoped_lock work_lock(m_control->work.mutex); m_control->work.jobs.emplace(id, priority, execute); } USize size() const { std::scoped_lock work_lock(m_control->work.mutex); return m_control->work.jobs.size(); } struct Result { I id{}; X res; }; std::vector done() { std::vector done_results; std::scoped_lock result_lock(m_control->results.mutex); for (auto &r : m_control->results.results) { done_results.push_back(r); } m_control->results.results.clear(); return done_results; } struct Reassession { enum { Keep, Reassess, Cancel } type; Real new_priority = 0; }; template void reassess(F assess) { decltype(m_control->work.jobs) new_jobs{}; std::scoped_lock work_lock(m_control->work.mutex); while (!m_control->work.jobs.empty()) { // I don't like std::priority_queue :( Job job = m_control->work.jobs.top(); m_control->work.jobs.pop(); Reassession reassession = assess(job.id); switch (reassession.type) { case Reassession::Keep: new_jobs.push(job); break; case Reassession::Reassess: new_jobs.emplace(job.id, reassession.new_priority, job.execute); break; default: break; } } m_control->work.jobs = std::move(new_jobs); } private: struct Job { Job() = default; Job(I id, Real priority, std::function execute) : id(id), priority(priority), execute(execute) {} I id; Real priority; std::function execute; Bool operator>(const Job& other) const { return priority > other.priority; } }; struct Work { std::mutex mutex; std::priority_queue, std::greater> jobs; }; struct Results { std::mutex mutex; std::vector results; }; struct Control { Work work; Results results; }; [[noreturn]] static void run_thread(std::shared_ptr control) { using namespace std::chrono_literals; HELLO_I_AM(ThreadRole::Worker); while (true) { Bool nothing_to_do = true; Job job; { std::scoped_lock work_lock(control->work.mutex); if (!control->work.jobs.empty()) { job = control->work.jobs.top(); control->work.jobs.pop(); nothing_to_do = false; } } if (nothing_to_do) { std::this_thread::sleep_for(100ms); continue; } auto res = job.execute(); { std::scoped_lock result_lock(control->results.mutex); control->results.results.push_back({job.id, res}); } } } std::shared_ptr m_control; }; }