]> git.sesse.net Git - stockfish/blob - src/cluster.cpp
Merge NNUE (master) in the cluster branch.
[stockfish] / src / cluster.cpp
1 /*
2   Stockfish, a UCI chess playing engine derived from Glaurung 2.1
3   Copyright (C) 2004-2020 The Stockfish developers (see AUTHORS file)
4
5   Stockfish is free software: you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation, either version 3 of the License, or
8   (at your option) any later version.
9
10   Stockfish is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifdef USE_MPI
20
21 #include <array>
22 #include <map>
23 #include <cstddef>
24 #include <cstdlib>
25 #include <iostream>
26 #include <istream>
27 #include <mpi.h>
28 #include <string>
29 #include <vector>
30
31 #include "cluster.h"
32 #include "thread.h"
33 #include "tt.h"
34 #include "timeman.h"
35
36 namespace Cluster {
37
38 // Total number of ranks and rank within the communicator
39 static int world_rank = MPI_PROC_NULL;
40 static int world_size = 0;
41
42 // Signals between ranks exchange basic info using a dedicated communicator
43 static MPI_Comm signalsComm = MPI_COMM_NULL;
44 static MPI_Request reqSignals = MPI_REQUEST_NULL;
45 static uint64_t signalsCallCounter = 0;
46
47 // Signals are the number of nodes searched, stop, table base hits, transposition table saves
48 enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_TTS = 3, SIG_NB = 4};
49 static uint64_t signalsSend[SIG_NB] = {};
50 static uint64_t signalsRecv[SIG_NB] = {};
51 static uint64_t nodesSearchedOthers = 0;
52 static uint64_t tbHitsOthers = 0;
53 static uint64_t TTsavesOthers = 0;
54 static uint64_t stopSignalsPosted = 0;
55
56 // The UCI threads of each rank exchange use a dedicated communicator
57 static MPI_Comm InputComm = MPI_COMM_NULL;
58
59 // bestMove requires MoveInfo communicators and data types
60 static MPI_Comm MoveComm = MPI_COMM_NULL;
61 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
62
63 // TT entries are communicated with a dedicated communicator.
64 // The receive buffer is used to gather information from all ranks.
65 // THe TTCacheCounter tracks the number of local elements that are ready to be sent.
66 static MPI_Comm TTComm = MPI_COMM_NULL;
67 static std::array<std::vector<KeyedTTEntry>, 2> TTSendRecvBuffs;
68 static std::array<MPI_Request, 2> reqsTTSendRecv = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
69 static uint64_t sendRecvPosted = 0;
70 static std::atomic<uint64_t> TTCacheCounter = {};
71
72 /// Initialize MPI and associated data types. Note that the MPI library must be configured
73 /// to support MPI_THREAD_MULTIPLE, since multiple threads access MPI simultaneously.
74 void init() {
75
76   int thread_support;
77   MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &thread_support);
78   if (thread_support < MPI_THREAD_MULTIPLE)
79   {
80       std::cerr << "Stockfish requires support for MPI_THREAD_MULTIPLE."
81                 << std::endl;
82       std::exit(EXIT_FAILURE);
83   }
84
85   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
86   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
87
88   const std::array<MPI_Aint, 5> MIdisps = {offsetof(MoveInfo, move),
89                                            offsetof(MoveInfo, ponder),
90                                            offsetof(MoveInfo, depth),
91                                            offsetof(MoveInfo, score),
92                                            offsetof(MoveInfo, rank)};
93   MPI_Type_create_hindexed_block(5, 1, MIdisps.data(), MPI_INT, &MIDatatype);
94   MPI_Type_commit(&MIDatatype);
95
96   MPI_Comm_dup(MPI_COMM_WORLD, &InputComm);
97   MPI_Comm_dup(MPI_COMM_WORLD, &TTComm);
98   MPI_Comm_dup(MPI_COMM_WORLD, &MoveComm);
99   MPI_Comm_dup(MPI_COMM_WORLD, &signalsComm);
100 }
101
102 /// Finalize MPI and free the associated data types.
103 void finalize() {
104
105   MPI_Type_free(&MIDatatype);
106
107   MPI_Comm_free(&InputComm);
108   MPI_Comm_free(&TTComm);
109   MPI_Comm_free(&MoveComm);
110   MPI_Comm_free(&signalsComm);
111
112   MPI_Finalize();
113 }
114
115 /// Return the total number of ranks
116 int size() {
117
118   return world_size;
119 }
120
121 /// Return the rank (index) of the process
122 int rank() {
123
124   return world_rank;
125 }
126
127 /// The receive buffer depends on the number of MPI ranks and threads, resize as needed
128 void ttSendRecvBuff_resize(size_t nThreads) {
129
130   for (int i : {0, 1})
131   {
132      TTSendRecvBuffs[i].resize(TTCacheSize * world_size * nThreads);
133      std::fill(TTSendRecvBuffs[i].begin(), TTSendRecvBuffs[i].end(), KeyedTTEntry());
134   }
135 }
136
137 /// As input is only received by the root (rank 0) of the cluster, this input must be relayed
138 /// to the UCI threads of all ranks, in order to setup the position, etc. We do this with a
139 /// dedicated getline implementation, where the root broadcasts to all other ranks the received
140 /// information.
141 bool getline(std::istream& input, std::string& str) {
142
143   int size;
144   std::vector<char> vec;
145   bool state;
146
147   if (is_root())
148   {
149       state = static_cast<bool>(std::getline(input, str));
150       vec.assign(str.begin(), str.end());
151       size = vec.size();
152   }
153
154   // Some MPI implementations use busy-wait polling, while we need yielding as otherwise
155   // the UCI thread on the non-root ranks would be consuming resources.
156   static MPI_Request reqInput = MPI_REQUEST_NULL;
157   MPI_Ibcast(&size, 1, MPI_INT, 0, InputComm, &reqInput);
158   if (is_root())
159       MPI_Wait(&reqInput, MPI_STATUS_IGNORE);
160   else
161   {
162       while (true)
163       {
164           int flag;
165           MPI_Test(&reqInput, &flag, MPI_STATUS_IGNORE);
166           if (flag)
167               break;
168           else
169               std::this_thread::sleep_for(std::chrono::milliseconds(10));
170       }
171   }
172
173   // Broadcast received string
174   if (!is_root())
175       vec.resize(size);
176   MPI_Bcast(vec.data(), size, MPI_CHAR, 0, InputComm);
177   if (!is_root())
178       str.assign(vec.begin(), vec.end());
179   MPI_Bcast(&state, 1, MPI_CXX_BOOL, 0, InputComm);
180
181   return state;
182 }
183
184 /// Sending part of the signal communication loop
185 void signals_send() {
186
187   signalsSend[SIG_NODES] = Threads.nodes_searched();
188   signalsSend[SIG_TB] = Threads.tb_hits();
189   signalsSend[SIG_TTS] = Threads.TT_saves();
190   signalsSend[SIG_STOP] = Threads.stop;
191   MPI_Iallreduce(signalsSend, signalsRecv, SIG_NB, MPI_UINT64_T,
192                  MPI_SUM, signalsComm, &reqSignals);
193   ++signalsCallCounter;
194 }
195
196 /// Processing part of the signal communication loop.
197 /// For some counters (e.g. nodes) we only keep their sum on the other nodes
198 /// allowing to add local counters at any time for more fine grained process,
199 /// which is useful to indicate progress during early iterations, and to have
200 /// node counts that exactly match the non-MPI code in the single rank case.
201 /// This call also propagates the stop signal between ranks.
202 void signals_process() {
203
204   nodesSearchedOthers = signalsRecv[SIG_NODES] - signalsSend[SIG_NODES];
205   tbHitsOthers = signalsRecv[SIG_TB] - signalsSend[SIG_TB];
206   TTsavesOthers = signalsRecv[SIG_TTS] - signalsSend[SIG_TTS];
207   stopSignalsPosted = signalsRecv[SIG_STOP];
208   if (signalsRecv[SIG_STOP] > 0)
209       Threads.stop = true;
210 }
211
212 void sendrecv_post() {
213
214    ++sendRecvPosted;
215    MPI_Irecv(TTSendRecvBuffs[sendRecvPosted       % 2].data(),
216              TTSendRecvBuffs[sendRecvPosted       % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
217              (rank() + size() - 1) % size(), 42, TTComm, &reqsTTSendRecv[0]);
218    MPI_Isend(TTSendRecvBuffs[(sendRecvPosted + 1) % 2].data(),
219              TTSendRecvBuffs[(sendRecvPosted + 1) % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
220              (rank() + 1         ) % size(), 42, TTComm, &reqsTTSendRecv[1]);
221 }
222
223 /// During search, most message passing is asynchronous, but at the end of
224 /// search it makes sense to bring them to a common, finalized state.
225 void signals_sync() {
226
227   while(stopSignalsPosted < uint64_t(size()))
228       signals_poll();
229
230   // Finalize outstanding messages of the signal loops.
231   // We might have issued one call less than needed on some ranks.
232   uint64_t globalCounter;
233   MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
234   if (signalsCallCounter < globalCounter)
235   {
236       MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
237       signals_send();
238   }
239   assert(signalsCallCounter == globalCounter);
240   MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
241   signals_process();
242
243   // Finalize outstanding messages in the sendRecv loop
244   MPI_Allreduce(&sendRecvPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
245   while (sendRecvPosted < globalCounter)
246   {
247       MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
248       sendrecv_post();
249   }
250   assert(sendRecvPosted == globalCounter);
251   MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
252
253 }
254
255 /// Initialize signal counters to zero.
256 void signals_init() {
257
258   stopSignalsPosted = tbHitsOthers = TTsavesOthers = nodesSearchedOthers = 0;
259
260   signalsSend[SIG_NODES] = signalsRecv[SIG_NODES] = 0;
261   signalsSend[SIG_TB] = signalsRecv[SIG_TB] = 0;
262   signalsSend[SIG_TTS] = signalsRecv[SIG_TTS] = 0;
263   signalsSend[SIG_STOP] = signalsRecv[SIG_STOP] = 0;
264
265 }
266
267 /// Poll the signal loop, and start next round as needed.
268 void signals_poll() {
269
270   int flag;
271   MPI_Test(&reqSignals, &flag, MPI_STATUS_IGNORE);
272   if (flag)
273   {
274      signals_process();
275      signals_send();
276   }
277 }
278
279 /// Provide basic info related the cluster performance, in particular, the number of signals send,
280 /// signals per sounds (sps), the number of gathers, the number of positions gathered (per node and per second, gpps)
281 /// The number of TT saves and TT saves per second. If gpps equals approximately TTSavesps the gather loop has enough bandwidth.
282 void cluster_info(Depth depth) {
283
284   TimePoint elapsed = Time.elapsed() + 1;
285   uint64_t TTSaves = TT_saves();
286
287   sync_cout << "info depth " << depth << " cluster "
288             << " signals " << signalsCallCounter << " sps " << signalsCallCounter * 1000 / elapsed
289             << " sendRecvs " << sendRecvPosted << " srpps " <<  TTSendRecvBuffs[0].size() * sendRecvPosted * 1000 / elapsed
290             << " TTSaves " << TTSaves << " TTSavesps " << TTSaves * 1000 / elapsed
291             << sync_endl;
292 }
293
294 /// When a TT entry is saved, additional steps are taken if the entry is of sufficient depth.
295 /// If sufficient entries has been collected, a communication is initiated.
296 /// If a communication has been completed, the received results are saved to the TT.
297 void save(Thread* thread, TTEntry* tte,
298           Key k, Value v, bool PvHit, Bound b, Depth d, Move m, Value ev) {
299
300   // Standard save to the TT
301   tte->save(k, v, PvHit, b, d, m, ev);
302
303   // If the entry is of sufficient depth to be worth communicating, take action.
304   if (d > 3)
305   {
306      // count the TTsaves to information: this should be relatively similar
307      // to the number of entries we can send/recv.
308      thread->TTsaves.fetch_add(1, std::memory_order_relaxed);
309
310      // Add to thread's send buffer, the locking here avoids races when the master thread
311      // prepares the send buffer.
312      {
313          std::lock_guard<std::mutex> lk(thread->ttCache.mutex);
314          thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
315          ++TTCacheCounter;
316      }
317
318      size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
319
320      // Communicate on main search thread, as soon the threads combined have collected
321      // sufficient data to fill the send buffers.
322      if (thread == Threads.main() && TTCacheCounter > recvBuffPerRankSize)
323      {
324          // Test communication status
325          int flag;
326          MPI_Testall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), &flag, MPI_STATUSES_IGNORE);
327
328          // Current communication is complete
329          if (flag)
330          {
331              // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
332              for (size_t irank = 0; irank < size_t(size()) ; ++irank)
333              {
334                  if (irank == size_t(rank())) // this is our part, fill the part of the buffer for sending
335                  {
336                     // Copy from the thread caches to the right spot in the buffer
337                     size_t i = irank * recvBuffPerRankSize;
338                     for (auto&& th : Threads)
339                     {
340                         std::lock_guard<std::mutex> lk(th->ttCache.mutex);
341
342                         for (auto&& e : th->ttCache.buffer)
343                             TTSendRecvBuffs[sendRecvPosted % 2][i++] = e;
344
345                         // Reset thread's send buffer
346                         th->ttCache.buffer = {};
347                     }
348
349                     TTCacheCounter = 0;
350                  }
351                  else // process data received from the corresponding rank.
352                     for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
353                     {
354                         auto&& e = TTSendRecvBuffs[sendRecvPosted % 2][i];
355                         bool found;
356                         TTEntry* replace_tte;
357                         replace_tte = TT.probe(e.first, found);
358                         replace_tte->save(e.first, e.second.value(), e.second.is_pv(), e.second.bound(), e.second.depth(),
359                                           e.second.move(), e.second.eval());
360                     }
361              }
362
363              // Start next communication
364              sendrecv_post();
365
366              // Force check of time on the next occasion, the above actions might have taken some time.
367              static_cast<MainThread*>(thread)->callsCnt = 0;
368
369          }
370      }
371   }
372 }
373
374 /// Picks the bestMove across ranks, and send the associated info and PV to the root of the cluster.
375 /// Note that this bestMove and PV must be output by the root, the guarantee proper ordering of output.
376 /// TODO update to the scheme in master.. can this use aggregation of votes?
377 void pick_moves(MoveInfo& mi, std::string& PVLine) {
378
379   MoveInfo* pMoveInfo = NULL;
380   if (is_root())
381   {
382       pMoveInfo = (MoveInfo*)malloc(sizeof(MoveInfo) * size());
383   }
384   MPI_Gather(&mi, 1, MIDatatype, pMoveInfo, 1, MIDatatype, 0, MoveComm);
385
386   if (is_root())
387   {
388       std::map<int, int> votes;
389       int minScore = pMoveInfo[0].score;
390       for (int i = 0; i < size(); ++i)
391       {
392           minScore = std::min(minScore, pMoveInfo[i].score);
393           votes[pMoveInfo[i].move] = 0;
394       }
395       for (int i = 0; i < size(); ++i)
396       {
397           votes[pMoveInfo[i].move] += pMoveInfo[i].score - minScore + pMoveInfo[i].depth;
398       }
399       int bestVote = votes[pMoveInfo[0].move];
400       for (int i = 0; i < size(); ++i)
401       {
402           if (votes[pMoveInfo[i].move] > bestVote)
403           {
404               bestVote = votes[pMoveInfo[i].move];
405               mi = pMoveInfo[i];
406           }
407       }
408       free(pMoveInfo);
409   }
410
411   // Send around the final result
412   MPI_Bcast(&mi, 1, MIDatatype, 0, MoveComm);
413
414   // Send PV line to root as needed
415   if (mi.rank != 0 && mi.rank == rank()) {
416       int size;
417       std::vector<char> vec;
418       vec.assign(PVLine.begin(), PVLine.end());
419       size = vec.size();
420       MPI_Send(&size, 1, MPI_INT, 0, 42, MoveComm);
421       MPI_Send(vec.data(), size, MPI_CHAR, 0, 42, MoveComm);
422   }
423   if (mi.rank != 0 && is_root()) {
424       int size;
425       std::vector<char> vec;
426       MPI_Recv(&size, 1, MPI_INT, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
427       vec.resize(size);
428       MPI_Recv(vec.data(), size, MPI_CHAR, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
429       PVLine.assign(vec.begin(), vec.end());
430   }
431
432 }
433
434 /// Return nodes searched (lazily updated cluster wide in the signal loop)
435 uint64_t nodes_searched() {
436
437   return nodesSearchedOthers + Threads.nodes_searched();
438 }
439
440 /// Return table base hits (lazily updated cluster wide in the signal loop)
441 uint64_t tb_hits() {
442
443   return tbHitsOthers + Threads.tb_hits();
444 }
445
446 /// Return the number of saves to the TT buffers, (lazily updated cluster wide in the signal loop)
447 uint64_t TT_saves() {
448
449   return TTsavesOthers + Threads.TT_saves();
450 }
451
452
453 }
454
455 #else
456
457 #include "cluster.h"
458 #include "thread.h"
459
460 namespace Cluster {
461
462 uint64_t nodes_searched() {
463
464   return Threads.nodes_searched();
465 }
466
467 uint64_t tb_hits() {
468
469   return Threads.tb_hits();
470 }
471
472 uint64_t TT_saves() {
473
474   return Threads.TT_saves();
475 }
476
477 }
478
479 #endif // USE_MPI