CPARK 1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for processing large data sets.
Loading...
Searching...
No Matches
filter_rdd.h
1#ifndef CPARK_FILTER_RDD_H
2#define CPARK_FILTER_RDD_H
3
4#include <vector>
5#include "base_rdd.h"
6#include "utils.h"
7
8namespace cpark {
9
// FilterView: a view over another view `V` that exposes only the elements for which the
// predicate `Func` returns a value convertible to `true`. The filtering is done by the nested
// `Iterator`, which skips non-matching elements while it is constructed and incremented.
//
// NOTE(review): this listing carries the original file's line numbers at the start of every
// line, and some original lines are not present here (stripped doc comments, and the
// declaration lines of the two `operator++` overloads, original lines 62 and 70).
//
// Constraints: `Func` must be invocable with the element type of `V`, and its result must be
// convertible to `bool`.
25template <std::ranges::view V, typename Func>
26requires std::invocable<Func, std::ranges::range_value_t<V>>&& std::is_convertible_v<
27 std::invoke_result_t<Func, std::ranges::range_value_t<V>>, bool> class FilterView
28 : public std::ranges::view_interface<FilterView<V, Func>> {
29public:
// Iterator / sentinel types of the wrapped view.
30 using OriginalIterator = std::ranges::iterator_t<V>;
31 using OriginalSentinel = std::ranges::sentinel_t<V>;
32
// Iterator over the wrapped view that only stops on elements accepted by the predicate.
// NOTE(review): the class privately inherits from `std::forward_iterator_tag` and declares no
// `iterator_category` / `iterator_concept` member — confirm this is the intended way to
// advertise the iterator category.
39 class Iterator : std::forward_iterator_tag {
40 public:
41 using difference_type = std::ptrdiff_t;
42 using value_type = std::ranges::range_value_t<V>;
43
44 using OriginalIterator = std::ranges::iterator_t<V>;
45 using OriginalSentinel = std::ranges::sentinel_t<V>;
46
// NOTE(review): `func_` has no default member initializer, so a default-initialized Iterator
// holds an indeterminate predicate pointer.
47 Iterator() = default;
48
// Builds an iterator from a position in the wrapped view, the wrapped view's end, and a
// pointer to the predicate. The position is immediately advanced to the first element at or
// after `iterator` that the predicate accepts (or to `sentinel` if there is none).
// `func` is dereferenced here, so it must point to a valid predicate.
50 Iterator(OriginalIterator iterator, OriginalSentinel sentinel, Func* func)
51 : iterator_{std::move(iterator)}, sentinel_{std::move(sentinel)}, func_{func} {
52 // Find the next element that satisfies the predicate `func_`.
53 iterator_ = std::ranges::find_if(iterator_, sentinel_, *func_);
54 }
55
// Returns the current element by value (a copy of what the wrapped iterator refers to).
57 // Not sure if returning value_type would satisfy input_range requirements.
58 // Works for now.
59 value_type operator*() const { return *iterator_; }
60
// Body of the pre-increment operator: steps past the current element, then skips forward to
// the next element accepted by the predicate. Returns a reference to this iterator.
// NOTE(review): the declaration line is missing from this listing; the member index at the end
// of the page gives it as `Iterator & operator++()`.
63 ++iterator_;
64 // Find the next element that satisfies the predicate `func_`.
65 iterator_ = std::ranges::find_if(iterator_, sentinel_, *func_);
66 return *this;
67 }
68
// Body of the post-increment operator: advances this iterator via the pre-increment operator
// and returns a copy of its state from before the advance.
// NOTE(review): the declaration line is missing from this listing; the member index at the end
// of the page gives it as `Iterator operator++(int)`.
71 auto old = *this;
72 ++(*this);
73 return old;
74 }
75
// Two iterators are equal when their positions in the wrapped view are equal; the stored
// sentinel and predicate pointer are not compared.
77 bool operator==(const Iterator& other) const { return iterator_ == other.iterator_; }
78
// Negation of operator==.
80 bool operator!=(const Iterator& other) const { return !(*this == other); }
81
82 private:
// Current position in the wrapped view.
83 OriginalIterator iterator_;
// End of the wrapped view; bounds the search for the next accepted element.
84 OriginalSentinel sentinel_;
// Non-owning pointer to the predicate; begin()/end() below pass the address of
// FilterView::func_.
85 Func* func_;
86 };
87
88public:
// Stores the view to filter and the predicate, both taken by value and moved into members.
90 constexpr FilterView(V view, Func func)
91 : original_view_{std::move(view)}, func_{std::move(func)} {}
92
// Returns an iterator positioned on the first accepted element of the wrapped view.
// The iterator points at this view's `func_`, and `const_cast` strips the constness that
// `func_` has inside this const member function.
// NOTE(review): presumably the iterator must not outlive this FilterView because of that
// pointer — confirm against callers.
94 constexpr auto begin() const {
95 return Iterator{std::ranges::begin(original_view_), std::ranges::end(original_view_),
96 const_cast<Func*>(&func_)};
97 }
98
// Returns the past-the-end iterator: both the position and the sentinel are the wrapped
// view's end.
// NOTE(review): the result of std::ranges::end() is passed as `OriginalIterator`, so this
// presumably assumes the wrapped view's iterator and sentinel types are interchangeable —
// confirm.
100 constexpr auto end() const {
101 return Iterator{std::ranges::end(original_view_), std::ranges::end(original_view_),
102 const_cast<Func*>(&func_)};
103 }
104
105private:
// The view being filtered.
106 V original_view_;
// The predicate deciding which elements are kept.
107 Func func_;
108};
109
// FilterRdd: an Rdd built from a previous Rdd `R` in which every split is wrapped in a
// FilterView using the predicate `Func`.
//
// NOTE(review): this listing carries the original file's line numbers at the start of every
// line, and some original lines are not present here: the declaration of `Base` (original
// line 122) and the opening of the statement that ends with the string literal on line 133
// (original line 132).
//
// Constraints: `Func` must be invocable with the element type of `R`, and its result must be
// convertible to `bool`.
117template <concepts::Rdd R, typename Func>
118requires std::invocable<Func, utils::RddElementType<R>>&& std::is_convertible_v<
119 std::invoke_result_t<Func, utils::RddElementType<R>>, bool> class FilterRdd
120 : public BaseRdd<FilterRdd<R, Func>> {
121public:
// Grants the base class access to the private beginImpl()/endImpl() below.
123 friend Base;
124
125public:
// Builds the filtered Rdd from `prev` and the predicate `func`.
// For every split of `prev`, a FilterView over that split is created with `func` and stored
// in `splits_`, and the original split is registered as a dependency of the new one.
// NOTE(review): the meaning of the `false` argument passed to `Base` is defined in
// base_rdd.h and is not visible here.
131 constexpr FilterRdd(const R& prev, Func func) : Base{prev, false} /*, func_{std::move(func)}*/ {
133 "Instance of FilterRdd does not satisfy Rdd concept.");
134 // Create the filtered splits.
135 for (const concepts::Split auto& prev_split : prev) {
// `func` is passed to FilterView by value, so every split's view holds its own copy.
136 splits_.emplace_back(FilterView(prev_split, func), prev_split);
137 splits_.back().addDependency(prev_split);
138 }
139 }
140
141 // Explicitly define the default copy constructor and assignment operator,
142 // because some linters or compilers cannot define implicit copy constructors for this class,
143 // though they are supposed to do so.
144 // TODO: find out why.
145 constexpr FilterRdd(const FilterRdd&) = default;
146 FilterRdd& operator=(const FilterRdd&) = default;
147
148private:
// Iterator to the first filtered split.
149 constexpr auto beginImpl() const { return std::ranges::begin(splits_); }
150
// Past-the-end iterator of the filtered splits.
151 constexpr auto endImpl() const { return std::ranges::end(splits_); }
152
153private:
// One ViewSplit per split of the previous Rdd, each wrapping a FilterView over that split.
154 std::vector<ViewSplit<FilterView<std::ranges::range_value_t<R>, Func>>> splits_{};
155};
156
160template <typename Func>
161class Filter {
162public:
163 explicit Filter(Func func) : func_{std::move(func)} {}
164
165 template <concepts::Rdd R, typename T = utils::RddElementType<R>>
166 requires std::invocable<Func, T>&& std::is_same_v<std::invoke_result_t<Func, T>, bool> auto
167 operator()(const R& r) const {
168 return FilterRdd(r, func_);
169 }
170
171private:
172 Func func_;
173};
174
178template <typename Func, concepts::Rdd R>
179auto operator|(const R& r, const Filter<Func>& filter) {
180 return filter(r);
181}
182
// end of t_Filter
189
190} // namespace cpark
191
192#endif //CPARK_FILTER_RDD_H
Definition base_rdd.h:522
Definition base_rdd.h:94
Definition filter_rdd.h:161
Definition filter_rdd.h:120
constexpr FilterRdd(const R &prev, Func func)
Definition filter_rdd.h:131
Definition filter_rdd.h:39
Iterator & operator++()
Definition filter_rdd.h:62
bool operator==(const Iterator &other) const
Definition filter_rdd.h:77
bool operator!=(const Iterator &other) const
Definition filter_rdd.h:80
value_type operator*() const
Definition filter_rdd.h:59
Iterator(OriginalIterator iterator, OriginalSentinel sentinel, Func *func)
Definition filter_rdd.h:50
Iterator operator++(int)
Definition filter_rdd.h:70
Definition filter_rdd.h:28
constexpr FilterView(V view, Func func)
Definition filter_rdd.h:90
constexpr auto begin() const
Definition filter_rdd.h:94
constexpr auto end() const
Definition filter_rdd.h:100
Definition base_rdd.h:53
Definition base_rdd.h:44
auto operator|(const R &r, const Collect &collect)
Definition collect.h:48