183 std::shared_lock
guard(cache_mutex_);
184 return dependent_by_.contains(
split_id) && dependent_by_.at(
split_id).size() >= 2;
189 std::shared_lock
guard(cache_mutex_);
190 return cache_done_.contains(
split_id) &&
191 cache_done_[
split_id].wait_for(std::chrono::seconds(0)) == std::future_status::ready;
195 std::shared_lock
guard(cache_mutex_);
196 dependent_by_[
to].insert(
from);
201 std::shared_lock
guard(cache_mutex_);
205 template <
typename CacheType,
typename OriginalIterator>
206 std::shared_future<void> startCalculationOrGetFuture(
SplitId split_id, OriginalIterator begin,
207 OriginalIterator end) {
209 std::shared_lock
guard(cache_mutex_);
210 if (cache_done_.contains(
split_id)) {
215 std::promise<void> promise{};
218 std::unique_lock guard(cache_mutex_);
219 if (cache_done_.contains(split_id)) {
220 return cache_done_[split_id];
222 cache_done_[split_id] = promise.get_future();
226 std::copy(begin, end, std::back_inserter(cache));
229 std::unique_lock guard(cache_mutex_);
230 cache_[split_id] = std::move(cache);
232 return cache_done_[split_id];
240 std::atomic<RddId> next_rdd_id_{};
241 std::atomic<SplitId> next_split_id_{};
244 std::unordered_map<SplitId, std::unordered_set<SplitId>> dependent_by_{};
247 std::unordered_map<SplitId, std::any> cache_{};
248 mutable std::unordered_map<SplitId, std::shared_future<void>> cache_done_{};
249 mutable std::shared_mutex cache_mutex_{};
SplitId getAndIncSplitId()
Definition cpark.h:179
const std::any & getSplitCache(SplitId split_id) const
Definition cpark.h:200
bool splitCached(SplitId split_id) const noexcept
Definition cpark.h:188
void setConfig(Config config)
Definition cpark.h:170
ExecutionContext(Config config)
Definition cpark.h:167
ExecutionContext()=default
bool splitShouldCache(SplitId split_id) const noexcept
Definition cpark.h:182
const Config & getConfig() const noexcept
Definition cpark.h:173