1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#pragma once
#include <sys/types.h>

#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
namespace MC::Compute {
/// @brief Priority-ordered job queue served by a fixed number of background worker threads.
///
/// Jobs are submitted with add() and their results are collected with done().
/// Both calls take the relevant mutex, so they may be used from any thread.
///
/// Ordering: the pending jobs live in a min-heap on `priority`, so the job with the
/// LOWEST priority value is executed first.
///
/// Lifetime: the workers are detached and loop forever. They share ownership of the
/// internal state, so that state stays alive for as long as the workers exist, even
/// after the Queue object itself has been destroyed.
///
/// Errors: an exception escaping a job (including calling an empty std::function)
/// leaves the worker thread's entry function and therefore terminates the program.
///
/// @tparam X Value type produced by a job.
/// @tparam I Identifier type attached to a job and handed back with its result.
template <typename X, typename I = uint>
class Queue {
public:
    /// @brief Creates the shared state and starts `workers` detached worker threads.
    /// @param workers Number of worker threads to launch (0 means jobs are never run).
    explicit Queue(uint workers) : m_control(std::make_shared<Control>()) {
        for (uint w = 0; w < workers; w++) {
            // Capture the shared state itself rather than `this`: the thread is detached
            // and may first run after this Queue object has already gone away.
            std::thread worker{[control = m_control]() mutable { Queue::run_thread(std::move(control)); }};
            worker.detach();
        }
    }

    /// @brief Enqueues a job.
    /// @param id       Identifier reported back in the matching Result.
    /// @param priority Scheduling key; lower values are executed first.
    /// @param execute  Callable producing the job's result; it is run on a worker thread.
    void add(I id, float priority, std::function<X()> execute) {
        {
            std::scoped_lock work_lock(m_control->work.mutex);
            m_control->work.jobs.emplace(std::move(id), priority, std::move(execute));
        }
        // Notify after releasing the lock so the woken worker can acquire it immediately.
        m_control->work.ready.notify_one();
    }

    /// @brief Outcome of one finished job.
    struct Result {
        I id{};  ///< Identifier that was passed to add().
        X res;   ///< Value returned by the job's callable.
    };

    /// @brief Returns every result produced since the previous call and clears the backlog.
    /// @return Finished results in the order the workers stored them; empty if none are ready.
    std::vector<Result> done() {
        std::vector<Result> finished;
        {
            std::scoped_lock result_lock(m_control->results.mutex);
            // Swapping hands the whole batch over in O(1) and leaves the backlog empty.
            finished.swap(m_control->results.results);
        }
        return finished;
    }

private:
    /// @brief A pending unit of work together with its scheduling key.
    struct Job {
        Job() = default;
        Job(I id, float priority, std::function<X()> execute)
            : id(std::move(id)), priority(priority), execute(std::move(execute)) {}

        I id{};
        float priority{0.0F};
        std::function<X()> execute;

        // Used by std::greater, which turns the priority_queue into a min-heap on `priority`.
        bool operator>(const Job& other) const { return priority > other.priority; }
    };

    /// @brief Pending jobs plus the synchronisation needed to hand them to workers.
    struct Work {
        std::mutex mutex;
        std::condition_variable ready;  ///< Signalled by add() whenever a job is enqueued.
        std::priority_queue<Job, std::vector<Job>, std::greater<Job>> jobs;
    };

    /// @brief Finished results waiting to be collected by done().
    struct Results {
        std::mutex mutex;
        std::vector<Result> results;
    };

    /// @brief State shared between the Queue object and all of its workers.
    struct Control {
        Work work;
        Results results;
    };

    /// @brief Blocks until a job is available, then removes and returns the one with the lowest priority value.
    static Job take_next_job(Control& control) {
        std::unique_lock<std::mutex> work_lock(control.work.mutex);
        control.work.ready.wait(work_lock, [&control] { return !control.work.jobs.empty(); });
        // top() only exposes a const reference, so the job has to be copied out before pop().
        Job next = control.work.jobs.top();
        control.work.jobs.pop();
        return next;
    }

    /// @brief Worker loop: repeatedly takes the next job, runs it outside any lock and stores its result.
    /// @param control Shared state; holding it by value keeps it alive for the thread's lifetime.
    [[noreturn]] static void run_thread(std::shared_ptr<Control> control) {
        while (true) {
            Job job = take_next_job(*control);
            auto res = job.execute();
            {
                std::scoped_lock result_lock(control->results.mutex);
                control->results.results.push_back(Result{std::move(job.id), std::move(res)});
            }
        }
    }

    std::shared_ptr<Control> m_control;
};
}
|