VOX
A little voxel engine
Loading...
Searching...
No Matches
Executor.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <vector>
4#include <queue>
5#include <deque>
6#include <thread>
7#include <unordered_set>
8#include <variant>
9#include "JoinThreads.hpp"
10#include "tasks.hpp"
11#include "Future.hpp"
12#include "Graph.hpp"
13#include "TaskGraph.hpp"
14#include "Tracy.hpp"
15#include "tracy_globals.hpp"
16
17namespace task
18{
19
21{
22 friend class TaskNode;
23 friend class TaskGraph;
24public:
25 ~Executor();
26
28 {
29 static Executor instance;
30 return instance;
31 }
32
33 Executor(const Executor &) = delete;
34 Executor & operator=(const Executor &) = delete;
35 Executor(Executor &&) = delete;
36 Executor & operator=(Executor &&) = delete;
37
38 std::future<void> run(std::shared_ptr<TaskGraph> graph);
39
40 template <typename F>
41 std::future<void> run(F && f)
42 {
43 auto packed_f = [func = std::move(f), this]() mutable {
44 m_running_graphs += 1;
45 try {
46 func();
47 }
48 catch (const std::exception & e) {
49 m_running_graphs -= 1;
50 m_running_graphs_cond.notify_all();
51
52 throw;
53 }
54 m_running_graphs -= 1;
55 m_running_graphs_cond.notify_all();
56 };
57 std::packaged_task<void()> task(std::move(packed_f));
58 auto future = task.get_future();
59 {
60 std::lock_guard<std::mutex> lock(m_queue_mutex);
61 m_work_queue.emplace_back(std::move(task));
62 }
63 m_cond.notify_one();
64 return future;
65 }
66
67 void waitForAll();
68private:
69 Executor(unsigned const thread_count = std::thread::hardware_concurrency() - 2);
70 struct runningGraph;
71 // struct Module;
72 struct Module
73 {
74 Module(std::vector<TaskNode *> & successors, std::size_t nodesToRun, Module * externalModule)
75 : sucessors(successors), nodesToRun(nodesToRun), externalModule(externalModule) {};
76 std::vector<TaskNode*> rootNodes;
77 std::vector<TaskNode*> sucessors;
78 std::atomic_int nodesToRun;
79 Module * externalModule; //some modules can be nested
80 };
81
82 struct info
83 {
84 info () : t(type::NONE) {}
85 info (info && other) = default;
86 info & operator=(info && other) = default;
87 info (TaskNode * node, runningGraph * graph, Module * module)
88 : data(NodeInfo{graph, node, module})
89 {
90 t = type::NODE;
91 }
92 info (TaskNode * node, runningGraph * graph, Module * internalModule, Module * externalModule)
93 : data(GraphInfo{graph, node, internalModule})
94 {
95 t = type::GRAPH;
96 }
97 info (std::packaged_task<void()> task)
98 {
99 t = type::ASYNC;
100 data = AsyncInfo{std::move(task)};
101 }
102 //maybe add a union and a type enum to enable other types of tasks to be queued
103 enum class type : uint16_t
104 {
105 NONE,
106 NODE,
107 GRAPH,
108 ASYNC
109 };
111 {
112 runningGraph * graph;
115 };
116 struct NodeInfo
117 {
118 runningGraph * graph;
120 Module * module = nullptr;
121 };
123 {
124 std::packaged_task<void()> task;
125 };
126
127 type t;
128 std::variant<AsyncInfo, NodeInfo, GraphInfo> data;
129 };
130
131 struct runningGraph
132 {
133
134 runningGraph(std::shared_ptr<TaskGraph> graph);
135 std::shared_ptr<TaskGraph> graph;
136 std::exception_ptr eptr = nullptr;
137 std::promise<void> promise;
138 std::atomic_bool done = false;
139 std::mutex mutex;
140 std::atomic_int graphDependencies;
141 std::unordered_set<TaskNode*> waitingNodes;
142 std::unordered_set<TaskNode*> runningNodes;
143 std::unordered_map<TaskNode*, std::atomic_int> nodeDependencies;
144 std::list<Module> modules;
145 // nodeStatus selfNodes;
146 // std::vector<nodeStatus> moduleNodes;
147
148 std::unordered_map<TaskNode *, info> nodeInfos;
149 private:
150 bool checkCycles(const TaskGraph & graph) const;
151 };
152
153
154
155 typedef uint64_t runningGraphId;
156
157 std::atomic_int m_running_graphs;
158 std::mutex m_running_graphs_mutex;
159 std::condition_variable m_running_graphs_cond;
160
161 std::deque<info> m_work_queue;
162 std::mutex m_queue_mutex;
163
164 std::atomic_bool m_done;
165 std::condition_variable m_cond;
166 std::vector<std::thread> m_threads;
167 JoinThreads m_joiner;
168
169 /*****************************************\
170 * WORKER THREADS
171 \*****************************************/
172 void workerThread(const int & id);
173 void workerEndGraph(info::NodeInfo & node_info);
174 void workerExecNode(info::NodeInfo & node_info);
175 void workerExecAsync(info::AsyncInfo & async_info);
176 void workerExecGraphNode(info::GraphInfo & graph_info);
177 void workerEndNode(info::NodeInfo & node_info);
178 void workerEndGraphNode(info::GraphInfo & node_info);
179 void workerEndModule(info::NodeInfo & node_info);
180 void workerUpdateSuccessors(info::NodeInfo & node_info);
181};
182
183}
Definition: JoinThreads.hpp:8
Definition: Executor.hpp:21
void waitForAll()
Definition: Executor.cpp:54
static Executor & getInstance()
Definition: Executor.hpp:27
~Executor()
Definition: Executor.cpp:25
Executor & operator=(const Executor &)=delete
Executor(const Executor &)=delete
friend class TaskNode
Definition: Executor.hpp:22
Executor(Executor &&)=delete
std::future< void > run(F &&f)
Definition: Executor.hpp:41
std::future< void > run(std::shared_ptr< TaskGraph > graph)
Definition: Executor.cpp:33
Executor & operator=(Executor &&)=delete
Definition: TaskGraph.hpp:22
Definition: Node.hpp:16
Definition: Executor.cpp:5
Definition: Executor.hpp:123
std::packaged_task< void()> task
Definition: Executor.hpp:124
Definition: Executor.hpp:111
TaskNode * node
Definition: Executor.hpp:113
runningGraph * graph
Definition: Executor.hpp:112
Module * internalModule
Definition: Executor.hpp:114
Definition: Executor.hpp:117
TaskNode * node
Definition: Executor.hpp:119
Module * module
Definition: Executor.hpp:120
runningGraph * graph
Definition: Executor.hpp:118