CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
sample_rdd.h
1#ifndef CPARK_SAMPLE_RDD_H
2#define CPARK_SAMPLE_RDD_H
3
4#include <random>
5#include <vector>
6#include "base_rdd.h"
7#include "filter_rdd.h"
8#include "utils.h"
9namespace cpark {
10
26template <concepts::Rdd R>
27requires std::invocable<std::function<bool(int)>, utils::RddElementType<R>>&& std::is_convertible_v<
28 std::invoke_result_t<std::function<bool(int)>, utils::RddElementType<R>>, bool> class SampleRdd
29 : public BaseRdd<SampleRdd<R>> {
30public:
32 friend Base;
33
34public:
40 constexpr SampleRdd(const R& prev, double fraction) : Base{prev, false}, fraction_{fraction} {
41 static_assert(concepts::Rdd<SampleRdd<R>>,
42 "Instance of SampleRdd does not satisfy Rdd concept.");
43 // Create the sampled splits.
44 for (const concepts::Split auto& prev_split : prev) {
45 splits_.emplace_back(FilterView(prev_split, func), prev_split);
46 splits_.back().addDependency(prev_split);
47 }
48 };
49
50 // Explicitly define default copy constrictor and assignment operator,
51 // because some linters or compilers can not define implicit copy constructors for this class,
52 // though they are supposed to do so.
53 // TODO: find out why.
54 constexpr SampleRdd(const SampleRdd&) = default;
55 SampleRdd& operator=(const SampleRdd&) = default;
56
57private:
58 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
59
60 constexpr auto endImpl() const { return std::ranges::end(splits_); }
61
62private:
63 double fraction_;
64 std::function<bool(int)> func = [this](int i) {
65 std::random_device rd;
66 std::mt19937 gen(rd());
67 std::bernoulli_distribution d(fraction_);
68 return d(gen);
69 };
70 std::vector<ViewSplit<FilterView<std::ranges::range_value_t<R>, std::function<bool(int)>>>>
71 splits_{};
72};
73
77class Sample {
78public:
79 explicit Sample(double fraction) : fraction_{fraction} {}
80 template <concepts::Rdd R, typename T = utils::RddElementType<R>>
81 requires std::invocable<std::function<bool(int)>, T>&&
82 std::is_same_v<std::invoke_result_t<std::function<bool(int)>, T>, bool> auto
83 operator()(const R& r) const {
84 return SampleRdd(r, fraction_);
85 }
86
87private:
88 double fraction_;
89};
90
94template <concepts::Rdd R>
95auto operator|(const R& r, const Sample& sample) {
96 return sample(r);
97}
98
// end of t_Sample
100} // namespace cpark
101
102#endif //CPARK_SAMPLE_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
Definition filter_rdd.h:28
Definition sample_rdd.h:77
Definition sample_rdd.h:29
constexpr SampleRdd(const R &prev, double fraction)
Definition sample_rdd.h:40
Definition base_rdd.h:53
Definition base_rdd.h:44
auto operator|(const R &r, const Collect &collect)
Definition collect.h:48