]> git.sesse.net Git - stockfish/blob - src/cluster.cpp
Merge remote-tracking branch 'upstream/master' into clusterMergeMaster7
[stockfish] / src / cluster.cpp
1 /*
2   Stockfish, a UCI chess playing engine derived from Glaurung 2.1
3   Copyright (C) 2004-2008 Tord Romstad (Glaurung author)
4   Copyright (C) 2008-2015 Marco Costalba, Joona Kiiski, Tord Romstad
5   Copyright (C) 2015-2019 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
6
7   Stockfish is free software: you can redistribute it and/or modify
8   it under the terms of the GNU General Public License as published by
9   the Free Software Foundation, either version 3 of the License, or
10   (at your option) any later version.
11
12   Stockfish is distributed in the hope that it will be useful,
13   but WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   GNU General Public License for more details.
16
17   You should have received a copy of the GNU General Public License
18   along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #ifdef USE_MPI
22
23 #include <array>
24 #include <map>
25 #include <cstddef>
26 #include <cstdlib>
27 #include <iostream>
28 #include <istream>
29 #include <mpi.h>
30 #include <string>
31 #include <vector>
32
33 #include "cluster.h"
34 #include "thread.h"
35 #include "tt.h"
36 #include "timeman.h"
37
38 namespace Cluster {
39
40 // Total number of ranks and rank within the communicator
41 static int world_rank = MPI_PROC_NULL;
42 static int world_size = 0;
43
44 // Signals between ranks exchange basic info using a dedicated communicator
45 static MPI_Comm signalsComm = MPI_COMM_NULL;
46 static MPI_Request reqSignals = MPI_REQUEST_NULL;
47 static uint64_t signalsCallCounter = 0;
48
49 // Signals are the number of nodes searched, stop, table base hits, transposition table saves
50 enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_TTS = 3, SIG_NB = 4};
51 static uint64_t signalsSend[SIG_NB] = {};
52 static uint64_t signalsRecv[SIG_NB] = {};
53 static uint64_t nodesSearchedOthers = 0;
54 static uint64_t tbHitsOthers = 0;
55 static uint64_t TTsavesOthers = 0;
56 static uint64_t stopSignalsPosted = 0;
57
58 // The UCI threads of each rank exchange use a dedicated communicator
59 static MPI_Comm InputComm = MPI_COMM_NULL;
60
61 // bestMove requires MoveInfo communicators and data types
62 static MPI_Comm MoveComm = MPI_COMM_NULL;
63 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
64
65 // TT entries are communicated with a dedicated communicator.
66 // The receive buffer is used to gather information from all ranks.
67 // THe TTCacheCounter tracks the number of local elements that are ready to be sent.
68 static MPI_Comm TTComm = MPI_COMM_NULL;
69 static std::array<std::vector<KeyedTTEntry>, 2> TTSendRecvBuffs;
70 static std::array<MPI_Request, 2> reqsTTSendRecv = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
71 static uint64_t sendRecvPosted = 0;
72 static std::atomic<uint64_t> TTCacheCounter = {};
73
74 /// Initialize MPI and associated data types. Note that the MPI library must be configured
75 /// to support MPI_THREAD_MULTIPLE, since multiple threads access MPI simultaneously.
76 void init() {
77
78   int thread_support;
79   MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &thread_support);
80   if (thread_support < MPI_THREAD_MULTIPLE)
81   {
82       std::cerr << "Stockfish requires support for MPI_THREAD_MULTIPLE."
83                 << std::endl;
84       std::exit(EXIT_FAILURE);
85   }
86
87   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
88   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
89
90   const std::array<MPI_Aint, 5> MIdisps = {offsetof(MoveInfo, move),
91                                            offsetof(MoveInfo, ponder),
92                                            offsetof(MoveInfo, depth),
93                                            offsetof(MoveInfo, score),
94                                            offsetof(MoveInfo, rank)};
95   MPI_Type_create_hindexed_block(5, 1, MIdisps.data(), MPI_INT, &MIDatatype);
96   MPI_Type_commit(&MIDatatype);
97
98   MPI_Comm_dup(MPI_COMM_WORLD, &InputComm);
99   MPI_Comm_dup(MPI_COMM_WORLD, &TTComm);
100   MPI_Comm_dup(MPI_COMM_WORLD, &MoveComm);
101   MPI_Comm_dup(MPI_COMM_WORLD, &signalsComm);
102 }
103
104 /// Finalize MPI and free the associated data types.
105 void finalize() {
106
107   MPI_Type_free(&MIDatatype);
108
109   MPI_Comm_free(&InputComm);
110   MPI_Comm_free(&TTComm);
111   MPI_Comm_free(&MoveComm);
112   MPI_Comm_free(&signalsComm);
113
114   MPI_Finalize();
115 }
116
117 /// Return the total number of ranks
118 int size() {
119
120   return world_size;
121 }
122
123 /// Return the rank (index) of the process
124 int rank() {
125
126   return world_rank;
127 }
128
129 /// The receive buffer depends on the number of MPI ranks and threads, resize as needed
130 void ttSendRecvBuff_resize(size_t nThreads) {
131
132   for (int i : {0, 1})
133   {
134      TTSendRecvBuffs[i].resize(TTCacheSize * world_size * nThreads);
135      std::fill(TTSendRecvBuffs[i].begin(), TTSendRecvBuffs[i].end(), KeyedTTEntry());
136   }
137 }
138
139 /// As input is only received by the root (rank 0) of the cluster, this input must be relayed
140 /// to the UCI threads of all ranks, in order to setup the position, etc. We do this with a
141 /// dedicated getline implementation, where the root broadcasts to all other ranks the received
142 /// information.
143 bool getline(std::istream& input, std::string& str) {
144
145   int size;
146   std::vector<char> vec;
147   bool state;
148
149   if (is_root())
150   {
151       state = static_cast<bool>(std::getline(input, str));
152       vec.assign(str.begin(), str.end());
153       size = vec.size();
154   }
155
156   // Some MPI implementations use busy-wait polling, while we need yielding as otherwise
157   // the UCI thread on the non-root ranks would be consuming resources.
158   static MPI_Request reqInput = MPI_REQUEST_NULL;
159   MPI_Ibcast(&size, 1, MPI_INT, 0, InputComm, &reqInput);
160   if (is_root())
161       MPI_Wait(&reqInput, MPI_STATUS_IGNORE);
162   else
163   {
164       while (true)
165       {
166           int flag;
167           MPI_Test(&reqInput, &flag, MPI_STATUS_IGNORE);
168           if (flag)
169               break;
170           else
171               std::this_thread::sleep_for(std::chrono::milliseconds(10));
172       }
173   }
174
175   // Broadcast received string
176   if (!is_root())
177       vec.resize(size);
178   MPI_Bcast(vec.data(), size, MPI_CHAR, 0, InputComm);
179   if (!is_root())
180       str.assign(vec.begin(), vec.end());
181   MPI_Bcast(&state, 1, MPI_CXX_BOOL, 0, InputComm);
182
183   return state;
184 }
185
186 /// Sending part of the signal communication loop
187 void signals_send() {
188
189   signalsSend[SIG_NODES] = Threads.nodes_searched();
190   signalsSend[SIG_TB] = Threads.tb_hits();
191   signalsSend[SIG_TTS] = Threads.TT_saves();
192   signalsSend[SIG_STOP] = Threads.stop;
193   MPI_Iallreduce(signalsSend, signalsRecv, SIG_NB, MPI_UINT64_T,
194                  MPI_SUM, signalsComm, &reqSignals);
195   ++signalsCallCounter;
196 }
197
198 /// Processing part of the signal communication loop.
199 /// For some counters (e.g. nodes) we only keep their sum on the other nodes
200 /// allowing to add local counters at any time for more fine grained process,
201 /// which is useful to indicate progress during early iterations, and to have
202 /// node counts that exactly match the non-MPI code in the single rank case.
203 /// This call also propagates the stop signal between ranks.
204 void signals_process() {
205
206   nodesSearchedOthers = signalsRecv[SIG_NODES] - signalsSend[SIG_NODES];
207   tbHitsOthers = signalsRecv[SIG_TB] - signalsSend[SIG_TB];
208   TTsavesOthers = signalsRecv[SIG_TTS] - signalsSend[SIG_TTS];
209   stopSignalsPosted = signalsRecv[SIG_STOP];
210   if (signalsRecv[SIG_STOP] > 0)
211       Threads.stop = true;
212 }
213
214 void sendrecv_post() {
215
216    ++sendRecvPosted;
217    MPI_Irecv(TTSendRecvBuffs[sendRecvPosted       % 2].data(),
218              TTSendRecvBuffs[sendRecvPosted       % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
219              (rank() + size() - 1) % size(), 42, TTComm, &reqsTTSendRecv[0]);
220    MPI_Isend(TTSendRecvBuffs[(sendRecvPosted + 1) % 2].data(),
221              TTSendRecvBuffs[(sendRecvPosted + 1) % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
222              (rank() + 1         ) % size(), 42, TTComm, &reqsTTSendRecv[1]);
223 }
224
225 /// During search, most message passing is asynchronous, but at the end of
226 /// search it makes sense to bring them to a common, finalized state.
227 void signals_sync() {
228
229   while(stopSignalsPosted < uint64_t(size()))
230       signals_poll();
231
232   // Finalize outstanding messages of the signal loops.
233   // We might have issued one call less than needed on some ranks.
234   uint64_t globalCounter;
235   MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
236   if (signalsCallCounter < globalCounter)
237   {
238       MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
239       signals_send();
240   }
241   assert(signalsCallCounter == globalCounter);
242   MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
243   signals_process();
244
245   // Finalize outstanding messages in the sendRecv loop
246   MPI_Allreduce(&sendRecvPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
247   while (sendRecvPosted < globalCounter)
248   {
249       MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
250       sendrecv_post();
251   }
252   assert(sendRecvPosted == globalCounter);
253   MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
254
255 }
256
257 /// Initialize signal counters to zero.
258 void signals_init() {
259
260   stopSignalsPosted = tbHitsOthers = TTsavesOthers = nodesSearchedOthers = 0;
261
262   signalsSend[SIG_NODES] = signalsRecv[SIG_NODES] = 0;
263   signalsSend[SIG_TB] = signalsRecv[SIG_TB] = 0;
264   signalsSend[SIG_TTS] = signalsRecv[SIG_TTS] = 0;
265   signalsSend[SIG_STOP] = signalsRecv[SIG_STOP] = 0;
266
267 }
268
269 /// Poll the signal loop, and start next round as needed.
270 void signals_poll() {
271
272   int flag;
273   MPI_Test(&reqSignals, &flag, MPI_STATUS_IGNORE);
274   if (flag)
275   {
276      signals_process();
277      signals_send();
278   }
279 }
280
281 /// Provide basic info related the cluster performance, in particular, the number of signals send,
282 /// signals per sounds (sps), the number of gathers, the number of positions gathered (per node and per second, gpps)
283 /// The number of TT saves and TT saves per second. If gpps equals approximately TTSavesps the gather loop has enough bandwidth.
284 void cluster_info(Depth depth) {
285
286   TimePoint elapsed = Time.elapsed() + 1;
287   uint64_t TTSaves = TT_saves();
288
289   sync_cout << "info depth " << depth << " cluster "
290             << " signals " << signalsCallCounter << " sps " << signalsCallCounter * 1000 / elapsed
291             << " sendRecvs " << sendRecvPosted << " srpps " <<  TTSendRecvBuffs[0].size() * sendRecvPosted * 1000 / elapsed
292             << " TTSaves " << TTSaves << " TTSavesps " << TTSaves * 1000 / elapsed
293             << sync_endl;
294 }
295
296 /// When a TT entry is saved, additional steps are taken if the entry is of sufficient depth.
297 /// If sufficient entries has been collected, a communication is initiated.
298 /// If a communication has been completed, the received results are saved to the TT.
299 void save(Thread* thread, TTEntry* tte,
300           Key k, Value v, bool PvHit, Bound b, Depth d, Move m, Value ev) {
301
302   // Standard save to the TT
303   tte->save(k, v, PvHit, b, d, m, ev);
304
305   // If the entry is of sufficient depth to be worth communicating, take action.
306   if (d > 3)
307   {
308      // count the TTsaves to information: this should be relatively similar
309      // to the number of entries we can send/recv.
310      thread->TTsaves.fetch_add(1, std::memory_order_relaxed);
311
312      // Add to thread's send buffer, the locking here avoids races when the master thread
313      // prepares the send buffer.
314      {
315          std::lock_guard<std::mutex> lk(thread->ttCache.mutex);
316          thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
317          ++TTCacheCounter;
318      }
319
320      size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
321
322      // Communicate on main search thread, as soon the threads combined have collected
323      // sufficient data to fill the send buffers.
324      if (thread == Threads.main() && TTCacheCounter > recvBuffPerRankSize)
325      {
326          // Test communication status
327          int flag;
328          MPI_Testall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), &flag, MPI_STATUSES_IGNORE);
329
330          // Current communication is complete
331          if (flag)
332          {
333              // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
334              for (size_t irank = 0; irank < size_t(size()) ; ++irank)
335              {
336                  if (irank == size_t(rank())) // this is our part, fill the part of the buffer for sending
337                  {
338                     // Copy from the thread caches to the right spot in the buffer
339                     size_t i = irank * recvBuffPerRankSize;
340                     for (auto&& th : Threads)
341                     {
342                         std::lock_guard<std::mutex> lk(th->ttCache.mutex);
343
344                         for (auto&& e : th->ttCache.buffer)
345                             TTSendRecvBuffs[sendRecvPosted % 2][i++] = e;
346
347                         // Reset thread's send buffer
348                         th->ttCache.buffer = {};
349                     }
350
351                     TTCacheCounter = 0;
352                  }
353                  else // process data received from the corresponding rank.
354                     for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
355                     {
356                         auto&& e = TTSendRecvBuffs[sendRecvPosted % 2][i];
357                         bool found;
358                         TTEntry* replace_tte;
359                         replace_tte = TT.probe(e.first, found);
360                         replace_tte->save(e.first, e.second.value(), e.second.is_pv(), e.second.bound(), e.second.depth(),
361                                           e.second.move(), e.second.eval());
362                     }
363              }
364
365              // Start next communication
366              sendrecv_post();
367
368              // Force check of time on the next occasion, the above actions might have taken some time.
369              static_cast<MainThread*>(thread)->callsCnt = 0;
370
371          }
372      }
373   }
374 }
375
376 /// Picks the bestMove across ranks, and send the associated info and PV to the root of the cluster.
377 /// Note that this bestMove and PV must be output by the root, the guarantee proper ordering of output.
378 /// TODO update to the scheme in master.. can this use aggregation of votes?
379 void pick_moves(MoveInfo& mi, std::string& PVLine) {
380
381   MoveInfo* pMoveInfo = NULL;
382   if (is_root())
383   {
384       pMoveInfo = (MoveInfo*)malloc(sizeof(MoveInfo) * size());
385   }
386   MPI_Gather(&mi, 1, MIDatatype, pMoveInfo, 1, MIDatatype, 0, MoveComm);
387
388   if (is_root())
389   {
390       std::map<int, int> votes;
391       int minScore = pMoveInfo[0].score;
392       for (int i = 0; i < size(); ++i)
393       {
394           minScore = std::min(minScore, pMoveInfo[i].score);
395           votes[pMoveInfo[i].move] = 0;
396       }
397       for (int i = 0; i < size(); ++i)
398       {
399           votes[pMoveInfo[i].move] += pMoveInfo[i].score - minScore + pMoveInfo[i].depth;
400       }
401       int bestVote = votes[pMoveInfo[0].move];
402       for (int i = 0; i < size(); ++i)
403       {
404           if (votes[pMoveInfo[i].move] > bestVote)
405           {
406               bestVote = votes[pMoveInfo[i].move];
407               mi = pMoveInfo[i];
408           }
409       }
410       free(pMoveInfo);
411   }
412
413   // Send around the final result
414   MPI_Bcast(&mi, 1, MIDatatype, 0, MoveComm);
415
416   // Send PV line to root as needed
417   if (mi.rank != 0 && mi.rank == rank()) {
418       int size;
419       std::vector<char> vec;
420       vec.assign(PVLine.begin(), PVLine.end());
421       size = vec.size();
422       MPI_Send(&size, 1, MPI_INT, 0, 42, MoveComm);
423       MPI_Send(vec.data(), size, MPI_CHAR, 0, 42, MoveComm);
424   }
425   if (mi.rank != 0 && is_root()) {
426       int size;
427       std::vector<char> vec;
428       MPI_Recv(&size, 1, MPI_INT, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
429       vec.resize(size);
430       MPI_Recv(vec.data(), size, MPI_CHAR, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
431       PVLine.assign(vec.begin(), vec.end());
432   }
433
434 }
435
436 /// Return nodes searched (lazily updated cluster wide in the signal loop)
437 uint64_t nodes_searched() {
438
439   return nodesSearchedOthers + Threads.nodes_searched();
440 }
441
442 /// Return table base hits (lazily updated cluster wide in the signal loop)
443 uint64_t tb_hits() {
444
445   return tbHitsOthers + Threads.tb_hits();
446 }
447
448 /// Return the number of saves to the TT buffers, (lazily updated cluster wide in the signal loop)
449 uint64_t TT_saves() {
450
451   return TTsavesOthers + Threads.TT_saves();
452 }
453
454
455 }
456
457 #else
458
459 #include "cluster.h"
460 #include "thread.h"
461
462 namespace Cluster {
463
464 uint64_t nodes_searched() {
465
466   return Threads.nodes_searched();
467 }
468
469 uint64_t tb_hits() {
470
471   return Threads.tb_hits();
472 }
473
474 uint64_t TT_saves() {
475
476   return Threads.TT_saves();
477 }
478
479 }
480
481 #endif // USE_MPI