CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast and general-purpose large data processing solution.
Loading...
Searching...
No Matches
zipped_rdd.h
1#ifndef CPARK_ZIPPED_RDD_H
2#define CPARK_ZIPPED_RDD_H
3
#include <cstddef>
#include <functional>
#include <iterator>
#include <ranges>
#include <stdexcept>
#include <utility>
#include <vector>

#include "base_rdd.h"
#include "utils.h"
10
11namespace cpark {
12
26template <concepts::Rdd R1, concepts::Rdd R2>
27class ZippedRdd : public BaseRdd<ZippedRdd<R1, R2>> {
28public:
30 friend Base;
31
32 using V1 = utils::RddElementType<R1>;
33 using V2 = utils::RddElementType<R2>;
34
35 constexpr ZippedRdd(const R1& prev1, const R2& prev2) : Base{prev1, false} {
37 "Instance of ZippedRdd does not satisfy Rdd concept.");
38 if (std::ranges::distance(prev1) != std::ranges::distance(prev2)) {
39 throw std::runtime_error("R1 and R2 do not have the same number of splits.");
40 }
41
42 int cnt = 0;
43 for (const concepts::Split auto& prev_split_1 : prev1) {
44 auto targeted_split = prev2[cnt].begin();
45 std::function<std::pair<V1, V2>(const V1&)> func = [cnt,
46 targeted_split](const V1& x) mutable {
47 return std::make_pair(x, *(targeted_split++));
48 };
49 auto zippedView = prev_split_1 | std::views::transform(func);
50 splits_.emplace_back(zippedView, prev_split_1);
51 splits_.back().addDependency(prev_split_1);
52 cnt++;
53 }
54 }
55
56 constexpr ZippedRdd(const ZippedRdd&) = default;
57 ZippedRdd& operator=(const ZippedRdd&) = default;
58
59private:
60 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
61
62 constexpr auto endImpl() const { return std::ranges::end(splits_); }
63
64private:
65 using ZippedViewType =
66 decltype(std::declval<R1>().front() |
67 std::views::transform(std::declval<std::function<std::pair<V1, V2>(const V1&)>>()));
68 std::vector<ViewSplit<ZippedViewType>> splits_{};
69};
// end of t_Zip
71
72} // namespace cpark
73
74#endif // CPARK_ZIPPED_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
auto begin() const
Definition base_rdd.h:142
Definition zipped_rdd.h:27
Definition base_rdd.h:53
Definition base_rdd.h:44