summary refs log tree commit diff
path: root/src/Compute/Queue.hpp
blob: 4035d5619b81511153d684705f7f71cff2dc2b3d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#pragma once

#include "../Common/Sizes.hpp"

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

namespace MC::Compute {

/// A small priority-ordered job queue serviced by detached worker threads.
///
/// Jobs are submitted with add() and executed on one of the workers; finished
/// results are collected by polling done(). Copies of a Queue share the same
/// underlying state, because it is held through a std::shared_ptr.
///
/// Lifetime: workers only hold a std::weak_ptr to the shared state. Once the
/// last Queue handle referring to that state is destroyed, the state is
/// released, jobs that have not started yet are discarded, and every worker
/// returns the next time it looks for work (or finishes its current job).
///
/// @tparam X Result type produced by a job. Must be non-void and movable.
/// @tparam I Identifier type attached to each job and echoed back in its Result.
template <typename X, typename I = UInt>
class Queue {
public:
    /// Creates the shared state and starts @p workers detached worker threads.
    ///
    /// @param workers Number of worker threads to spawn. With 0 workers, jobs
    ///                can be added but are never executed.
    explicit Queue(UInt workers) : m_control(std::make_shared<Control>()) {
        for (UInt w = 0; w < workers; w++) {
            // Capture a weak handle to the shared state explicitly. A plain
            // [=] capture would capture `this`, and the detached thread could
            // then read m_control from a Queue that was already destroyed or moved.
            std::thread worker{[weak_control = std::weak_ptr<Control>(m_control)] {
                Queue::run_thread(weak_control);
            }};
            worker.detach();
        }
    }

    /// Schedules a job for execution on a worker thread.
    ///
    /// @param id       Identifier reported back in the job's Result.
    /// @param priority Ordering key; the pending job with the numerically
    ///                 smallest priority is executed first.
    /// @param execute  Callable producing the result. It runs on a detached
    ///                 worker thread, so an exception escaping it (including
    ///                 std::bad_function_call for an empty function) terminates
    ///                 the program.
    void add(I id, Real priority, std::function<X()> execute) {
        std::scoped_lock work_lock(m_control->work.mutex);
        m_control->work.jobs.emplace(id, priority, std::move(execute));
    }

    /// Outcome of one finished job.
    struct Result {
        I id{};  ///< Identifier that was passed to add().
        X res;   ///< Value returned by the job's callable.
    };

    /// Retrieves and removes all results that have finished since the last call.
    ///
    /// @return Finished results in the order the workers stored them; empty if
    ///         nothing has finished.
    std::vector<Result> done() {
        std::vector<Result> done_results;

        std::scoped_lock result_lock(m_control->results.mutex);
        // Take the whole buffer in O(1) instead of copying element by element
        // while holding the lock; the shared vector is left empty.
        done_results.swap(m_control->results.results);
        return done_results;
    }

private:
    /// A pending unit of work together with its scheduling key.
    struct Job {
        Job() = default;
        Job(I id, Real priority, std::function<X()> execute)
            : id(id), priority(priority), execute(std::move(execute)) {}

        I id{};
        Real priority{};
        std::function<X()> execute;

        /// Orders jobs by priority; used through std::greater by the job heap.
        Bool operator>(const Job& other) const { return priority > other.priority; }
    };

    /// Pending jobs, guarded by `mutex`.
    struct Work {
        std::mutex mutex;
        // std::greater turns the priority_queue into a min-heap: top() is the
        // job with the smallest priority value.
        std::priority_queue<Job, std::vector<Job>, std::greater<Job>> jobs;
    };

    /// Finished results awaiting collection by done(), guarded by `mutex`.
    struct Results {
        std::mutex mutex;
        std::vector<Result> results;
    };

    /// All state shared between the Queue handle(s) and the worker threads.
    struct Control {
        Work work;
        Results results;
    };

    /// Worker thread body: repeatedly takes the top job, runs it outside any
    /// lock and stores its result. Sleeps briefly when there is no work.
    ///
    /// @param weak_control Non-owning handle to the shared state. The worker
    ///                     returns as soon as that state no longer exists, so
    ///                     threads do not outlive their Queue indefinitely.
    static void run_thread(std::weak_ptr<Control> weak_control) {
        using namespace std::chrono_literals;
        constexpr auto idle_poll_interval = 100ms;

        while (true) {
            Bool nothing_to_do = true;
            Job job;
            {
                // `control` is declared before the lock so the mutex is released
                // before this (possibly last) strong reference is dropped.
                const auto control = weak_control.lock();
                if (!control) {
                    return;  // Every Queue handle is gone: shut down.
                }

                std::scoped_lock work_lock(control->work.mutex);
                if (!control->work.jobs.empty()) {
                    job = control->work.jobs.top();
                    control->work.jobs.pop();
                    nothing_to_do = false;
                }
            }

            if (nothing_to_do) {
                // No strong reference is held while sleeping, so the shared
                // state can be released in the meantime.
                std::this_thread::sleep_for(idle_poll_interval);
                continue;
            }

            // Run the job without holding any lock or strong reference.
            auto res = job.execute();

            const auto control = weak_control.lock();
            if (!control) {
                return;  // Nobody is left to collect the result.
            }
            std::scoped_lock result_lock(control->results.mutex);
            control->results.results.push_back(Result{job.id, std::move(res)});
        }
    }

    std::shared_ptr<Control> m_control;
};

}