Commit e81cfbf4 authored by Bednárek David RNDr. Ph.D.'s avatar Bednárek David RNDr. Ph.D.
Browse files

reducing across threads

parent 6bd4f7df
......@@ -36,6 +36,7 @@ namespace fmwkng {
class abstract_element;
using element_ptr = std::unique_ptr< abstract_element>;
using element_observer = const abstract_element *;
enum class element_sense { OPEN, CLOSE };
......@@ -55,7 +56,9 @@ namespace fmwkng {
using element_vector = std::vector< element_ptr>;
using element_const_iterator = element_vector::const_iterator;
using element_observer_vector = std::vector< element_observer>;
class element_list;
class element_list_view {
......@@ -406,42 +409,96 @@ namespace fmwkng {
rn.reduced_children.push_back(std::move(a));
}
rn.reduced_children.push_back((*(rn.source_range.end()-1))->clone());
std::cout << "\t";
print_list(rn.reduced_children);
//std::cout << "\t";
//print_list(rn.reduced_children);
}
}
element_list reduce_list(element_list_view el)
using element_observer_vector = std::vector<element_observer>;
void print_node(const reducer_node& rn, std::size_t depth,
const element_observer_vector & openers = {},
const element_observer_vector & closers = {})
{
element_list nel;
if (depth != 0)
{
if (!(*rn.source_range.begin())->reducible())
{
auto o2 = openers;
auto c2 = closers;
o2.push_back(&**rn.source_range.begin());
c2.push_back(&**(rn.source_range.end()-1));
for (auto&& a : rn.children)
{
print_node(a, depth - 1, o2, c2);
}
}
}
else
{
for (auto&& a : openers)
{
a->metadata_marker(std::cout);
}
rn.reduced_children.metadata_text(std::cout);
for (auto&& a : openers)
{
for (std::size_t i = 0; i < a->data_size(); ++i)
{
std::cout << '\t';
a->data_text(std::cout, i);
}
}
if (rn.reduced_children.data_size())
{
std::cout << '\t';
rn.reduced_children.data_text(std::cout);
}
std::cout << std::endl;
}
}
reducer_list reduce_list(element_list_view el)
{
auto b = el.begin();
reducer_list root = view_to_list( b, el.end());
assert(b == el.end());
reducer_list root = view_to_list( el.begin(), el.end());
auto depth = deepest_reducible(root);
//print_list(std::cout, root);
while (depth > 0)
for (auto d = depth; d > 0;)
{
--depth;
--d;
for (auto&& a : root)
{
reduce_node(a, depth);
reduce_node(a, d);
}
}
for (auto&& a : root)
{
print_list(a.reduced_children);
}
return nel;
return root;
}
void print_all(element_list_view el, element_list_view prefix = {})
{
//print_list(el, prefix);
auto nel = reduce_list(el);
print_list(nel, prefix);
auto root = reduce_list(el);
auto depth = deepest_reducible(root);
//print_list(nel, prefix);
element_observer_vector openers;
for (auto&& a : prefix)
{
openers.push_back(&*a);
}
for (auto d = depth; d > 0;)
{
--d;
for (auto&& a : root)
{
print_node(a, d, openers);
}
}
}
template< typename tag, element_sense sense_p, typename data_t>
......@@ -1048,7 +1105,7 @@ namespace fmwkng {
void ba_send(ba_t v) // clears the ab channel
{
abp_ = ba_promise();
abp_ = ab_promise();
bap_.set_value(std::move(v));
}
private:
......@@ -1065,11 +1122,12 @@ namespace fmwkng {
static constexpr std::size_t AVERAGE = 2;
};
using worker_master_message = std::variant<std::monostate, std::monostate, average_t>;
using worker_master_message = std::variant<std::monostate, std::monostate, average_t, element_list>;
struct worker_master_message_type {
static constexpr std::size_t FINISH = 0;
static constexpr std::size_t CONTINUE = 1;
static constexpr std::size_t COMPUTE_AVERAGE = 2;
static constexpr std::size_t SEND_ELEMENTS = 3;
};
using master_worker_seesaw = seesaw<master_worker_message, worker_master_message>;
......@@ -1465,6 +1523,11 @@ namespace fmwkng {
el_.push_back(std::move(p));
}
void append_elements(element_list&& el) const
{
all_el_.push_back(std::move(el));
}
const element_list& el() const
{
return el_;
......@@ -1956,7 +2019,10 @@ namespace fmwkng {
{
print_elements(dt);
}
print_all_elements(dt);
//print_all_elements(dt);
ss_->ba_send(worker_master_message(std::in_place_index< worker_master_message_type::SEND_ELEMENTS>, std::move(all_el_)));
auto rv = ss_->ab_wait();
assert(rv.index() == master_worker_message_type::RUN);
}
template< typename dt_t>
......@@ -2052,12 +2118,14 @@ namespace fmwkng {
}
}
/*
template< typename dt_t>
void print_all_elements(const dt_t& dt) const
{
auto g = guard();
print_all(all_el_, root_pointer(dt)->el());
}
*/
};
template<>
......@@ -2753,6 +2821,7 @@ namespace fmwkng {
std::size_t n_continue = 0;
std::size_t n_average = 0;
std::size_t n_elements = 0;
average_t sum_average = 0;
for (auto&& rv : rvs)
{
......@@ -2764,10 +2833,17 @@ namespace fmwkng {
++n_average;
sum_average += std::get<impl::worker_master_message_type::COMPUTE_AVERAGE>(rv);
break;
case impl::worker_master_message_type::SEND_ELEMENTS:
++n_elements;
auto && elem = std::get<impl::worker_master_message_type::SEND_ELEMENTS>(rv);
impl::thread_pointer(dt_)->append_elements(std::move(elem));
break;
}
}
assert(!n_continue || !n_average);
assert(!n_average || !n_elements);
assert(!n_continue || !n_elements);
if (!!n_continue)
{
{
......@@ -2783,6 +2859,10 @@ namespace fmwkng {
auto avg = sum_average / n_average;
ws.launch_workers(impl::master_worker_message(std::in_place_index<impl::master_worker_message_type::AVERAGE>, avg));
}
else if (!!n_elements)
{
ws.launch_workers(impl::master_worker_message(std::in_place_index<impl::master_worker_message_type::RUN>));
}
else
{
break;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment