CPARK
1.0
A lightweight, distributed computing framework for C++ that offers a fast, general-purpose solution for large-scale data processing.
Loading...
Searching...
No Matches
include
reduce.h
1
#ifndef CPARK_REDUCE_H
2
#define CPARK_REDUCE_H
3
4
#include <future>
5
#include <numeric>
6
7
#include "base_rdd.h"
8
#include "utils.h"
9
10
namespace
cpark {
11
23
template
<
typename
Func>
24
class
Reduce
{
25
public
:
27
explicit
Reduce
(
Func
func) : func_{std::
move
(func)} {}
28
30
template
<concepts::Rdd R,
typename
T = utils::RddElementType<R>>
31
requires
std::invocable<Func, T, T>&& std::convertible_to<std::invoke_result_t<Func, T, T>,
T
>&&
32
std::is_default_constructible_v<T>
33
T
operator()
(
const
R&
rdd
)
const
{
34
// Parallelly compute the result.
35
// TODO: add complete parallel control and sync logic.
36
std::vector<std::future<T>>
futures
{};
37
for
(
const
auto
&
split
:
rdd
) {
38
futures
.emplace_back(std::async([
this
, &
split
]() {
39
return
std::reduce(std::ranges::begin(
split
), std::ranges::end(
split
),
T
{}, func_);
40
}));
41
}
42
auto
results
= std::ranges::subrange(
futures
) |
43
std::views::transform([](std::future<T>&
fut
) {
return
fut
.get(); });
44
return
std::reduce(std::ranges::begin(
results
), std::ranges::end(
results
),
T
{}, func_);
45
}
46
47
private
:
48
Func
func_;
49
};
50
54
template
<
typename
Func, concepts::Rdd R>
55
auto
operator|
(
const
R&
r
,
const
Reduce<Func>
&
reduce
) {
56
return
reduce
(
r
);
57
}
58
// end of a_Reduce
66
67
}
// namespace cpark
68
69
#endif
//CPARK_REDUCE_H
cpark::BaseSplit
Definition
base_rdd.h:94
cpark::Reduce
Definition
reduce.h:24
cpark::Reduce::Reduce
Reduce(Func func)
Definition
reduce.h:27
cpark::Reduce::operator()
T operator()(const R &rdd) const
Definition
reduce.h:33
cpark::operator|
auto operator|(const R &r, const Collect &collect)
Definition
collect.h:48
Generated by
1.9.8