CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
partition_by_rdd.h
1#ifndef CPARK_PARTITION_BY_RDD_H
2#define CPARK_PARTITION_BY_RDD_H
3
4#include <concepts>
#include <memory>
5#include <ranges>
6
7#include "base_rdd.h"
8#include "filter_rdd.h"
9#include "merged_view.h"
10#include "utils.h"
11
12namespace cpark {
13
// RDD that redistributes the key-value elements of a previous RDD `R` across splits
// according to a partitioner. Each split is a filtered view over the merged view of
// the previous RDD (see the constructor body below).
//
// R           - previous RDD type; constrained by concepts::KeyValueRdd.
// Partitioner - callable applied to an element's key (`x.first`); defaults to
//               std::hash of the RDD's key type.
//
// NOTE(review): this listing carries its original line numbers as prefixes and skips
// several of them (32, 36-39, 51-55, 57, 62, 65, ...), so some declarations, including
// the constructor signatures, are not visible here.
29template <concepts::KeyValueRdd R, typename Partitioner = std::hash<utils::RddKeyType<R>>>
30class PartitionByRdd : public BaseRdd<PartitionByRdd<R, Partitioner>> {
31public:
// `Base` is declared on a line missing from this listing; presumably an alias for the
// BaseRdd<...> base class named above - TODO confirm.
33 friend Base;
34
35public:
// Body of the two-argument constructor. The index at the end of the page lists it as
// `PartitionByRdd(const R &prev, Partitioner partitioner)`; its signature line (39) is
// absent from this listing. The base is initialized from `prev` and the partitioner is
// moved into this object.
40 : Base{prev}, partitioner_{std::move(partitioner)} {
// Build one split per index in [0, Base::splits_num_).
41 for (size_t i : std::views::iota(0u, Base::splits_num_)) {
// Keep only the elements of the merged `prev` view that PartitionerHelper accepts
// for split index `i`; the helper is given the address of this object's partitioner_.
42 auto split_view =
43 FilterView(MergedSameView(prev), PartitionerHelper{i, Base::splits_num_, &partitioner_});
44 splits_.emplace_back(std::move(split_view), Base::context_);
// Register every split of `prev` as a dependency of the split just created.
// Note: `split` is declared by value, so each iteration copies the split object.
45 for (const concepts::Split auto split : prev) {
46 splits_.back().addDependency(split);
47 }
48 }
49 }
50
56
// Copy construction and copy assignment are defaulted (member-wise).
// NOTE(review): the constructor hands `&partitioner_` to each PartitionerHelper; if the
// helper keeps that pointer, the splits of a copied RDD would still refer to the source
// object's partitioner - confirm lifetime handling.
58 PartitionByRdd(const PartitionByRdd&) = default;
59 PartitionByRdd& operator=(const PartitionByRdd&) = default;
60
61private:
// Returns an iterator to the first split. Presumably invoked by the base class through
// the `friend Base` declaration - TODO confirm.
63 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
64
// Returns the past-the-end iterator of the splits.
66 constexpr auto endImpl() const { return std::ranges::end(splits_); }
67
68private:
73 class PartitionerHelper {
74 public:
81 PartitionerHelper(size_t split_index, size_t splits_num, Partitioner* partitioner)
82 : split_index_{split_index}, splits_num_{splits_num}, partitioner_{partitioner} {}
83
87 bool operator()(const utils::RddElementType<R>& x) const {
88 return (*partitioner_)(x.first) % splits_num_ == split_index_;
89 }
90
91 private:
92 size_t split_index_, splits_num_;
93 Partitioner* partitioner_;
94 };
95
96private:
// One ViewSplit per output split; each wraps a FilterView (with a PartitionerHelper
// predicate) over the merged view of the previous RDD. Filled by the constructor.
97 std::vector<ViewSplit<FilterView<MergedSameView<R>, PartitionerHelper>>> splits_;
// Partitioner instance owned by this RDD; the constructor passes its address to every
// PartitionerHelper it creates.
98 Partitioner partitioner_;
99};
100
// Template head of the primary `PartitionBy` template. The declaration line that
// follows it (listing line 102) is not present in this extract.
101template <typename... Args>
103
// Specialization of `PartitionBy` that carries a user-supplied partitioner. Its
// class-head line (listing line 108) is not present in this extract; the name is taken
// from the constructor below.
107template <typename Partitioner>
109public:
// Stores the partitioner by value (moved in) for later use by operator().
110 explicit PartitionBy(Partitioner partitioner) : partitioner_{std::move(partitioner)} {}
111
// Creates a PartitionByRdd from `r` and a copy of the stored partitioner.
// Requires that `Partitioner` is invocable with the RDD's key type `K` and that the
// result is convertible to size_t.
// NOTE(review): `R` is constrained only by concepts::Rdd here, while PartitionByRdd
// itself requires concepts::KeyValueRdd - confirm the mismatch is intended.
112 template <concepts::Rdd R, typename K = utils::RddKeyType<R>>
113 requires std::invocable<Partitioner, K> &&
114 std::convertible_to<std::invoke_result_t<Partitioner, K>, size_t>
115 auto operator()(const R& r) const {
116 return PartitionByRdd(r, partitioner_);
117 }
118
119private:
// Partitioner copied into every PartitionByRdd produced by operator().
120 Partitioner partitioner_;
121};
122
126template <>
127class PartitionBy<> {
128public:
129 template <concepts::Rdd R>
130 auto operator()(const R& r) const {
131 return PartitionByRdd(r);
132 }
133};
134
136template <cpark::concepts::Rdd R, typename P>
137auto operator|(const R& r, const P& p) {
138 return p(r);
139}
140
// end of t_PartitionBy
142
143} // namespace cpark
144
145#endif // CPARK_PARTITION_BY_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
Definition filter_rdd.h:28
Definition merged_view.h:22
Definition partition_by_rdd.h:102
Definition partition_by_rdd.h:30
PartitionByRdd(const PartitionByRdd &)=default
PartitionByRdd(const R &prev)
Definition partition_by_rdd.h:55
PartitionByRdd(const R &prev, Partitioner partitioner)
Definition partition_by_rdd.h:39
Definition base_rdd.h:44
auto operator|(const R &r, const Collect &collect)
Definition collect.h:48