CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
cpark.h
1#ifndef CPARK_CPARK_H
2#define CPARK_CPARK_H
3
#include <algorithm>
#include <any>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <future>
#include <iterator>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "utils.h"
17
30namespace cpark {
31
35class Config {
36public:
37 enum class ParallelPolicy {
38 Sequential,
39 Thread,
40#ifdef CPARK_DISTRIBUTED
42#endif // CPARK_DISTRIBUTED
43 };
44
45public:
49 [[nodiscard]] constexpr const std::string& getDebugName() const noexcept { return debug_name_; }
50
55 [[nodiscard]] constexpr size_t getParallelTaskNum() const noexcept { return parallel_task_num_; }
56
60 [[nodiscard]] constexpr ParallelPolicy getParallelPolicy() const noexcept {
61 return parallel_policy_;
62 }
63
65 [[nodiscard]] constexpr std::ostream* getLoggerPtr() const noexcept { return logger_; }
66
71 [[nodiscard]] constexpr std::ostream& getLoggerOrNullStream() const noexcept {
72 return logger_ ? *logger_ : utils::g_null_ostream;
73 }
74
76 Config& setDebugName(std::string name) noexcept {
77 debug_name_ = std::move(name);
78 return *this;
79 }
80
83 if (num == 0) {
84 unsigned int n = std::thread::hardware_concurrency();
85 if (n != 0) {
86 parallel_task_num_ = n;
87 }
88 } else {
89 parallel_task_num_ = num;
90 }
91 return *this;
92 }
93
95 Config& setParallelPolicy(ParallelPolicy policy) noexcept {
96 parallel_policy_ = policy;
97 return *this;
98 }
99
101 Config& setLogger(std::ostream* logger) noexcept {
102 logger_ = logger;
103 return *this;
104 }
105
106private:
107 std::string debug_name_{};
108 size_t parallel_task_num_{8};
109 ParallelPolicy parallel_policy_{ParallelPolicy::Thread};
110 std::ostream* logger_{nullptr}; // Should this be here?
111
#ifdef CPARK_DISTRIBUTED
public:
  /// @brief Network endpoint of a cluster node.
  /// NOTE(review): ip_ presumably packs an IPv4 address; the byte order (host vs.
  /// network) of ip_ and port_ is not established here — confirm with the transport code.
  struct Address {
    // Default member initializers keep Address an aggregate (Address{ip, port} still
    // works) while guaranteeing a default-constructed Address is {0, 0}, not garbage.
    uint32_t ip_{};
    uint16_t port_{};
  };

public:
  /// @brief Appends a worker endpoint to the list of workers.
  /// @param address Endpoint of the worker to add.
  /// @return *this, so setters can be chained.
  Config& addWorker(const Address& address) {
    workers_.push_back(address);
    return *this;
  }

  /// @brief Sets the master endpoint, replacing any previously set one.
  /// @param address Endpoint of the master node.
  /// @return *this, so setters can be chained.
  Config& setMaster(const Address& address) noexcept {
    master_ = address;
    return *this;
  }

private:
  std::vector<Address> workers_{};
  // Value-initialized: previously an unset master had indeterminate ip_/port_.
  Address master_{};
#endif  // CPARK_DISTRIBUTED
134};
135
149public:
155
161
162public:
164 ExecutionContext() = default;
165
167 explicit ExecutionContext(Config config) : config_{std::move(config)} {}
168
170 void setConfig(Config config) { config_ = std::move(config); }
171
173 const Config& getConfig() const noexcept { return config_; }
174
176 RddId getAndIncRddId() { return next_rdd_id_++; }
177
179 SplitId getAndIncSplitId() { return next_split_id_++; }
180
182 bool splitShouldCache(SplitId split_id) const noexcept {
183 std::shared_lock guard(cache_mutex_);
184 return dependent_by_.contains(split_id) && dependent_by_.at(split_id).size() >= 2;
185 }
186
188 bool splitCached(SplitId split_id) const noexcept {
189 std::shared_lock guard(cache_mutex_);
190 return cache_done_.contains(split_id) &&
191 cache_done_[split_id].wait_for(std::chrono::seconds(0)) == std::future_status::ready;
192 }
193
194 void markDependency(SplitId from, SplitId to) noexcept {
195 std::shared_lock guard(cache_mutex_);
196 dependent_by_[to].insert(from);
197 }
198
200 const std::any& getSplitCache(SplitId split_id) const {
201 std::shared_lock guard(cache_mutex_);
202 return cache_.at(split_id);
203 }
204
205 template <typename CacheType, typename OriginalIterator>
206 std::shared_future<void> startCalculationOrGetFuture(SplitId split_id, OriginalIterator begin,
207 OriginalIterator end) {
208 {
209 std::shared_lock guard(cache_mutex_);
210 if (cache_done_.contains(split_id)) {
211 return cache_done_[split_id];
212 }
213 }
214
215 std::promise<void> promise{};
216
217 {
218 std::unique_lock guard(cache_mutex_);
219 if (cache_done_.contains(split_id)) {
220 return cache_done_[split_id];
221 }
222 cache_done_[split_id] = promise.get_future();
223 }
224
225 CacheType cache;
226 std::copy(begin, end, std::back_inserter(cache));
227
228 {
229 std::unique_lock guard(cache_mutex_);
230 cache_[split_id] = std::move(cache);
231 promise.set_value();
232 return cache_done_[split_id];
233 }
234 }
235
236private:
237 Config config_{};
238
239 // Using them to create incremental unique id for Rdd and Split.
240 std::atomic<RddId> next_rdd_id_{};
241 std::atomic<SplitId> next_split_id_{};
242
243 // Which splits are relying on this one.
244 std::unordered_map<SplitId, std::unordered_set<SplitId>> dependent_by_{};
245
246 // Cache information for the Splits.
247 std::unordered_map<SplitId, std::any> cache_{};
248 mutable std::unordered_map<SplitId, std::shared_future<void>> cache_done_{};
249 mutable std::shared_mutex cache_mutex_{};
250
251 // Thread synchronization information.
252
253 // Scheduler.
254};
255
256} // namespace cpark
257
258#endif //CPARK_CPARK_H
Definition base_rdd.h:94
Definition cpark.h:35
constexpr std::ostream * getLoggerPtr() const noexcept
Definition cpark.h:65
Config & setParallelTaskNum(size_t num=0) noexcept
Definition cpark.h:82
Config & setParallelPolicy(ParallelPolicy policy) noexcept
Definition cpark.h:95
constexpr const std::string & getDebugName() const noexcept
Definition cpark.h:49
constexpr std::ostream & getLoggerOrNullStream() const noexcept
Definition cpark.h:71
constexpr size_t getParallelTaskNum() const noexcept
Definition cpark.h:55
Config & setDebugName(std::string name) noexcept
Definition cpark.h:76
Config & setLogger(std::ostream *logger) noexcept
Definition cpark.h:101
constexpr ParallelPolicy getParallelPolicy() const noexcept
Definition cpark.h:60
Definition cpark.h:148
SplitId getAndIncSplitId()
Definition cpark.h:179
const std::any & getSplitCache(SplitId split_id) const
Definition cpark.h:200
RddId getAndIncRddId()
Definition cpark.h:176
bool splitCached(SplitId split_id) const noexcept
Definition cpark.h:188
uint32_t SplitId
Definition cpark.h:160
void setConfig(Config config)
Definition cpark.h:170
ExecutionContext(Config config)
Definition cpark.h:167
uint32_t RddId
Definition cpark.h:154
bool splitShouldCache(SplitId split_id) const noexcept
Definition cpark.h:182
const Config & getConfig() const noexcept
Definition cpark.h:173