1#ifndef CPARK_BASE_RDD_H
2#define CPARK_BASE_RDD_H
21concept HasId =
requires(
const T& t) {
23 requires std::is_arithmetic_v<
decltype(t.id())>;
34 { t.dependencies() } -> std::ranges::input_range;
35 t.addDependency(t.id());
60 { r.beginImpl() } -> std::input_iterator;
68 { r.endImpl() } -> std::sentinel_for<
decltype(r.beginImpl())>;
72concept Pair =
requires(P p) {
73 typename P::first_type;
74 typename P::second_type;
77 requires std::same_as<
decltype(p.first),
typename P::first_type>;
78 requires std::same_as<
decltype(p.second),
typename P::second_type>;
93template <
typename DerivedSplit>
94class BaseSplit :
public std::ranges::view_interface<BaseSplit<DerivedSplit>> {
102 : context_{
context}, split_id_{context_->getAndIncSplitId()} {}
105 template <concepts::Split S>
107 : context_{
prev.context_}, split_id_{
prev.split_id_}, dependencies_{
prev.dependencies_} {}
110 template <concepts::Split S>
112 context_ =
prev.context_;
113 split_id_ =
prev.split_id_;
114 dependencies_ =
prev.dependencies_;
124 template <
typename T>
126 : context_{
other.context_} {
128 split_id_ =
other.split_id_;
133 dependencies_ =
other.dependencies_;
143 return static_cast<const DerivedSplit&
>(*this).beginImpl();
152 return static_cast<const DerivedSplit&
>(*this).endImpl();
160 return std::ranges::subrange(std::ranges::begin(dependencies_),
161 std::ranges::end(dependencies_));
169 context_->markDependency(split_id_,
split_id);
175 template <
typename T>
186 std::vector<ExecutionContext::SplitId> dependencies_{};
210template <
typename DerivedSplit,
typename DerivedSplitIterator>
213 template <
typename T,
typename U>
219 using ValueType = std::iter_value_t<DerivedSplitIterator>;
220 using CacheType = std::vector<ValueType>;
228 class Iterator :
public std::random_access_iterator_tag {
230 using difference_type = std::ptrdiff_t;
231 using value_type = ValueType;
232 using CacheIterator = std::ranges::iterator_t<const CacheType>;
241 static_assert(std::random_access_iterator<Iterator>,
242 "CachedSplit::Iterator does not satisfy random_access_iterator, please check "
243 "the implementation of the DerivedSplit.");
250 static_assert(std::random_access_iterator<Iterator>,
251 "CachedSplit::Iterator does not satisfy random_access_iterator, please check "
252 "the implementation of the DerivedSplit.");
257 value_type operator*()
const {
261 if (std::holds_alternative<CacheIterator>(iterator_)) [[
unlikely]] {
262 return *std::get<CacheIterator>(iterator_);
264 return *std::get<OriginalIterator>(iterator_);
270 if (std::holds_alternative<CacheIterator>(iterator_)) [[
unlikely]] {
271 ++std::get<CacheIterator>(iterator_);
273 ++std::get<OriginalIterator>(iterator_);
291 if (std::holds_alternative<CacheIterator>(iterator_) &&
292 std::holds_alternative<CacheIterator>(
other.iterator_)) [[
unlikely]] {
293 return std::get<CacheIterator>(iterator_) == std::get<CacheIterator>(
other.iterator_);
294 }
else if (std::holds_alternative<OriginalIterator>(iterator_) &&
295 std::holds_alternative<OriginalIterator>(
other.iterator_)) {
296 return std::get<OriginalIterator>(iterator_) == std::get<OriginalIterator>(
other.iterator_);
311 if (std::holds_alternative<CacheIterator>(iterator_)) [[
unlikely]] {
312 --std::get<CacheIterator>(iterator_);
314 if constexpr (std::bidirectional_iterator<OriginalIterator>) {
315 --std::get<OriginalIterator>(iterator_);
317 throw std::runtime_error(
"calculate-cache-on-miss not implemented yet");
340 if (std::holds_alternative<CacheIterator>(iterator_)) [[
unlikely]] {
341 std::get<CacheIterator>(iterator_) +=
n;
343 if constexpr (std::random_access_iterator<OriginalIterator>) {
344 std::get<OriginalIterator>(iterator_) +=
n;
346 throw std::runtime_error(
"Not implemented");
357 if (std::holds_alternative<CacheIterator>(iterator_)) [[
unlikely]] {
358 std::get<CacheIterator>(iterator_) -=
n;
360 if constexpr (std::random_access_iterator<OriginalIterator>) {
361 std::get<OriginalIterator>(iterator_) -=
n;
363 throw std::runtime_error(
"Not implemented");
378 Iterator operator-(
const difference_type&
n)
const {
/**
 * Subscript access relative to this iterator.
 * Evaluates `*this + n` and dereferences the resulting iterator.
 * @param n Offset, in elements, from the current position.
 * @return The element at that offset, returned by value (`value_type`), not by reference.
 */
384 value_type
operator[](
const difference_type&
n)
const {
return *(*
this +
n); }
391 if (std::holds_alternative<CacheIterator>(iterator_) &&
392 std::holds_alternative<CacheIterator>(
other.iterator_)) {
393 return std::get<CacheIterator>(iterator_) - std::get<CacheIterator>(
other.iterator_);
394 }
else if (std::holds_alternative<OriginalIterator>(iterator_) &&
395 std::holds_alternative<OriginalIterator>(
other.iterator_)) {
396 if constexpr (
requires(
const OriginalIterator&
a,
const OriginalIterator&
b) {
a -
b; }) {
397 return std::get<OriginalIterator>(iterator_) -
398 std::get<OriginalIterator>(
other.iterator_);
400 throw std::runtime_error(
"Not Implemented");
403 throw std::runtime_error(
"Bad Compare");
/**
 * Greater-than comparison, expressed through iterator difference.
 * @param other Iterator to compare against.
 * @return true when `*this - other` is strictly positive.
 * @note Relies entirely on `operator-(const Iterator&)`; the visible fragments of that
 *       operator throw std::runtime_error in some cases, which would propagate from here
 *       (its full body is not visible in this listing -- confirm).
 */
411 bool operator>(
const Iterator&
other)
const {
return *
this -
other > 0; }
/**
 * Less-than-or-equal comparison, expressed through iterator difference.
 * @param other Iterator to compare against.
 * @return true when `*this - other` is zero or negative.
 * @note Relies entirely on `operator-(const Iterator&)`; anything that operator throws
 *       propagates from here.
 */
413 bool operator<=(
const Iterator& other)
const {
return *
this - other <= 0; }
/**
 * Greater-than-or-equal comparison, expressed through iterator difference.
 * @param other Iterator to compare against.
 * @return true when `*this - other` is zero or positive.
 * @note Relies entirely on `operator-(const Iterator&)`; anything that operator throws
 *       propagates from here.
 */
415 bool operator>=(
const Iterator& other)
const {
return *
this - other >= 0; }
420 std::variant<CacheIterator, OriginalIterator> iterator_;
424 explicit CachedSplit(ExecutionContext* context) : Base{context} {
425 static_assert(std::ranges::random_access_range<CachedSplit>,
426 "CachedSplit instance does not satisfy random_access_range, please check the "
437 template <
typename T,
typename U>
440 static_assert(std::ranges::random_access_range<CachedSplit>,
441 "CachedSplit instance does not satisfy random_access_range, please check the "
/**
 * Asks the execution context whether this split should be cached.
 * Forwards `Base::split_id_` to `Base::context_->splitShouldCache(...)` and returns its result.
 * @return The value reported by the context for this split id.
 * @note Dereferences `Base::context_` without a null check.
 */
447 bool shouldCache()
const noexcept {
return Base::context_->splitShouldCache(Base::split_id_); }
/**
 * Asks the execution context whether this split has been cached.
 * Forwards `Base::split_id_` to `Base::context_->splitCached(...)` and returns its result.
 * @return The value reported by the context for this split id.
 * @note Dereferences `Base::context_` without a null check.
 */
450 bool hasCached() const noexcept {
return Base::context_->splitCached(Base::split_id_); }
453 const CacheType& getCache()
const {
454 return std::any_cast<const CacheType&>(Base::context_->getSplitCache(Base::split_id_));
457 void waitOrCalculate()
const {
458 auto future = Base::context_->template startCalculationOrGetFuture<CacheType>(
459 Base::split_id_,
static_cast<const DerivedSplit&
>(*this).beginImpl(),
460 static_cast<const DerivedSplit&
>(*this).endImpl());
465 auto beginImpl() const requires concepts::HasBeginImpl<DerivedSplit> {
466 if (shouldCache()) [[unlikely]] {
468 return Iterator{std::ranges::begin(getCache())};
470 return Iterator{
static_cast<const DerivedSplit&
>(*this).beginImpl()};
475 auto endImpl() const requires concepts::HasEndImpl<DerivedSplit> {
476 if (shouldCache()) [[unlikely]] {
478 return Iterator{std::ranges::end(getCache())};
480 return Iterator{
static_cast<const DerivedSplit&
>(*this).endImpl()};
489template <std::ranges::view V>
492 template <std::ranges::view T>
502 template <concepts::Split S>
/**
 * Begin hook for this split: returns `std::ranges::begin(view_)`.
 * NOTE(review): `view_` is declared outside the visible listing; presumably it is the
 * wrapped view of the class's template parameter `V` -- confirm.
 */
506 auto beginImpl()
const {
return std::ranges::begin(view_); }
/**
 * End hook for this split: returns `std::ranges::end(view_)`.
 * NOTE(review): `view_` is declared outside the visible listing; presumably it is the
 * wrapped view of the class's template parameter `V` -- confirm.
 */
508 auto endImpl()
const {
return std::ranges::end(view_); }
521template <
typename DerivedRdd>
522class BaseRdd :
public std::ranges::view_interface<BaseRdd<DerivedRdd>> {
524 template <
typename T>
534 template <concepts::Rdd R>
536 : context_{
prev.context_}, splits_num_{
prev.splits_num_} {
538 rdd_id_ =
prev.rdd_id_;
545 template <concepts::Rdd R>
547 : context_{
prev.context_}, rdd_id_{
prev.rdd_id_}, splits_num_{
prev.splits_num_} {}
550 template <concepts::Rdd R>
552 context_ =
prev.context_;
553 rdd_id_ =
prev.rdd_id_;
554 splits_num_ =
prev.splits_num_;
565 rdd_id_{context_->getAndIncRddId()},
566 splits_num_{context_->getConfig().getParallelTaskNum()} {}
572 return static_cast<const DerivedRdd&
>(*this).beginImpl();
580 return static_cast<const DerivedRdd&
>(*this).endImpl();
589 size_t splits_num_{};
Definition base_rdd.h:522
ExecutionContext::RddId id() const noexcept
Definition base_rdd.h:584
BaseRdd(const BaseRdd< R > &prev, bool copy_id)
Definition base_rdd.h:535
auto end() const
Definition base_rdd.h:579
BaseRdd & operator=(const BaseRdd< R > &prev)
Definition base_rdd.h:551
BaseRdd(ExecutionContext *context)
Definition base_rdd.h:563
auto begin() const
Definition base_rdd.h:571
BaseRdd(const BaseRdd< R > &prev)
Definition base_rdd.h:546
auto end() const
Definition base_rdd.h:151
BaseSplit & operator=(const BaseSplit< S > &prev)
Definition base_rdd.h:111
BaseSplit(const BaseSplit< T > &other, bool copy_id, bool copy_dependencies)
Definition base_rdd.h:125
void addDependency(ExecutionContext::SplitId split_id)
Definition base_rdd.h:167
BaseSplit(const BaseSplit< S > &prev)
Definition base_rdd.h:106
void addDependency(const BaseSplit< T > &split)
Definition base_rdd.h:176
auto begin() const
Definition base_rdd.h:142
ExecutionContext::SplitId id() const noexcept
Definition base_rdd.h:181
auto dependencies() const noexcept
Definition base_rdd.h:159
Definition base_rdd.h:228
Iterator & operator--()
Definition base_rdd.h:310
Iterator(const CacheIterator &iterator)
Definition base_rdd.h:240
Iterator operator--(int)
Definition base_rdd.h:327
bool operator==(const Iterator &other) const
Definition base_rdd.h:290
Iterator & operator-=(const difference_type &n)
Definition base_rdd.h:356
Iterator(const OriginalIterator &iterator)
Definition base_rdd.h:249
friend Iterator operator+(const difference_type &n, const Iterator &iter)
Definition base_rdd.h:376
difference_type operator-(const Iterator &other) const
Definition base_rdd.h:390
Iterator operator++(int)
Definition base_rdd.h:279
value_type operator[](const difference_type &n) const
Definition base_rdd.h:384
Iterator & operator+=(const difference_type &n)
Definition base_rdd.h:339
Iterator & operator++()
Definition base_rdd.h:269
Iterator operator+(const difference_type &n) const
Definition base_rdd.h:370
Definition base_rdd.h:211
CachedSplit(const CachedSplit< T, U > &other, bool copy_id, bool copy_dependencies)
Definition base_rdd.h:438
SplitId getAndIncSplitId()
Definition cpark.h:179
RddId getAndIncRddId()
Definition cpark.h:176
uint32_t SplitId
Definition cpark.h:160
uint32_t RddId
Definition cpark.h:154
Definition base_rdd.h:490