CPARK 1.0
A light-weighted, distributed computing framework for C++ that offers a fast and general-purpose large data processing solution.
Loading...
Searching...
No Matches
reduce.h
1#ifndef CPARK_REDUCE_H
2#define CPARK_REDUCE_H
3
4#include <future>
5#include <numeric>
6
7#include "base_rdd.h"
8#include "utils.h"
9
10namespace cpark {
11
23template <typename Func>
24class Reduce {
25public:
27 explicit Reduce(Func func) : func_{std::move(func)} {}
28
30 template <concepts::Rdd R, typename T = utils::RddElementType<R>>
31 requires std::invocable<Func, T, T>&& std::convertible_to<std::invoke_result_t<Func, T, T>, T>&&
32 std::is_default_constructible_v<T>
33 T operator()(const R& rdd) const {
34 // Parallelly compute the result.
35 // TODO: add complete parallel control and sync logic.
36 std::vector<std::future<T>> futures{};
37 for (const auto& split : rdd) {
38 futures.emplace_back(std::async([this, &split]() {
39 return std::reduce(std::ranges::begin(split), std::ranges::end(split), T{}, func_);
40 }));
41 }
42 auto results = std::ranges::subrange(futures) |
43 std::views::transform([](std::future<T>& fut) { return fut.get(); });
44 return std::reduce(std::ranges::begin(results), std::ranges::end(results), T{}, func_);
45 }
46
47private:
48 Func func_;
49};
50
54template <typename Func, concepts::Rdd R>
55auto operator|(const R& r, const Reduce<Func>& reduce) {
56 return reduce(r);
57}
58
// end of a_Reduce
66
67} // namespace cpark
68
69#endif //CPARK_REDUCE_H
Definition base_rdd.h:94
Definition reduce.h:24
Reduce(Func func)
Definition reduce.h:27
T operator()(const R &rdd) const
Definition reduce.h:33
auto operator|(const R &r, const Collect &collect)
Definition collect.h:48