2 #include <boost/asio/io_service.hpp> 3 #include <boost/asio/ip/tcp.hpp> 4 #include <boost/asio/spawn.hpp> 5 #include <boost/chrono.hpp> 20 template <
typename Res,
typename Params>
42 struct session :
public std::enable_shared_from_this<session>
75 unsigned int init_work_queue(
const std::vector<Params>& parameters_to_evaluate);
85 template <
typename Client>
93 void run(Client client);
96 template <
typename Res,
typename Params>
99 using namespace boost;
100 auto self(this->shared_from_this());
106 message::request<Params> request;
109 asio::deadline_timer timer(master.io_service);
110 system::error_code ec;
111 std::string remote_address = connection->get_socket().remote_endpoint().address().to_string();
112 while (connection->get_socket().is_open())
114 while (master.work_queue.empty())
116 timer.expires_from_now(posix_time::seconds(1));
117 timer.async_wait(yield[ec]);
119 request = master.work_queue.front();
120 master.work_queue.pop();
121 asio::deadline_timer timer_slave(master.io_service, posix_time::seconds(1800));
123 timer_slave.async_wait([self, connection, request, remote_address, &master](const system::error_code& ec)
125 if (ec != boost::asio::error::operation_aborted)
127 master.work_queue.emplace(request);
128 std::cout <<
"abort slave from " 131 self->close(master, connection);
136 connection->async_write(request, yield[ec]);
139 timer.expires_from_now(posix_time::seconds(2));
140 timer.async_wait(yield[ec]);
143 message::result<Res> res;
144 connection->async_read(res, yield[ec]);
145 timer_slave.cancel();
148 std::cout <<
"master read rejected" << std::endl;
149 master.work_queue.emplace(request);
155 master.intermediate_results.emplace_back(res);
160 catch (
const std::exception& e)
163 std::cout << e.what() << std::endl;
169 template <
typename Res,
typename Params>
176 template <
typename Res,
typename Params>
179 unsigned int nb_jobs = 0;
182 for (
const auto& p : parameters_to_evaluate)
211 template <
typename Res,
typename Params>
217 template <
typename Res,
typename Params>
218 template <
typename Client>
221 using namespace boost;
223 asio::spawn(io_service,
224 [&](asio::yield_context yield)
226 asio::ip::tcp::acceptor acceptor(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port));
229 system::error_code ec;
230 asio::ip::tcp::socket socket(io_service);
231 acceptor.async_accept(socket, yield[ec]);
234 slaves.emplace_back(
new connection(std::move(socket)));
235 auto session_ptr = std::make_shared<session>();
236 std::cout <<
"new connection from: " 237 << slaves.back()->get_socket().remote_endpoint().address().to_string()
239 session_ptr->run(*
this, slaves.back());
245 asio::spawn(io_service,
246 [&](asio::yield_context yield)
248 system::error_code ec;
249 asio::deadline_timer timer(io_service);
250 while (!client->finished())
252 unsigned int nb_params, nb_jobs;
253 const auto params = client->get_parameters(yield);
254 nb_params = params.size();
255 nb_jobs = init_work_queue(params);
256 while (intermediate_results.size() < nb_jobs)
258 timer.expires_from_now(posix_time::seconds(2));
259 timer.async_wait(yield[ec]);
261 std::cout <<
"master " 262 << intermediate_results.size()
265 std::vector<std::list<Res>> res(nb_params);
266 for (
auto& r : intermediate_results)
268 res[r.id].splice(res[r.id].end(), r.results);
270 client->set_results(std::move(res), yield);
271 intermediate_results.clear();
void run(master< Res, Params > &master, connection_ptr connection)
Automatically launches tests on agents when the master is ready.
Definition: master.hpp:97
unsigned int init_work_queue(const std::vector< Params > &parameters_to_evaluate)
Fills the work queue with the list of agents in accordance with the evaluation parameters.
Definition: master.hpp:177
void run(Client client)
Starts a coroutine handling incoming slave connections and a coroutine handling the tests.
Definition: master.hpp:219
std::list< message::result< Res > > intermediate_results
Definition: master.hpp:35
int id
Definition: message.hpp:26
int port
Definition: master.hpp:28
master(int port=54321, int nb_eval_by_parameter=1, int nb_eval_by_slave=1)
Standard constructor.
Definition: master.hpp:212
boost::asio::io_service io_service
Definition: master.hpp:32
int nb_eval_by_slave
Definition: master.hpp:30
void close(master< Res, Params > &master, connection_ptr connection)
Removes a slave from the list of available slaves and closes its connection.
Definition: master.hpp:170
Handle serializable representations of agents.
std::queue< message::request< Params > > work_queue
Definition: master.hpp:34
uint64_t seed
Definition: message.hpp:28
Master part of the algorithm: handles server-side connections and redistributes the tests evenly to the ...
Definition: master.hpp:26
int nb_eval
Definition: message.hpp:27
void close()
Disable then close the socket. May return an error if it fails to disable the socket.
Definition: connection.hpp:72
Handle networking.
Definition: connection.hpp:25
Represents an agent to be tested; can be (un)serialized.
Definition: message.hpp:24
int nb_eval_by_parameter
Definition: master.hpp:29
T params
Definition: message.hpp:29
std::list< connection_ptr > slaves
Definition: master.hpp:33
Coroutine managing ONE slave.
Definition: master.hpp:42
std::shared_ptr< connection > connection_ptr
Definition: connection.hpp:133