IAtari
Genetic algorithm generating AI capable to play Atari2600 games.
master.hpp
Go to the documentation of this file.
1 #pragma once
2 #include <boost/asio/io_service.hpp>
3 #include <boost/asio/ip/tcp.hpp>
4 #include <boost/asio/spawn.hpp>
5 #include <boost/chrono.hpp>
6 #include <iostream>
7 #include <memory>
8 #include <list>
9 #include <queue>
10 #include <vector>
11 #include "message.hpp"
12 #include "connection.hpp"
13 
20 template <typename Res, typename Params>
26 class master
27 {
28  int port;
32  boost::asio::io_service io_service;
33  std::list<connection_ptr> slaves;
34  std::queue<message::request<Params>> work_queue;
35  std::list<message::result<Res>> intermediate_results;
42  struct session : public std::enable_shared_from_this<session>
43  {
56 
65  };
66 
75  unsigned int init_work_queue(const std::vector<Params>& parameters_to_evaluate);
76 public:
83  master(int port = 54321, int nb_eval_by_parameter = 1, int nb_eval_by_slave = 1);
84 
85  template <typename Client>
86 
93  void run(Client client);
94 };
95 
96 template <typename Res, typename Params>
98 {
99  using namespace boost;
100  auto self(this->shared_from_this());
101 
102  //Coroutine de run
103  asio::spawn(master.io_service,
104  [self, connection, &master](asio::yield_context yield)
105  {
106  message::request<Params> request;
107  try
108  {
109  asio::deadline_timer timer(master.io_service);
110  system::error_code ec;
111  std::string remote_address = connection->get_socket().remote_endpoint().address().to_string();
112  while (connection->get_socket().is_open()) //As long as the slave is online
113  {
114  while (master.work_queue.empty()) //Wait if we have no AI to test
115  {
116  timer.expires_from_now(posix_time::seconds(1));
117  timer.async_wait(yield[ec]);
118  }
119  request = master.work_queue.front(); //Unpack AI
120  master.work_queue.pop();
121  asio::deadline_timer timer_slave(master.io_service, posix_time::seconds(1800));
122  //Start ultimatum for 1800s,
123  timer_slave.async_wait([self, connection, request, remote_address, &master](const system::error_code& ec)
124  {
125  if (ec != boost::asio::error::operation_aborted) //Issue with slave, test crashed
126  {
127  master.work_queue.emplace(request);
128  std::cout << "abort slave from "
129  << remote_address
130  << std::endl;
131  self->close(master, connection);
132  }
133  });
134  do //Force slave to test the AI -> If impossible the timer will make it crash anyway
135  {
136  connection->async_write(request, yield[ec]);
137  if (ec)
138  {
139  timer.expires_from_now(posix_time::seconds(2));
140  timer.async_wait(yield[ec]);
141  }
142  } while (ec);
143  message::result<Res> res;
144  connection->async_read(res, yield[ec]); //Get the result
145  timer_slave.cancel(); //Wait for slave to finish testing
146  if (ec) //Issue with result -> re-add AI to the testList
147  {
148  std::cout << "master read rejected" << std::endl;
149  master.work_queue.emplace(request);
150  continue;
151  }
152  // std::cout << "received results from: "
153  // << remote_address
154  // << std::endl;
155  master.intermediate_results.emplace_back(res);
156  // if (master.intermediate_results.size() % 50 == 0)
157  // std::cout << master.intermediate_results.size() << std::endl;
158  }
159  }
160  catch (const std::exception& e)
161  {
162  master.work_queue.emplace(request);
163  std::cout << e.what() << std::endl;
164  self->close(master, connection);
165  }
166  });
167 }
168 
169 template <typename Res, typename Params>
171 {
172  master.slaves.remove(connection);
173  connection->close();
174 }
175 
176 template <typename Res, typename Params>
177 unsigned int master<Res, Params>::init_work_queue(const std::vector<Params>& parameters_to_evaluate)
178 {
179  unsigned int nb_jobs = 0;
180  int id = 0;
181  uint64_t seed = 1;
182  for (const auto& p : parameters_to_evaluate) //For each AI in params
183  {
185  int n = std::min(nb_eval_by_parameter, nb_eval_by_slave);
186  int rest = nb_eval_by_parameter;
187  for (int i = 0; i < nb_eval_by_parameter; i += n, rest -= n)
188  {
189  r.id = id;
190  r.nb_eval = n;
191  r.seed = seed++;
192  r.params = p;
193  work_queue.emplace(r);
194  ++nb_jobs;
195  }
196  if (rest != 0)
197  {
198  r.id = id;
199  r.nb_eval = rest;
200  r.seed = seed++;
201  r.params = p;
202  work_queue.emplace(r);
203  ++nb_jobs;
204  }
205  ++id;
206  seed = 1;
207  }
208  return nb_jobs;
209 }
210 
211 template <typename Res, typename Params>
214 {
215 }
216 
217 template <typename Res, typename Params>
218 template <typename Client>
219 void master<Res, Params>::run(Client client)
220 {
221  using namespace boost;
222  //Coroutine de connexion
223  asio::spawn(io_service,
224  [&](asio::yield_context yield)
225  {
226  asio::ip::tcp::acceptor acceptor(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port));
227  while (true)
228  {
229  system::error_code ec;
230  asio::ip::tcp::socket socket(io_service);
231  acceptor.async_accept(socket, yield[ec]);//Try to accept a connexion
232  if (!ec) //If succesfull
233  {
234  slaves.emplace_back(new connection(std::move(socket)));//Add the connection to the list of slaves
235  auto session_ptr = std::make_shared<session>();
236  std::cout << "new connection from: "
237  << slaves.back()->get_socket().remote_endpoint().address().to_string()
238  << std::endl;
239  session_ptr->run(*this, slaves.back());//Call run on the new slave with us as master
240  }
241  }
242  });
243 
244  //Coroutine de job
245  asio::spawn(io_service,
246  [&](asio::yield_context yield)
247  {
248  system::error_code ec;
249  asio::deadline_timer timer(io_service);
250  while (!client->finished())//We still to iterate
251  {
252  unsigned int nb_params, nb_jobs;
253  const auto params = client->get_parameters(yield); //Get AI list
254  nb_params = params.size();
255  nb_jobs = init_work_queue(params);
256  while (intermediate_results.size() < nb_jobs)
257  {
258  timer.expires_from_now(posix_time::seconds(2));
259  timer.async_wait(yield[ec]);
260  }
261  std::cout << "master "
262  << intermediate_results.size()
263  << " jobs done"
264  << std::endl;
265  std::vector<std::list<Res>> res(nb_params);
266  for (auto& r : intermediate_results)
267  {
268  res[r.id].splice(res[r.id].end(), r.results);
269  }
270  client->set_results(std::move(res), yield);
271  intermediate_results.clear();
272  }
273  io_service.stop();
274  });
275  io_service.run();
276 }
void run(master< Res, Params > &master, connection_ptr connection)
Automatically launches test on agents when master is ready.
Definition: master.hpp:97
unsigned int init_work_queue(const std::vector< Params > &parameters_to_evaluate)
Fills the work queue with the list of agent in accordance the evaluations parameters.
Definition: master.hpp:177
void run(Client client)
Start a coroutine handling incoming slave connection and a coroutine handling the tests.
Definition: master.hpp:219
std::list< message::result< Res > > intermediate_results
Definition: master.hpp:35
int id
Definition: message.hpp:26
int port
Definition: master.hpp:28
master(int port=54321, int nb_eval_by_parameter=1, int nb_eval_by_slave=1)
Standard constructor.
Definition: master.hpp:212
boost::asio::io_service io_service
Definition: master.hpp:32
int nb_eval_by_slave
Definition: master.hpp:30
void close(master< Res, Params > &master, connection_ptr connection)
Remove a slave from the list of available slaves and close his connection.
Definition: master.hpp:170
Handle serializable representations of agents.
std::queue< message::request< Params > > work_queue
Definition: master.hpp:34
uint64_t seed
Definition: message.hpp:28
Master part of the algorithm, handle server-side connection and redistribute evenly the tests to the ...
Definition: master.hpp:26
int nb_eval
Definition: message.hpp:27
void close()
Disable then close the socket. May return an error if it fails to disable the socket.
Definition: connection.hpp:72
Handle networking.
Definition: connection.hpp:25
Represent an agent to be tested, can be (un)serialized.
Definition: message.hpp:24
int nb_eval_by_parameter
Definition: master.hpp:29
T params
Definition: message.hpp:29
std::list< connection_ptr > slaves
Definition: master.hpp:33
Coroutine managing ONE slave.
Definition: master.hpp:42
Handle networking.
std::shared_ptr< connection > connection_ptr
Definition: connection.hpp:133