]> git.sesse.net Git - stockfish/blobdiff - src/cluster.cpp
Merge branch 'master' into clusterMergeMaster12
[stockfish] / src / cluster.cpp
index 293a5c96f435889d2b4b12a87d86e6d2cca67fba..7d0cc0f8ad684a2c17224f034bc5ecaad6c9f3bc 100644 (file)
@@ -1,8 +1,6 @@
 /*
   Stockfish, a UCI chess playing engine derived from Glaurung 2.1
-  Copyright (C) 2004-2008 Tord Romstad (Glaurung author)
-  Copyright (C) 2008-2015 Marco Costalba, Joona Kiiski, Tord Romstad
-  Copyright (C) 2015-2018 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
+  Copyright (C) 2004-2020 The Stockfish developers (see AUTHORS file)
 
   Stockfish is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -21,6 +19,7 @@
 #ifdef USE_MPI
 
 #include <array>
+#include <map>
 #include <cstddef>
 #include <cstdlib>
 #include <iostream>
 #include "cluster.h"
 #include "thread.h"
 #include "tt.h"
+#include "timeman.h"
 
+namespace Stockfish {
 namespace Cluster {
 
+// Total number of ranks and rank within the communicator
 static int world_rank = MPI_PROC_NULL;
 static int world_size = 0;
 
+// Signals between ranks exchange basic info using a dedicated communicator
+static MPI_Comm signalsComm = MPI_COMM_NULL;
 static MPI_Request reqSignals = MPI_REQUEST_NULL;
 static uint64_t signalsCallCounter = 0;
 
-enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_NB = 3};
+// Signals are the number of nodes searched, stop, table base hits, transposition table saves
+enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_TTS = 3, SIG_NB = 4};
 static uint64_t signalsSend[SIG_NB] = {};
 static uint64_t signalsRecv[SIG_NB] = {};
-
 static uint64_t nodesSearchedOthers = 0;
 static uint64_t tbHitsOthers = 0;
+static uint64_t TTsavesOthers = 0;
 static uint64_t stopSignalsPosted = 0;
 
+// The UCI threads of each rank exchange use a dedicated communicator
 static MPI_Comm InputComm = MPI_COMM_NULL;
-static MPI_Comm TTComm = MPI_COMM_NULL;
-static MPI_Comm MoveComm = MPI_COMM_NULL;
-static MPI_Comm signalsComm = MPI_COMM_NULL;
-
-static std::vector<KeyedTTEntry> TTBuff;
 
+// bestMove requires MoveInfo communicators and data types
+static MPI_Comm MoveComm = MPI_COMM_NULL;
 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
 
+// TT entries are communicated with a dedicated communicator.
+// The receive buffer is used to gather information from all ranks.
+// THe TTCacheCounter tracks the number of local elements that are ready to be sent.
+static MPI_Comm TTComm = MPI_COMM_NULL;
+static std::array<std::vector<KeyedTTEntry>, 2> TTSendRecvBuffs;
+static std::array<MPI_Request, 2> reqsTTSendRecv = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+static uint64_t sendRecvPosted = 0;
+static std::atomic<uint64_t> TTCacheCounter = {};
+
+/// Initialize MPI and associated data types. Note that the MPI library must be configured
+/// to support MPI_THREAD_MULTIPLE, since multiple threads access MPI simultaneously.
 void init() {
 
   int thread_support;
@@ -72,13 +86,12 @@ void init() {
   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
 
-  TTBuff.resize(TTSendBufferSize * world_size);
-
-  const std::array<MPI_Aint, 4> MIdisps = {offsetof(MoveInfo, move),
+  const std::array<MPI_Aint, 5> MIdisps = {offsetof(MoveInfo, move),
+                                           offsetof(MoveInfo, ponder),
                                            offsetof(MoveInfo, depth),
                                            offsetof(MoveInfo, score),
                                            offsetof(MoveInfo, rank)};
-  MPI_Type_create_hindexed_block(4, 1, MIdisps.data(), MPI_INT, &MIDatatype);
+  MPI_Type_create_hindexed_block(5, 1, MIdisps.data(), MPI_INT, &MIDatatype);
   MPI_Type_commit(&MIDatatype);
 
   MPI_Comm_dup(MPI_COMM_WORLD, &InputComm);
@@ -87,10 +100,9 @@ void init() {
   MPI_Comm_dup(MPI_COMM_WORLD, &signalsComm);
 }
 
+/// Finalize MPI and free the associated data types.
 void finalize() {
 
-
-  // free data tyes and communicators
   MPI_Type_free(&MIDatatype);
 
   MPI_Comm_free(&InputComm);
@@ -101,17 +113,32 @@ void finalize() {
   MPI_Finalize();
 }
 
+/// Return the total number of ranks
 int size() {
 
   return world_size;
 }
 
+/// Return the rank (index) of the process
 int rank() {
 
   return world_rank;
 }
 
+/// The receive buffer depends on the number of MPI ranks and threads, resize as needed
+void ttSendRecvBuff_resize(size_t nThreads) {
+
+  for (int i : {0, 1})
+  {
+     TTSendRecvBuffs[i].resize(TTCacheSize * world_size * nThreads);
+     std::fill(TTSendRecvBuffs[i].begin(), TTSendRecvBuffs[i].end(), KeyedTTEntry());
+  }
+}
 
+/// As input is only received by the root (rank 0) of the cluster, this input must be relayed
+/// to the UCI threads of all ranks, in order to setup the position, etc. We do this with a
+/// dedicated getline implementation, where the root broadcasts to all other ranks the received
+/// information.
 bool getline(std::istream& input, std::string& str) {
 
   int size;
@@ -125,7 +152,8 @@ bool getline(std::istream& input, std::string& str) {
       size = vec.size();
   }
 
-  // Some MPI implementations use busy-wait polling, while we need yielding
+  // Some MPI implementations use busy-wait polling, while we need yielding as otherwise
+  // the UCI thread on the non-root ranks would be consuming resources.
   static MPI_Request reqInput = MPI_REQUEST_NULL;
   MPI_Ibcast(&size, 1, MPI_INT, 0, InputComm, &reqInput);
   if (is_root())
@@ -143,6 +171,7 @@ bool getline(std::istream& input, std::string& str) {
       }
   }
 
+  // Broadcast received string
   if (!is_root())
       vec.resize(size);
   MPI_Bcast(vec.data(), size, MPI_CHAR, 0, InputComm);
@@ -153,54 +182,90 @@ bool getline(std::istream& input, std::string& str) {
   return state;
 }
 
+/// Sending part of the signal communication loop
 void signals_send() {
 
   signalsSend[SIG_NODES] = Threads.nodes_searched();
   signalsSend[SIG_TB] = Threads.tb_hits();
+  signalsSend[SIG_TTS] = Threads.TT_saves();
   signalsSend[SIG_STOP] = Threads.stop;
   MPI_Iallreduce(signalsSend, signalsRecv, SIG_NB, MPI_UINT64_T,
                  MPI_SUM, signalsComm, &reqSignals);
   ++signalsCallCounter;
 }
 
+/// Processing part of the signal communication loop.
+/// For some counters (e.g. nodes) we only keep their sum on the other nodes
+/// allowing to add local counters at any time for more fine grained process,
+/// which is useful to indicate progress during early iterations, and to have
+/// node counts that exactly match the non-MPI code in the single rank case.
+/// This call also propagates the stop signal between ranks.
 void signals_process() {
 
   nodesSearchedOthers = signalsRecv[SIG_NODES] - signalsSend[SIG_NODES];
   tbHitsOthers = signalsRecv[SIG_TB] - signalsSend[SIG_TB];
+  TTsavesOthers = signalsRecv[SIG_TTS] - signalsSend[SIG_TTS];
   stopSignalsPosted = signalsRecv[SIG_STOP];
   if (signalsRecv[SIG_STOP] > 0)
       Threads.stop = true;
 }
 
+void sendrecv_post() {
+
+   ++sendRecvPosted;
+   MPI_Irecv(TTSendRecvBuffs[sendRecvPosted       % 2].data(),
+             TTSendRecvBuffs[sendRecvPosted       % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
+             (rank() + size() - 1) % size(), 42, TTComm, &reqsTTSendRecv[0]);
+   MPI_Isend(TTSendRecvBuffs[(sendRecvPosted + 1) % 2].data(),
+             TTSendRecvBuffs[(sendRecvPosted + 1) % 2].size() * sizeof(KeyedTTEntry), MPI_BYTE,
+             (rank() + 1         ) % size(), 42, TTComm, &reqsTTSendRecv[1]);
+}
+
+/// During search, most message passing is asynchronous, but at the end of
+/// search it makes sense to bring them to a common, finalized state.
 void signals_sync() {
 
   while(stopSignalsPosted < uint64_t(size()))
       signals_poll();
 
-  // finalize outstanding messages of the signal loops. We might have issued one call less than needed on some ranks.
+  // Finalize outstanding messages of the signal loops.
+  // We might have issued one call less than needed on some ranks.
   uint64_t globalCounter;
-  MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm); // MoveComm needed
+  MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
   if (signalsCallCounter < globalCounter)
+  {
+      MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
       signals_send();
-
+  }
   assert(signalsCallCounter == globalCounter);
-
   MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
-
   signals_process();
 
+  // Finalize outstanding messages in the sendRecv loop
+  MPI_Allreduce(&sendRecvPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
+  while (sendRecvPosted < globalCounter)
+  {
+      MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
+      sendrecv_post();
+  }
+  assert(sendRecvPosted == globalCounter);
+  MPI_Waitall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), MPI_STATUSES_IGNORE);
+
 }
 
+/// Initialize signal counters to zero.
 void signals_init() {
 
-  stopSignalsPosted = tbHitsOthers = nodesSearchedOthers = 0;
+  stopSignalsPosted = tbHitsOthers = TTsavesOthers = nodesSearchedOthers = 0;
 
   signalsSend[SIG_NODES] = signalsRecv[SIG_NODES] = 0;
   signalsSend[SIG_TB] = signalsRecv[SIG_TB] = 0;
+  signalsSend[SIG_TTS] = signalsRecv[SIG_TTS] = 0;
   signalsSend[SIG_STOP] = signalsRecv[SIG_STOP] = 0;
 
 }
 
+/// Poll the signal loop, and start next round as needed.
 void signals_poll() {
 
   int flag;
@@ -212,76 +277,105 @@ void signals_poll() {
   }
 }
 
+/// Provide basic info related the cluster performance, in particular, the number of signals send,
+/// signals per sounds (sps), the number of gathers, the number of positions gathered (per node and per second, gpps)
+/// The number of TT saves and TT saves per second. If gpps equals approximately TTSavesps the gather loop has enough bandwidth.
+void cluster_info(Depth depth) {
+
+  TimePoint elapsed = Time.elapsed() + 1;
+  uint64_t TTSaves = TT_saves();
+
+  sync_cout << "info depth " << depth << " cluster "
+            << " signals " << signalsCallCounter << " sps " << signalsCallCounter * 1000 / elapsed
+            << " sendRecvs " << sendRecvPosted << " srpps " <<  TTSendRecvBuffs[0].size() * sendRecvPosted * 1000 / elapsed
+            << " TTSaves " << TTSaves << " TTSavesps " << TTSaves * 1000 / elapsed
+            << sync_endl;
+}
+
+/// When a TT entry is saved, additional steps are taken if the entry is of sufficient depth.
+/// If sufficient entries has been collected, a communication is initiated.
+/// If a communication has been completed, the received results are saved to the TT.
 void save(Thread* thread, TTEntry* tte,
-          Key k, Value v, Bound b, Depth d, Move m, Value ev) {
+          Key k, Value v, bool PvHit, Bound b, Depth d, Move m, Value ev) {
 
-  tte->save(k, v, b, d, m, ev);
+  // Standard save to the TT
+  tte->save(k, v, PvHit, b, d, m, ev);
 
-  if (d > 5 * ONE_PLY)
+  // If the entry is of sufficient depth to be worth communicating, take action.
+  if (d > 3)
   {
-     // Try to add to thread's send buffer
+     // count the TTsaves to information: this should be relatively similar
+     // to the number of entries we can send/recv.
+     thread->TTsaves.fetch_add(1, std::memory_order_relaxed);
+
+     // Add to thread's send buffer, the locking here avoids races when the master thread
+     // prepares the send buffer.
      {
-         std::lock_guard<Mutex> lk(thread->ttBuffer.mutex);
-         thread->ttBuffer.buffer.replace(KeyedTTEntry(k,*tte));
-         ++thread->ttBuffer.counter;
+         std::lock_guard<std::mutex> lk(thread->ttCache.mutex);
+         thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
+        ++TTCacheCounter;
      }
 
-     // Communicate on main search thread
-     if (thread == Threads.main() && thread->ttBuffer.counter * Threads.size() > TTSendBufferSize)
-     {
-         static MPI_Request req = MPI_REQUEST_NULL;
-         static TTSendBuffer<TTSendBufferSize> send_buff = {};
-         int flag;
+     size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
 
+     // Communicate on main search thread, as soon the threads combined have collected
+     // sufficient data to fill the send buffers.
+     if (thread == Threads.main() && TTCacheCounter > recvBuffPerRankSize)
+     {
          // Test communication status
-         MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
+         int flag;
+         MPI_Testall(reqsTTSendRecv.size(), reqsTTSendRecv.data(), &flag, MPI_STATUSES_IGNORE);
 
          // Current communication is complete
          if (flag)
          {
-             // Save all received entries (except ours)
+             // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
              for (size_t irank = 0; irank < size_t(size()) ; ++irank)
              {
-                 if (irank == size_t(rank()))
-                     continue;
-
-                 for (size_t i = irank * TTSendBufferSize ; i < (irank + 1) * TTSendBufferSize; ++i)
+                 if (irank == size_t(rank())) // this is our part, fill the part of the buffer for sending
                  {
-                     auto&& e = TTBuff[i];
-                     bool found;
-                     TTEntry* replace_tte;
-                     replace_tte = TT.probe(e.first, found);
-                     replace_tte->save(e.first, e.second.value(), e.second.bound(), e.second.depth(),
-                                       e.second.move(), e.second.eval());
-                 }
-             }
+                    // Copy from the thread caches to the right spot in the buffer
+                    size_t i = irank * recvBuffPerRankSize;
+                    for (auto&& th : Threads)
+                    {
+                        std::lock_guard<std::mutex> lk(th->ttCache.mutex);
 
-             // Reset send buffer
-             send_buff = {};
+                        for (auto&& e : th->ttCache.buffer)
+                            TTSendRecvBuffs[sendRecvPosted % 2][i++] = e;
 
-             // Build up new send buffer: best 16 found across all threads
-             for (auto&& th : Threads)
-             {
-                 std::lock_guard<Mutex> lk(th->ttBuffer.mutex);
-                 for (auto&& e : th->ttBuffer.buffer)
-                     send_buff.replace(e);
-                 // Reset thread's send buffer
-                 th->ttBuffer.buffer = {};
-                 th->ttBuffer.counter = 0;
+                        // Reset thread's send buffer
+                        th->ttCache.buffer = {};
+                    }
+
+                   TTCacheCounter = 0;
+                 }
+                 else // process data received from the corresponding rank.
+                    for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
+                    {
+                        auto&& e = TTSendRecvBuffs[sendRecvPosted % 2][i];
+                        bool found;
+                        TTEntry* replace_tte;
+                        replace_tte = TT.probe(e.first, found);
+                        replace_tte->save(e.first, e.second.value(), e.second.is_pv(), e.second.bound(), e.second.depth(),
+                                          e.second.move(), e.second.eval());
+                    }
              }
 
              // Start next communication
-             MPI_Iallgather(send_buff.data(), send_buff.size() * sizeof(KeyedTTEntry), MPI_BYTE,
-                            TTBuff.data(), TTSendBufferSize * sizeof(KeyedTTEntry), MPI_BYTE,
-                            TTComm, &req);
+             sendrecv_post();
+
+            // Force check of time on the next occasion, the above actions might have taken some time.
+             static_cast<MainThread*>(thread)->callsCnt = 0;
+
          }
      }
   }
 }
 
-
-// TODO update to the scheme in master.. can this use aggregation of votes?
-void pick_moves(MoveInfo& mi) {
+/// Picks the bestMove across ranks, and send the associated info and PV to the root of the cluster.
+/// Note that this bestMove and PV must be output by the root, the guarantee proper ordering of output.
+/// TODO update to the scheme in master.. can this use aggregation of votes?
+void pick_moves(MoveInfo& mi, std::string& PVLine) {
 
   MoveInfo* pMoveInfo = NULL;
   if (is_root())
@@ -314,19 +408,50 @@ void pick_moves(MoveInfo& mi) {
       }
       free(pMoveInfo);
   }
+
+  // Send around the final result
   MPI_Bcast(&mi, 1, MIDatatype, 0, MoveComm);
+
+  // Send PV line to root as needed
+  if (mi.rank != 0 && mi.rank == rank()) {
+      int size;
+      std::vector<char> vec;
+      vec.assign(PVLine.begin(), PVLine.end());
+      size = vec.size();
+      MPI_Send(&size, 1, MPI_INT, 0, 42, MoveComm);
+      MPI_Send(vec.data(), size, MPI_CHAR, 0, 42, MoveComm);
+  }
+  if (mi.rank != 0 && is_root()) {
+      int size;
+      std::vector<char> vec;
+      MPI_Recv(&size, 1, MPI_INT, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
+      vec.resize(size);
+      MPI_Recv(vec.data(), size, MPI_CHAR, mi.rank, 42, MoveComm, MPI_STATUS_IGNORE);
+      PVLine.assign(vec.begin(), vec.end());
+  }
+
 }
 
+/// Return nodes searched (lazily updated cluster wide in the signal loop)
 uint64_t nodes_searched() {
 
   return nodesSearchedOthers + Threads.nodes_searched();
 }
 
+/// Return table base hits (lazily updated cluster wide in the signal loop)
 uint64_t tb_hits() {
 
   return tbHitsOthers + Threads.tb_hits();
 }
 
+/// Return the number of saves to the TT buffers, (lazily updated cluster wide in the signal loop)
+uint64_t TT_saves() {
+
+  return TTsavesOthers + Threads.TT_saves();
+}
+
+
+}
 }
 
 #else
@@ -334,6 +459,7 @@ uint64_t tb_hits() {
 #include "cluster.h"
 #include "thread.h"
 
+namespace Stockfish {
 namespace Cluster {
 
 uint64_t nodes_searched() {
@@ -346,6 +472,12 @@ uint64_t tb_hits() {
   return Threads.tb_hits();
 }
 
+uint64_t TT_saves() {
+
+  return Threads.TT_saves();
+}
+
+}
 }
 
 #endif // USE_MPI