CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for processing large data sets.
Loading...
Searching...
No Matches
base_rdd.h
1#ifndef CPARK_BASE_RDD_H
2#define CPARK_BASE_RDD_H
3
4#include <concepts>
5#include <ranges>
6#include <utility>
7#include <variant>
8
9#include "cpark.h"
10
11namespace cpark {
12
13namespace concepts {
14
20template <typename T>
21concept HasId = requires(const T& t) {
22 t.id();
23 requires std::is_arithmetic_v<decltype(t.id())>;
24};
25
32template <typename T>
33concept HasDependency = concepts::HasId<T> && requires(T & t) {
34 { t.dependencies() } -> std::ranges::input_range;
35 t.addDependency(t.id());
36};
37
43template <typename S>
44concept Split = std::ranges::input_range<S> && concepts::HasDependency<S>;
45
52template <typename R>
53concept Rdd = std::ranges::input_range<R> && concepts::Split<std::ranges::range_value_t<R>>;
54
58template <typename R>
59concept HasBeginImpl = requires(const R& r) {
60 { r.beginImpl() } -> std::input_iterator;
61};
62
66template <typename R>
67concept HasEndImpl = requires(const R& r) {
68 { r.endImpl() } -> std::sentinel_for<decltype(r.beginImpl())>;
69};
70
71template <class P>
72concept Pair = requires(P p) {
73 typename P::first_type;
74 typename P::second_type;
75 p.first;
76 p.second;
77 requires std::same_as<decltype(p.first), typename P::first_type>;
78 requires std::same_as<decltype(p.second), typename P::second_type>;
79};
80
81template <typename R>
83
84} // namespace concepts
85
93template <typename DerivedSplit>
94class BaseSplit : public std::ranges::view_interface<BaseSplit<DerivedSplit>> {
95public:
96 template <typename T>
97 friend class BaseSplit;
98
99public:
// Member-initializer list of a BaseSplit constructor: stores `context` and obtains
// this split's id from it via getAndIncSplitId().
102 : context_{context}, split_id_{context_->getAndIncSplitId()} {}
103
105 template <concepts::Split S>
106 explicit BaseSplit(const BaseSplit<S>& prev)
107 : context_{prev.context_}, split_id_{prev.split_id_}, dependencies_{prev.dependencies_} {}
108
110 template <concepts::Split S>
112 context_ = prev.context_;
113 split_id_ = prev.split_id_;
114 dependencies_ = prev.dependencies_;
115 }
116
// Constructor template building a split from another BaseSplit instantiation (`other`).
// The context pointer is always shared with `other`; the two boolean flags decide
// what else is carried over.
124 template <typename T>
126 : context_{other.context_} {
// Reuse the id of `other`, or request a new one from the context.
127 if (copy_id) {
128 split_id_ = other.split_id_;
129 } else {
130 split_id_ = context_->getAndIncSplitId();
131 }
// The dependency list stays empty unless copying was requested.
132 if (copy_dependencies) {
133 dependencies_ = other.dependencies_;
134 }
135 }
136
142 auto begin() const requires concepts::HasBeginImpl<DerivedSplit> {
143 return static_cast<const DerivedSplit&>(*this).beginImpl();
144 }
145
151 auto end() const requires concepts::HasEndImpl<DerivedSplit> {
152 return static_cast<const DerivedSplit&>(*this).endImpl();
153 }
154
// Exposes the recorded dependency ids as a subrange spanning all of `dependencies_`.
160 return std::ranges::subrange(std::ranges::begin(dependencies_),
161 std::ranges::end(dependencies_));
162 }
163
// Records `split_id` in this split's own dependency list and also reports the
// (this split -> split_id) relation to the execution context.
168 dependencies_.push_back(split_id);
169 context_->markDependency(split_id_, split_id);
170 }
171
// Convenience overload taking a split object: delegates to the id-based
// addDependency() using the id stored in `split`.
175 template <typename T>
177 addDependency(split.split_id_);
178 }
179
182
183protected:
184 ExecutionContext* context_{};
185 ExecutionContext::SplitId split_id_{};
186 std::vector<ExecutionContext::SplitId> dependencies_{};
187};
188
210template <typename DerivedSplit, typename DerivedSplitIterator>
211class CachedSplit : public BaseSplit<CachedSplit<DerivedSplit, DerivedSplitIterator>> {
212public:
213 template <typename T, typename U>
214 friend class CachedSplit;
215
216public:
218 friend Base;
219 using ValueType = std::iter_value_t<DerivedSplitIterator>;
220 using CacheType = std::vector<ValueType>;
221
228 class Iterator : public std::random_access_iterator_tag {
229 public:
230 using difference_type = std::ptrdiff_t;
231 using value_type = ValueType;
232 using CacheIterator = std::ranges::iterator_t<const CacheType>;
233 using OriginalIterator = DerivedSplitIterator;
234
235 Iterator() = default;
236
240 explicit Iterator(const CacheIterator& iterator) : iterator_{iterator} {
241 static_assert(std::random_access_iterator<Iterator>,
242 "CachedSplit::Iterator does not satisfy random_access_iterator, please check "
243 "the implementation of the DerivedSplit.");
244 }
245
249 explicit Iterator(const OriginalIterator& iterator) : iterator_{iterator} {
250 static_assert(std::random_access_iterator<Iterator>,
251 "CachedSplit::Iterator does not satisfy random_access_iterator, please check "
252 "the implementation of the DerivedSplit.");
253 }
254
255 // Member functions to implement a forward_iterator.
256
257 value_type operator*() const {
258 // Read the value pointed by the actual iterator inside this class.
259 // First check which type of iterator is actually held by `iterator_`,
260 // then get the right type from it and read the value.
261 if (std::holds_alternative<CacheIterator>(iterator_)) [[unlikely]] {
262 return *std::get<CacheIterator>(iterator_);
263 } else {
264 return *std::get<OriginalIterator>(iterator_);
265 }
266 }
267
// Advances whichever iterator the variant currently holds by one position,
// then yields this object.
270 if (std::holds_alternative<CacheIterator>(iterator_)) [[unlikely]] {
271 ++std::get<CacheIterator>(iterator_);
272 } else {
273 ++std::get<OriginalIterator>(iterator_);
274 }
275 return *this;
276 }
277
// Snapshot the current position, advance this iterator, and hand back the snapshot.
280 auto old = *this;
281 ++(*this);
282 return old;
283 }
284
290 bool operator==(const Iterator& other) const {
291 if (std::holds_alternative<CacheIterator>(iterator_) &&
292 std::holds_alternative<CacheIterator>(other.iterator_)) [[unlikely]] {
293 return std::get<CacheIterator>(iterator_) == std::get<CacheIterator>(other.iterator_);
294 } else if (std::holds_alternative<OriginalIterator>(iterator_) &&
295 std::holds_alternative<OriginalIterator>(other.iterator_)) {
296 return std::get<OriginalIterator>(iterator_) == std::get<OriginalIterator>(other.iterator_);
297 } else [[unlikely]] {
298 return false;
299 }
300 }
301
302 bool operator!=(const Iterator& other) const { return !(*this == other); }
303
304 // Member functions to implement bidirectional_iterator.
305
// Steps the held iterator back by one position. A cache iterator is always
// decremented; an original iterator is decremented only when its type is
// bidirectional, otherwise a std::runtime_error is thrown.
311 if (std::holds_alternative<CacheIterator>(iterator_)) [[unlikely]] {
312 --std::get<CacheIterator>(iterator_);
313 } else {
314 if constexpr (std::bidirectional_iterator<OriginalIterator>) {
315 --std::get<OriginalIterator>(iterator_);
316 } else {
317 throw std::runtime_error("calculate-cache-on-miss not implemented yet");
318 }
319 }
320 return *this;
321 }
322
328 auto old = *this;
329 ++(*this);
330 return old;
331 }
332
333 // Member functions to implement random_access_iterator.
334
339 Iterator& operator+=(const difference_type& n) {
340 if (std::holds_alternative<CacheIterator>(iterator_)) [[unlikely]] {
341 std::get<CacheIterator>(iterator_) += n;
342 } else {
343 if constexpr (std::random_access_iterator<OriginalIterator>) {
344 std::get<OriginalIterator>(iterator_) += n;
345 } else {
346 throw std::runtime_error("Not implemented");
347 }
348 }
349 return *this;
350 }
351
356 Iterator& operator-=(const difference_type& n) {
357 if (std::holds_alternative<CacheIterator>(iterator_)) [[unlikely]] {
358 std::get<CacheIterator>(iterator_) -= n;
359 } else {
360 if constexpr (std::random_access_iterator<OriginalIterator>) {
361 std::get<OriginalIterator>(iterator_) -= n;
362 } else {
363 throw std::runtime_error("Not implemented");
364 }
365 }
366 return *this;
367 }
368
370 Iterator operator+(const difference_type& n) const {
371 auto res = *this;
372 return res += n;
373 }
374
376 friend Iterator operator+(const difference_type& n, const Iterator& iter) { return iter + n; }
377
378 Iterator operator-(const difference_type& n) const {
379 auto res = *this;
380 return res -= n;
381 }
382
384 value_type operator[](const difference_type& n) const { return *(*this + n); }
385
390 difference_type operator-(const Iterator& other) const {
391 if (std::holds_alternative<CacheIterator>(iterator_) &&
392 std::holds_alternative<CacheIterator>(other.iterator_)) {
393 return std::get<CacheIterator>(iterator_) - std::get<CacheIterator>(other.iterator_);
394 } else if (std::holds_alternative<OriginalIterator>(iterator_) &&
395 std::holds_alternative<OriginalIterator>(other.iterator_)) {
396 if constexpr (requires(const OriginalIterator& a, const OriginalIterator& b) { a - b; }) {
397 return std::get<OriginalIterator>(iterator_) -
398 std::get<OriginalIterator>(other.iterator_);
399 } else {
400 throw std::runtime_error("Not Implemented");
401 }
402 } else {
403 throw std::runtime_error("Bad Compare");
404 }
405 }
406
407 // Functions to implement totally_ordered.
408
409 bool operator<(const Iterator& other) const { return *this - other < 0; }
410
411 bool operator>(const Iterator& other) const { return *this - other > 0; }
412
413 bool operator<=(const Iterator& other) const { return *this - other <= 0; }
414
415 bool operator>=(const Iterator& other) const { return *this - other >= 0; }
416
417 private:
418 private:
419 // A variant holds either an iterator from the original split, or an iterator of the cache.
420 std::variant<CacheIterator, OriginalIterator> iterator_;
421 };
422
423public:
424 explicit CachedSplit(ExecutionContext* context) : Base{context} {
425 static_assert(std::ranges::random_access_range<CachedSplit>,
426 "CachedSplit instance does not satisfy random_access_range, please check the "
427 "DerivedSplit.");
428 }
429
// Constructor template accepting another CachedSplit instantiation. The body only
// verifies at instantiation time that this split is a random-access range.
437 template <typename T, typename U>
440 static_assert(std::ranges::random_access_range<CachedSplit>,
441 "CachedSplit instance does not satisfy random_access_range, please check the "
442 "DerivedSplit.");
443 }
444
445private:
447 bool shouldCache() const noexcept { return Base::context_->splitShouldCache(Base::split_id_); }
448
450 bool hasCached() const noexcept { return Base::context_->splitCached(Base::split_id_); }
451
453 const CacheType& getCache() const {
454 return std::any_cast<const CacheType&>(Base::context_->getSplitCache(Base::split_id_));
455 }
456
457 void waitOrCalculate() const {
458 auto future = Base::context_->template startCalculationOrGetFuture<CacheType>(
459 Base::split_id_, static_cast<const DerivedSplit&>(*this).beginImpl(),
460 static_cast<const DerivedSplit&>(*this).endImpl());
461 future.wait();
462 }
463
465 auto beginImpl() const requires concepts::HasBeginImpl<DerivedSplit> {
466 if (shouldCache()) [[unlikely]] {
467 waitOrCalculate();
468 return Iterator{std::ranges::begin(getCache())};
469 } else {
470 return Iterator{static_cast<const DerivedSplit&>(*this).beginImpl()};
471 }
472 }
473
475 auto endImpl() const requires concepts::HasEndImpl<DerivedSplit> {
476 if (shouldCache()) [[unlikely]] {
477 waitOrCalculate();
478 return Iterator{std::ranges::end(getCache())};
479 } else {
480 return Iterator{static_cast<const DerivedSplit&>(*this).endImpl()};
481 }
482 }
483};
484
489template <std::ranges::view V>
490class ViewSplit : public CachedSplit<ViewSplit<V>, std::ranges::iterator_t<const V>> {
491public:
492 template <std::ranges::view T>
493 friend class ViewSplit;
494
495public:
496 using Base = CachedSplit<ViewSplit<V>, std::ranges::iterator_t<const V>>;
497 friend Base;
498
499public:
501
502 template <concepts::Split S>
503 ViewSplit(V view, const S& prev) : Base{prev, false, false}, view_{view} {}
504
505private:
506 auto beginImpl() const { return std::ranges::begin(view_); }
507
508 auto endImpl() const { return std::ranges::end(view_); }
509
510private:
511 V view_;
512};
513
521template <typename DerivedRdd>
522class BaseRdd : public std::ranges::view_interface<BaseRdd<DerivedRdd>> {
523public:
524 template <typename T>
525 friend class BaseRdd;
526
527public:
534 template <concepts::Rdd R>
535 explicit BaseRdd(const BaseRdd<R>& prev, bool copy_id)
536 : context_{prev.context_}, splits_num_{prev.splits_num_} {
537 if (copy_id) {
538 rdd_id_ = prev.rdd_id_;
539 } else {
540 rdd_id_ = context_->getAndIncRddId();
541 }
542 }
543
545 template <concepts::Rdd R>
546 explicit BaseRdd(const BaseRdd<R>& prev)
547 : context_{prev.context_}, rdd_id_{prev.rdd_id_}, splits_num_{prev.splits_num_} {}
548
550 template <concepts::Rdd R>
552 context_ = prev.context_;
553 rdd_id_ = prev.rdd_id_;
554 splits_num_ = prev.splits_num_;
555 }
556
// Member-initializer list of a BaseRdd constructor: keeps `context`, obtains a new
// RDD id from it, and reads the number of splits from the parallel-task number in
// the context's configuration.
564 : context_{context},
565 rdd_id_{context_->getAndIncRddId()},
566 splits_num_{context_->getConfig().getParallelTaskNum()} {}
567
571 auto begin() const requires concepts::HasBeginImpl<DerivedRdd> {
572 return static_cast<const DerivedRdd&>(*this).beginImpl();
573 }
574
579 auto end() const requires concepts::HasEndImpl<DerivedRdd> {
580 return static_cast<const DerivedRdd&>(*this).endImpl();
581 }
582
585
586protected:
587 ExecutionContext* context_{};
588 ExecutionContext::RddId rdd_id_{};
589 size_t splits_num_{};
590};
591
592} // namespace cpark
593
594#endif // CPARK_BASE_RDD_H
Definition base_rdd.h:522
ExecutionContext::RddId id() const noexcept
Definition base_rdd.h:584
BaseRdd(const BaseRdd< R > &prev, bool copy_id)
Definition base_rdd.h:535
auto end() const
Definition base_rdd.h:579
BaseRdd & operator=(const BaseRdd< R > &prev)
Definition base_rdd.h:551
BaseRdd(ExecutionContext *context)
Definition base_rdd.h:563
auto begin() const
Definition base_rdd.h:571
BaseRdd(const BaseRdd< R > &prev)
Definition base_rdd.h:546
Definition base_rdd.h:94
auto end() const
Definition base_rdd.h:151
BaseSplit & operator=(const BaseSplit< S > &prev)
Definition base_rdd.h:111
BaseSplit(const BaseSplit< T > &other, bool copy_id, bool copy_dependencies)
Definition base_rdd.h:125
void addDependency(ExecutionContext::SplitId split_id)
Definition base_rdd.h:167
BaseSplit(const BaseSplit< S > &prev)
Definition base_rdd.h:106
void addDependency(const BaseSplit< T > &split)
Definition base_rdd.h:176
auto begin() const
Definition base_rdd.h:142
ExecutionContext::SplitId id() const noexcept
Definition base_rdd.h:181
auto dependencies() const noexcept
Definition base_rdd.h:159
Definition base_rdd.h:228
Iterator & operator--()
Definition base_rdd.h:310
Iterator(const CacheIterator &iterator)
Definition base_rdd.h:240
Iterator operator--(int)
Definition base_rdd.h:327
bool operator==(const Iterator &other) const
Definition base_rdd.h:290
Iterator & operator-=(const difference_type &n)
Definition base_rdd.h:356
Iterator(const OriginalIterator &iterator)
Definition base_rdd.h:249
friend Iterator operator+(const difference_type &n, const Iterator &iter)
Definition base_rdd.h:376
difference_type operator-(const Iterator &other) const
Definition base_rdd.h:390
Iterator operator++(int)
Definition base_rdd.h:279
value_type operator[](const difference_type &n) const
Definition base_rdd.h:384
Iterator & operator+=(const difference_type &n)
Definition base_rdd.h:339
Iterator & operator++()
Definition base_rdd.h:269
Iterator operator+(const difference_type &n) const
Definition base_rdd.h:370
Definition base_rdd.h:211
CachedSplit(const CachedSplit< T, U > &other, bool copy_id, bool copy_dependencies)
Definition base_rdd.h:438
Definition cpark.h:148
SplitId getAndIncSplitId()
Definition cpark.h:179
RddId getAndIncRddId()
Definition cpark.h:176
uint32_t SplitId
Definition cpark.h:160
uint32_t RddId
Definition cpark.h:154
Definition base_rdd.h:490
Definition base_rdd.h:59
Definition base_rdd.h:33
Definition base_rdd.h:67
Definition base_rdd.h:21
Definition base_rdd.h:82
Definition base_rdd.h:72
Definition base_rdd.h:53
Definition base_rdd.h:44