CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
merge_rdd.h
1#ifndef CPARK_MERGE_RDD_H
2#define CPARK_MERGE_RDD_H
3
4#include <vector>
5
6#include "base_rdd.h"
7#include "utils.h"
8
9namespace cpark {
10
26template <concepts::Rdd R>
27class MergeRdd : public BaseRdd<MergeRdd<R>> {
28public:
30 friend Base;
31
32public:
33 class Iterator : std::forward_iterator_tag {
34 public:
35 using difference_type = std::ptrdiff_t;
36 // using iterator_category = std::random_access_iterator_tag;
37 using iterator_category = std::forward_iterator_tag;
38 using value_type = utils::RddElementType<R>;
39 using OriginalIterator = std::ranges::iterator_t<std::ranges::range_value_t<R>>;
40 using OriginalSentinel = std::ranges::sentinel_t<std::ranges::range_value_t<R>>;
41
// Default constructor: std::forward_iterator requires the iterator type to be
// default-initializable.
42 Iterator() = default;
43
44 Iterator(std::vector<OriginalIterator> begins, std::vector<OriginalIterator> ends, bool isEnd = false)
45 : begins_(std::move(begins)), ends_(std::move(ends)), isEnd_(isEnd) {
46 if (!isEnd_) {
47 row_ = 0;
48 current_ = begins_.size() > 0 ? begins_[row_] : OriginalIterator();
49 }
50 else {
51 row_ = ends_.size() > 0 ? ends_.size() - 1 : 0;
52 current_ = ends_.size() > 0 ? ends_[row_] : OriginalIterator();
53 }
54 }
55
57 value_type operator*() const { return *current_; }
58
60 value_type* operator->() const { return current_; }
61
64 ++current_;
65 while (current_ == ends_[row_] && row_ + 1 < begins_.size()) {
66 ++row_;
67 current_ = begins_[row_];
68 }
69 return *this;
70 }
71
74 auto old = *this;
75 ++(*this);
76 return old;
77 }
78
79 // TODO: Consider extend to random_access_iterator. Still need -- operator.
80
82 Iterator& operator+=(difference_type n) {
83 // TODO: This does not have constant time complexity!
84 if (n >= 0)
85 while (n > 0) {
86 ++(*this);
87 --n;
88 }
89 else
90 while (n < 0) {
91 --(*this);
92 ++n;
93 }
94 return *this;
95 }
96
98 Iterator& operator-=(difference_type n) {
99 return *this += -n;
100 }
101
103 Iterator operator+(difference_type n) const {
104 Iterator temp = *this;
105 temp += n;
106 return temp;
107 }
108
110 Iterator operator-(difference_type n) const {
111 Iterator temp = *this;
112 temp -= n;
113 return temp;
114 }
115
117 value_type& operator[](difference_type n) const {
118 return *(*this + n);
119 }
120
122 bool operator==(const Iterator& other) const { return current_ == other.current_; }
123
125 bool operator!=(const Iterator& other) const { return !(*this == other); }
126
128 bool operator<(const Iterator& other) const {
129 return current_ < other.current_;
130 }
131
133 bool operator>(const Iterator& other) const {
134 return current_ > other.current_;
135 }
136
138 bool operator<=(const Iterator& other) const {
139 return current_ <= other.current_;
140 }
141
143 bool operator>=(const Iterator& other) const {
144 return current_ >= other.current_;
145 }
146
148 difference_type operator-(const Iterator& other) const {
149 return current_ - other.current_;
150 }
151
152 private:
153 std::vector<OriginalIterator> begins_;
154 std::vector<OriginalIterator> ends_;
155 OriginalIterator current_;
156 bool isEnd_;
157 size_t row_ = 0;
158 };
159
// Builds a MergeRdd holding a single split (see `splits_.emplace_back` below), presumably one
// that walks every split of `prev` in order. The begin/end iterators of each upstream split
// are collected into two parallel vectors, and the single split records a dependency on every
// upstream split via addDependency.
// @param prev the upstream Rdd whose splits are merged.
// NOTE(review): `prev.front()` is called unconditionally, so `prev` is assumed to contain at
// least one split. Confirm callers never pass an Rdd with zero splits.
// NOTE(review): each split's `std::ranges::end` is stored as an `OriginalIterator`, which
// assumes every split is a common range (sentinel type == iterator type). Confirm.
164 constexpr MergeRdd(const R& prev) : Base{prev, false} {
165 static_assert(concepts::Rdd<MergeRdd<R>>,
166 "Instance of MergeRdd does not satisfy Rdd concept.");
167 // Prepare nested splits vector
168 using OriginalIterator = std::ranges::iterator_t<std::ranges::range_value_t<R>>;
169 std::vector<OriginalIterator> all_prev_splits_begins;
170 std::vector<OriginalIterator> all_prev_splits_ends;
171 for (const concepts::Split auto& prev_split : prev) {
172 all_prev_splits_begins.emplace_back(std::ranges::begin(prev_split));
173 all_prev_splits_ends.emplace_back(std::ranges::end(prev_split));
174 }
175
176 // Create the single splits_ element
177 splits_.emplace_back(
178 std::ranges::subrange{
// NOTE(review): original lines 179-180 (the subrange's begin/end arguments) are missing from
// this listing; presumably the begin and past-the-end `Iterator`s built from the two vectors
// above. Confirm against merge_rdd.h.
181 },
182 prev.front());
// The merged split depends on every split of `prev`.
183 for (const concepts::Split auto& prev_split : prev)
184 splits_.back().addDependency(prev_split);
185 }
186
187 // Explicitly define default copy constrictor and assignment operator,
188 // because some linters or compilers can not define implicit copy constructors for this class,
189 // though they are supposed to do so.
190 // TODO: find out why.
191 constexpr MergeRdd(const MergeRdd&) = default;
192 MergeRdd& operator=(const MergeRdd&) = default;
193
194private:
195 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
196
197 constexpr auto endImpl() const { return std::ranges::end(splits_); }
198
199private:
200 std::vector<ViewSplit<std::ranges::subrange<Iterator>>> splits_{};
201};
202
206class Merge {
207public:
208 explicit Merge() = default;
209
210 template <concepts::Rdd R>
211 auto operator()(const R& r) const {
212 return MergeRdd(r);
213 }
214};
215
219template <concepts::Rdd R>
220auto operator|(const R& r, const Merge& merge) {
221 return merge(r);
222}
223
// end of t_Merge
231
232} // namespace cpark
233
234#endif //CPARK_MERGE_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
Definition merge_rdd.h:206
Definition merge_rdd.h:33
value_type * operator->() const
Definition merge_rdd.h:60
Iterator & operator++()
Definition merge_rdd.h:63
bool operator<(const Iterator &other) const
Definition merge_rdd.h:128
difference_type operator-(const Iterator &other) const
Definition merge_rdd.h:148
bool operator==(const Iterator &other) const
Definition merge_rdd.h:122
bool operator!=(const Iterator &other) const
Definition merge_rdd.h:125
bool operator>=(const Iterator &other) const
Definition merge_rdd.h:143
Iterator & operator-=(difference_type n)
Definition merge_rdd.h:98
bool operator<=(const Iterator &other) const
Definition merge_rdd.h:138
Iterator operator++(int)
Definition merge_rdd.h:73
Iterator & operator+=(difference_type n)
Definition merge_rdd.h:82
value_type operator*() const
Definition merge_rdd.h:57
Iterator operator-(difference_type n) const
Definition merge_rdd.h:110
Iterator operator+(difference_type n) const
Definition merge_rdd.h:103
value_type & operator[](difference_type n) const
Definition merge_rdd.h:117
bool operator>(const Iterator &other) const
Definition merge_rdd.h:133
Definition merge_rdd.h:27
constexpr MergeRdd(const R &prev)
Definition merge_rdd.h:164
Definition base_rdd.h:53
Definition base_rdd.h:44
auto operator|(const R &r, const Collect &collect)
Definition collect.h:48