CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
group_by_key_rdd.h
1#ifndef CPARK_GROUP_BY_KEY_RDD_H
2#define CPARK_GROUP_BY_KEY_RDD_H
3
4#include <concepts>
5#include <memory>
6#include <ranges>
7#include <shared_mutex>
8#include <mutex>
9
10#include "base_rdd.h"
11#include "utils.h"
12
13namespace cpark {
14
29template <concepts::KeyValueRdd R>
30 requires concepts::KeyValueRdd<R>
31class GroupByKeyRdd : public BaseRdd<GroupByKeyRdd<R>> {
32public:
34 friend Base;
35
36public:
37 using KeyType = utils::RddKeyType<R>;
38 using ValueType = utils::RddValueType<R>;
39
43 class Iterator : public std::forward_iterator_tag {
44 public:
45 using value_type =
46 std::pair<KeyType, std::ranges::subrange<std::ranges::iterator_t<std::vector<ValueType>>>>;
47 using difference_type = std::ptrdiff_t;
48
49 public:
55 explicit Iterator(
56 std::ranges::subrange<std::ranges::iterator_t<std::ranges::range_value_t<R>>> original_data,
57 bool is_end)
58 : original_data_{std::move(original_data)},
59 inner_data_mutex_{std::make_shared<std::shared_mutex>()},
60 is_end_{is_end} {}
61
66 explicit Iterator(bool is_end) : is_end_{is_end} {}
67
68 Iterator() = default;
69
70 value_type operator*() const {
71 waitOrEvaluate();
72 // Note that all operations to the shared data after any call of waitOrEvaluate() are read operations,
73 // so mutex is no longer needed.
74 checkOrInitInnerIterator();
75 return {inner_iterator_->first, std::ranges::subrange(inner_iterator_->second)};
76 }
77
78 Iterator& operator++() {
79 waitOrEvaluate();
80 // Note that all operations to the shared data after any call of waitOrEvaluate() are read operations,
81 // so mutex is no longer needed.
82 checkOrInitInnerIterator();
83 ++inner_iterator_;
84 return *this;
85 }
86
87 Iterator operator++(int) {
88 auto old = *this;
89 ++(*this);
90 return old;
91 }
92
97 bool operator==(const Iterator& other) const {
98 if (isEnd() && other.isEnd()) {
99 return true;
100 }
101 if (isEnd() || other.isEnd()) {
102 return false;
103 }
104 return inner_data_ == other.inner_data_ && inner_iterator_ == other.inner_iterator_;
105 }
106
107 bool operator!=(const Iterator& other) const { return !(*this == other); }
108
109 private:
111 void evaluate() const {
112 inner_data_ = std::make_shared<std::unordered_map<KeyType, std::vector<ValueType>>>();
113 for (const auto& [key, value] : original_data_) {
114 (*inner_data_)[key].push_back(value);
115 }
116 }
117
122 void waitOrEvaluate() const {
123 {
124 std::shared_lock the_shared_lock(*inner_data_mutex_);
125 if (inner_data_) {
126 return;
127 }
128 }
129 {
130 std::unique_lock the_unique_lock(*inner_data_mutex_);
131 if (inner_data_) {
132 return;
133 }
134 evaluate();
135 }
136 }
137
138 void checkOrInitInnerIterator() const {
139 if (!inner_iterator_init_) {
140 inner_iterator_ = std::ranges::begin(*inner_data_);
141 inner_iterator_init_ = true;
142 }
143 }
144
145 bool isEnd() const {
146 return is_end_ || (inner_iterator_init_ && inner_iterator_ == std::ranges::end(*inner_data_));
147 }
148
149 private:
150 // The shared state that stores the results of the group operation.
151 mutable std::shared_ptr<std::unordered_map<KeyType, std::vector<ValueType>>> inner_data_{
152 nullptr};
153 // An iterator pointing tho the current element in the shared state.
154 mutable std::ranges::iterator_t<std::unordered_map<KeyType, std::vector<ValueType>>>
155 inner_iterator_{};
156 // Whether `inner_iterator_` is initialized.
157 mutable bool inner_iterator_init_{false};
158 // The mutex to protect the shared state.
159 mutable std::shared_ptr<std::shared_mutex> inner_data_mutex_{nullptr};
160 std::ranges::subrange<std::ranges::iterator_t<std::ranges::range_value_t<R>>> original_data_;
161 bool is_end_{false};
162 };
163
164public:
166 explicit GroupByKeyRdd(const R& prev) : Base{prev} {
167 for (const cpark::concepts::Split auto& split : prev) {
168 splits_.emplace_back(std::ranges::subrange(Iterator{split, false}, Iterator{true}),
169 Base::context_);
170 splits_.back().addDependency(split);
171 }
172 }
173
174 GroupByKeyRdd(const GroupByKeyRdd&) = default;
175 GroupByKeyRdd& operator=(const GroupByKeyRdd&) = default;
176
177private:
179 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
180
182 constexpr auto endImpl() const { return std::ranges::end(splits_); }
183
184private:
185 std::vector<ViewSplit<std::ranges::subrange<Iterator>>> splits_;
186};
187
188// TODO: Helper classes to be added later.
189
// end of t_GroupByKey
191
192} // namespace cpark
193
194#endif // CPARK_GROUP_BY_KEY_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
Definition group_by_key_rdd.h:43
Iterator(std::ranges::subrange< std::ranges::iterator_t< std::ranges::range_value_t< R > > > original_data, bool is_end)
Definition group_by_key_rdd.h:55
bool operator==(const Iterator &other) const
Definition group_by_key_rdd.h:97
Iterator(bool is_end)
Definition group_by_key_rdd.h:66
Definition group_by_key_rdd.h:31
GroupByKeyRdd(const R &prev)
Definition group_by_key_rdd.h:166
Definition base_rdd.h:44