2 #include <boost/asio/io_service.hpp> 3 #include <boost/asio/ip/tcp.hpp> 4 #include <boost/asio/spawn.hpp> 5 #include <boost/chrono.hpp> 14 template <
typename Res,
typename Params>
18 int nb_eval_by_parameter;
21 boost::asio::io_service io_service;
22 std::list<connection_ptr> slaves;
23 std::queue<message::request<Params>> work_queue;
24 std::list<message::result<Res>> intermediate_results;
26 struct session :
public std::enable_shared_from_this<session>
32 unsigned int init_work_queue(
const std::vector<Params>& parameters_to_evaluate);
34 master(
int port = 54321,
int nb_eval_by_parameter = 1,
int nb_eval_by_slave = 1);
35 template <
typename Client>
36 void run(Client client);
39 template <
typename Res,
typename Params>
42 using namespace boost;
43 auto self(this->shared_from_this());
44 asio::spawn(
master.io_service,
47 message::request<Params> request;
50 asio::deadline_timer timer(master.io_service);
51 system::error_code ec;
52 std::string remote_address = connection->get_socket().remote_endpoint().address().to_string();
53 while (connection->get_socket().is_open())
55 while (master.work_queue.empty())
57 timer.expires_from_now(posix_time::seconds(1));
58 timer.async_wait(yield[ec]);
60 request = master.work_queue.front();
61 master.work_queue.pop();
62 asio::deadline_timer timer_slave(master.io_service, posix_time::seconds(1800));
63 timer_slave.async_wait([self, connection, request, remote_address, &master](const system::error_code& ec)
65 if (ec != boost::asio::error::operation_aborted)
67 master.work_queue.emplace(request);
68 std::cout <<
"abort slave from " 71 self->close(master, connection);
76 connection->async_write(request, yield[ec]);
79 timer.expires_from_now(posix_time::seconds(2));
80 timer.async_wait(yield[ec]);
83 message::result<Res> res;
84 connection->async_read(res, yield[ec]);
88 std::cout <<
"master read rejected" << std::endl;
89 master.work_queue.emplace(request);
95 master.intermediate_results.emplace_back(res);
100 catch (
const std::exception& e)
102 master.work_queue.emplace(request);
103 std::cout << e.what() << std::endl;
109 template <
typename Res,
typename Params>
116 template <
typename Res,
typename Params>
119 unsigned int nb_jobs = 0;
122 for (
const auto& p : parameters_to_evaluate)
125 int n = std::min(nb_eval_by_parameter, nb_eval_by_slave);
126 int rest = nb_eval_by_parameter;
127 for (
int i = 0; i < nb_eval_by_parameter; i += n, rest -= n)
133 work_queue.emplace(r);
142 work_queue.emplace(r);
151 template <
typename Res,
typename Params>
153 : port(port), nb_eval_by_parameter(nb_eval_by_parameter), nb_eval_by_slave(nb_eval_by_slave)
157 template <
typename Res,
typename Params>
158 template <
typename Client>
161 using namespace boost;
162 asio::spawn(io_service,
163 [&](asio::yield_context yield)
165 asio::ip::tcp::acceptor acceptor(io_service,
166 asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port));
169 system::error_code ec;
170 asio::ip::tcp::socket socket(io_service);
171 acceptor.async_accept(socket, yield[ec]);
174 slaves.emplace_back(
new connection(std::move(socket)));
175 auto session_ptr = std::make_shared<session>();
176 std::cout <<
"new connection from: " 177 << slaves.back()->get_socket().remote_endpoint().address().to_string()
179 session_ptr->run(*
this, slaves.back());
184 asio::spawn(io_service,
185 [&](asio::yield_context yield)
187 system::error_code ec;
188 asio::deadline_timer timer(io_service);
189 while (!client->finished())
191 unsigned int nb_params, nb_jobs;
192 const auto params = client->get_parameters(yield);
193 nb_params = params.size();
194 nb_jobs = init_work_queue(params);
195 while (intermediate_results.size() < nb_jobs)
197 timer.expires_from_now(posix_time::seconds(2));
198 timer.async_wait(yield[ec]);
200 std::cout <<
"master " 201 << intermediate_results.size()
204 std::vector<std::list<Res>> res(nb_params);
205 for (
auto& r : intermediate_results)
207 res[r.
id].splice(res[r.
id].end(), r.results);
209 client->set_results(std::move(res), yield);
210 intermediate_results.clear();
void run(Client client)
Definition: master.hpp:159
int id
Definition: message.hpp:9
void close()
Definition: connection.hpp:39
void run(int argc, char *argv[])
Definition: master_slave.cpp:11
master(int port=54321, int nb_eval_by_parameter=1, int nb_eval_by_slave=1)
Definition: master.hpp:152
uint64_t seed
Definition: message.hpp:11
Definition: master.hpp:15
int nb_eval
Definition: message.hpp:10
Definition: connection.hpp:13
Definition: message.hpp:7
T params
Definition: message.hpp:12
std::shared_ptr< connection > connection_ptr
Definition: connection.hpp:85