7#include <unordered_set>
38 std::future<void>
run(std::shared_ptr<TaskGraph> graph);
41 std::future<void>
run(F && f)
43 auto packed_f = [func = std::move(f),
this]()
mutable {
44 m_running_graphs += 1;
48 catch (
const std::exception & e) {
49 m_running_graphs -= 1;
50 m_running_graphs_cond.notify_all();
54 m_running_graphs -= 1;
55 m_running_graphs_cond.notify_all();
57 std::packaged_task<void()>
task(std::move(packed_f));
58 auto future =
task.get_future();
60 std::lock_guard<std::mutex> lock(m_queue_mutex);
61 m_work_queue.emplace_back(std::move(
task));
69 Executor(
unsigned const thread_count = std::thread::hardware_concurrency() - 2);
74 Module(std::vector<TaskNode *> & successors, std::size_t nodesToRun, Module * externalModule)
75 : sucessors(successors), nodesToRun(nodesToRun), externalModule(externalModule) {};
76 std::vector<TaskNode*> rootNodes;
77 std::vector<TaskNode*> sucessors;
78 std::atomic_int nodesToRun;
79 Module * externalModule;
84 info () : t(type::NONE) {}
85 info (info && other) =
default;
86 info & operator=(info && other) =
default;
87 info (
TaskNode * node, runningGraph * graph, Module * module)
88 : data(NodeInfo{graph, node, module})
92 info (
TaskNode * node, runningGraph * graph, Module * internalModule, Module * externalModule)
93 : data(GraphInfo{graph, node, internalModule})
97 info (std::packaged_task<
void()>
task)
100 data = AsyncInfo{std::move(
task)};
103 enum class type : uint16_t
124 std::packaged_task<void()>
task;
128 std::variant<AsyncInfo, NodeInfo, GraphInfo> data;
134 runningGraph(std::shared_ptr<TaskGraph> graph);
135 std::shared_ptr<TaskGraph> graph;
136 std::exception_ptr eptr =
nullptr;
137 std::promise<void> promise;
138 std::atomic_bool done =
false;
140 std::atomic_int graphDependencies;
141 std::unordered_set<TaskNode*> waitingNodes;
142 std::unordered_set<TaskNode*> runningNodes;
143 std::unordered_map<TaskNode*, std::atomic_int> nodeDependencies;
144 std::list<Module> modules;
148 std::unordered_map<TaskNode *, info> nodeInfos;
150 bool checkCycles(
const TaskGraph & graph)
const;
155 typedef uint64_t runningGraphId;
157 std::atomic_int m_running_graphs;
158 std::mutex m_running_graphs_mutex;
159 std::condition_variable m_running_graphs_cond;
161 std::deque<info> m_work_queue;
162 std::mutex m_queue_mutex;
164 std::atomic_bool m_done;
165 std::condition_variable m_cond;
166 std::vector<std::thread> m_threads;
172 void workerThread(
const int &
id);
173 void workerEndGraph(info::NodeInfo & node_info);
174 void workerExecNode(info::NodeInfo & node_info);
175 void workerExecAsync(info::AsyncInfo & async_info);
176 void workerExecGraphNode(info::GraphInfo & graph_info);
177 void workerEndNode(info::NodeInfo & node_info);
178 void workerEndGraphNode(info::GraphInfo & node_info);
179 void workerEndModule(info::NodeInfo & node_info);
180 void workerUpdateSuccessors(info::NodeInfo & node_info);
Definition: JoinThreads.hpp:8
Definition: Executor.hpp:21
void waitForAll()
Definition: Executor.cpp:54
static Executor & getInstance()
Definition: Executor.hpp:27
~Executor()
Definition: Executor.cpp:25
Executor & operator=(const Executor &)=delete
Executor(const Executor &)=delete
friend class TaskNode
Definition: Executor.hpp:22
Executor(Executor &&)=delete
std::future< void > run(F &&f)
Definition: Executor.hpp:41
std::future< void > run(std::shared_ptr< TaskGraph > graph)
Definition: Executor.cpp:33
Executor & operator=(Executor &&)=delete
Definition: TaskGraph.hpp:22
Definition: Executor.cpp:5
Definition: Executor.hpp:123
std::packaged_task< void()> task
Definition: Executor.hpp:124
Definition: Executor.hpp:111
TaskNode * node
Definition: Executor.hpp:113
runningGraph * graph
Definition: Executor.hpp:112
Module * internalModule
Definition: Executor.hpp:114
Definition: Executor.hpp:117
TaskNode * node
Definition: Executor.hpp:119
Module * module
Definition: Executor.hpp:120
runningGraph * graph
Definition: Executor.hpp:118