server.cpp 5.2 KB


  1. #include "server.hpp"
  2. Server::Server(size_t _max_clients,int port){
  3. //--------------------------
  4. // Create connection socket
  5. //--------------------------
  6. connection_socket=socket(AF_INET, SOCK_STREAM, 0);
  7. if(connection_socket<0){
  8. cerr<<"[Error] Can not open socket"<<endl;
  9. exit(-1);
  10. }
  11. //Set socket for nonblocking
  12. if(fcntl(connection_socket, F_SETFL, O_NONBLOCK)<0){
  13. cerr<<"[Error] Can not set socket for unblocking"<<endl;
  14. exit(-1);
  15. }
  16. sockaddr_in serv_addr;
  17. bzero((char *) &serv_addr, sizeof(serv_addr));
  18. serv_addr.sin_family = AF_INET;
  19. serv_addr.sin_addr.s_addr = INADDR_ANY;
  20. serv_addr.sin_port = htons(port);
  21. if(bind(connection_socket,(struct sockaddr *)&serv_addr,sizeof(serv_addr))<0){
  22. cerr<<"[Error] Can not bind socket"<<endl;
  23. exit(-1);
  24. }
  25. listen(connection_socket,5);
  26. //---------
  27. // Clients
  28. //---------
  29. int pid=getpid();
  30. cout<<"Pid is "<<pid<<endl;
  31. max_clients=_max_clients;
  32. struct rlimit limit;
  33. getrlimit(RLIMIT_NOFILE,&limit);
  34. limit.rlim_cur=max_clients;
  35. int res=prlimit(0,RLIMIT_NOFILE,&limit,NULL);
  36. cout<<"Maximal number of clients = "<<max_clients<<endl;
  37. nb_clients=0;
  38. clients=new ClientInformation[max_clients];
  39. //---------
  40. // Tasks
  41. //---------
  42. tasks=nullptr;
  43. nb_tasks=0;
  44. nb_finished_tasks=0;
  45. nb_workers=0;
  46. treat=nullptr;
  47. }
  48. void Server::set_tasks(Task* _tasks,size_t _nb_tasks){
  49. tasks=_tasks;
  50. nb_tasks=_nb_tasks;
  51. nb_finished_tasks=0;
  52. }
  53. Server::~Server(){
  54. cout<<" Close server."<<endl;
  55. for(size_t c=0;c<nb_clients;++c){
  56. close(clients[c].socket);
  57. }
  58. delete[] clients;
  59. close(connection_socket);
  60. }
  61. void
  62. Server::listen_for_new_clients(){
  63. if(nb_clients<max_clients){
  64. int socket=accept(connection_socket, NULL, NULL);
  65. if(socket>=0){
  66. //Set socket for nonblocking
  67. fcntl(socket, F_SETFL, O_NONBLOCK);
  68. clients[nb_clients].socket=socket;
  69. clients[nb_clients].has_message=false;
  70. clients[nb_clients].is_worker=false;
  71. clients[nb_clients].current_task=nullptr;
  72. ++nb_clients;
  73. }
  74. }
  75. }
  76. void Server::listen_clients(){
  77. char buffer;
  78. for(size_t c=0;c<nb_clients;++c){
  79. int l=recv(clients[c].socket,&buffer,1,MSG_PEEK);
  80. if(l>0){
  81. if(not clients[c].has_message){
  82. get_message(c);
  83. }
  84. }
  85. if(l==0) remove_client(c);
  86. }
  87. }
  88. void Server::remove_client(size_t c){
  89. ClientInformation& client=clients[c];
  90. if(client.is_worker){
  91. Task* task=client.current_task;
  92. if(task!=nullptr){
  93. if(task->get_statut()!=Task::Done){
  94. task->set_statut(Task::Unaffected);
  95. }
  96. }
  97. --nb_workers;
  98. }
  99. --nb_clients;
  100. close(clients[c].socket);
  101. for(size_t i=c;i<nb_clients;++i){
  102. clients[i]=clients[i+1];
  103. }
  104. }
  105. void
  106. Server::get_message(size_t c){
  107. //cout<<"Receive message from "<<c<<endl;
  108. char buffer;
  109. Message& message=clients[c].message;
  110. message.clear();
  111. do{
  112. recv(clients[c].socket,&buffer,1,0);
  113. }while(message.add(buffer));
  114. clients[c].has_message=true;
  115. }
  116. void
  117. Server::send_string(size_t c,string str){
  118. size_t offset=0;
  119. Message msg;
  120. while(offset<str.size()){
  121. size_t len=min((size_t)(MAX_MSG_SIZE-3),str.size()-offset);
  122. msg.set_string(str.substr(offset,len));
  123. send_message(c,msg);
  124. offset+=len;
  125. }
  126. }
  127. void
  128. Server::treat_message(size_t c){
  129. //cout<<"Treat message from "<<c<<endl;
  130. Message& message=clients[c].message;
  131. switch(message.get_type()){
  132. case CODE:
  133. switch(message.get_code()){
  134. case 'I':
  135. send_informations(c);
  136. break;
  137. case 'K':
  138. for(size_t c=0;c<nb_clients;++c){
  139. send_code(c,'K');
  140. }
  141. exit(0);
  142. break;
  143. case 'W':
  144. clients[c].is_worker=true;
  145. clients[c].current_task=nullptr;
  146. ++nb_workers;
  147. break;
  148. default:
  149. cerr<<"[Error] Receive unexpected code "<<message.get_code()<<" from client "<<c<<endl;
  150. break;
  151. };
  152. break;
  153. case DATA:
  154. //cout<<" DATA"<<endl;
  155. get_task(c);
  156. clients[c].current_task=nullptr;
  157. break;
  158. default:
  159. cerr<<"[Error] Receive unexpected message from client "<<c<<" with type "<<message.get_type()<<endl;
  160. break;
  161. }
  162. clients[c].has_message=false;
  163. }
  164. void
  165. Server::send_informations(size_t c){
  166. send_string(c,"Tasks : "+to_string(nb_tasks));
  167. send_string(c,"Tasks done : "+to_string(nb_finished_tasks));
  168. send_string(c,"Clients : "+to_string(nb_clients));
  169. send_string(c,"Workers : "+to_string(nb_workers));
  170. send_code(c,'E');
  171. }
  172. void
  173. Server::affect_tasks(){
  174. for(size_t c=0;c<nb_clients;++c){
  175. if(clients[c].is_worker and clients[c].current_task==nullptr){
  176. affect_task(c);
  177. }
  178. }
  179. }
  180. void
  181. Server::affect_task(size_t c){
  182. //Find first unafected task
  183. for(size_t i=0;i<nb_tasks;++i){
  184. if(tasks[i].get_statut()==Task::Unaffected){
  185. //Affect task i
  186. clients[c].current_task=&tasks[i];
  187. tasks[i].set_statut(Task::Affected);
  188. Message msg;
  189. msg.set_data(tasks[i].get_input(),tasks[i].get_input_size());
  190. send_message(c,msg);
  191. return;
  192. }
  193. }
  194. }
  195. void
  196. Server::get_task(size_t c){
  197. Task* task=clients[c].current_task;
  198. Message& message=clients[c].message;
  199. task->set_output(message.get_data_buffer(),message.get_data_size());
  200. task->set_statut(Task::Done);
  201. ++nb_finished_tasks;
  202. if(treat!=nullptr){
  203. bool stop=treat(*task);
  204. if(stop) nb_finished_tasks=nb_tasks;
  205. }
  206. }