[Cluster] Improve message passing part.
author    Joost VandeVondele <Joost.VandeVondele@gmail.com>
          Thu, 27 Dec 2018 14:42:53 +0000 (15:42 +0100)
committer Stéphane Nicolet <Stephane.Nicolet@u-paris2.fr>
          Wed, 2 Jan 2019 10:16:24 +0000 (11:16 +0100)
This partially rewrites the message passing code, using an in-place gather and collecting, rather than merging, the data of all threads.

neutral with a single thread per rank:
Score of new-2mpi-1t vs old-2mpi-1t: 789 - 787 - 2615  [0.500] 4191
Elo difference: 0.17 +/- 6.44

likely progress with multiple threads per rank:
Score of new-2mpi-36t vs old-2mpi-36t: 76 - 53 - 471  [0.519] 600
Elo difference: 13.32 +/- 12.85
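
For reference, below is a minimal standalone sketch (not part of the patch) of the in-place non-blocking allgather pattern this patch adopts. The uint64_t payload, slot size and fill values are illustrative stand-ins for the patch's per-rank slots of Threads.size() * TTCacheSize KeyedTTEntry entries:

// Sketch: each rank fills only its own slot of a shared buffer, then
// gathers in place; with MPI_IN_PLACE the send arguments are ignored and
// each rank's contribution is taken directly from its slot of the
// receive buffer.
#include <mpi.h>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[]) {

  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  constexpr size_t SlotSize = 4; // entries contributed per rank
  std::vector<uint64_t> buff(SlotSize * size, 0);

  // Write this rank's contribution into its own slot
  for (size_t i = 0; i < SlotSize; ++i)
      buff[rank * SlotSize + i] = uint64_t(rank) * 100 + i;

  // Post the non-blocking in-place gather, overlap work here, complete it
  MPI_Request req = MPI_REQUEST_NULL;
  MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                 buff.data(), int(SlotSize * sizeof(uint64_t)), MPI_BYTE,
                 MPI_COMM_WORLD, &req);
  MPI_Wait(&req, MPI_STATUS_IGNORE);

  if (rank == 0)
      for (size_t i = 0; i < buff.size(); ++i)
          std::printf("buff[%zu] = %llu\n", i, (unsigned long long)buff[i]);

  MPI_Finalize();
  return 0;
}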

src/cluster.cpp
src/cluster.h
src/thread.cpp
src/thread.h

index 293a5c96f435889d2b4b12a87d86e6d2cca67fba..40f5aae4f5352bf4966cb6300249528888d3800f 100644 (file)
@@ -54,10 +54,15 @@ static MPI_Comm TTComm = MPI_COMM_NULL;
 static MPI_Comm MoveComm = MPI_COMM_NULL;
 static MPI_Comm signalsComm = MPI_COMM_NULL;
 
-static std::vector<KeyedTTEntry> TTBuff;
+static std::vector<KeyedTTEntry> TTRecvBuff;
+static MPI_Request reqGather = MPI_REQUEST_NULL;
+static uint64_t gathersPosted = 0;
+
+static std::atomic<uint64_t> TTCacheCounter = {};
 
 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
 
+
 void init() {
 
   int thread_support;
@@ -72,8 +77,6 @@ void init() {
   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
 
-  TTBuff.resize(TTSendBufferSize * world_size);
-
   const std::array<MPI_Aint, 4> MIdisps = {offsetof(MoveInfo, move),
                                            offsetof(MoveInfo, depth),
                                            offsetof(MoveInfo, score),
@@ -111,6 +114,13 @@ int rank() {
   return world_rank;
 }
 
+void ttRecvBuff_resize(size_t nThreads) {
+
+  TTRecvBuff.resize(TTCacheSize * world_size * nThreads);
+  std::fill(TTRecvBuff.begin(), TTRecvBuff.end(), KeyedTTEntry());
+
+}
+
 
 bool getline(std::istream& input, std::string& str) {
 
@@ -189,6 +199,18 @@ void signals_sync() {
 
   signals_process();
 
+  // Finalize outstanding messages in the gather loop
+  uint64_t globalCounter;
+  MPI_Allreduce(&gathersPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
+  if (gathersPosted < globalCounter)
+  {
+     size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
+     MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
+                    TTRecvBuff.data(), recvBuffPerRankSize * sizeof(KeyedTTEntry), MPI_BYTE,
+                    TTComm, &reqGather);
+     ++gathersPosted;
+  }
+  assert(gathersPosted == globalCounter);
+
 }
 
 void signals_init() {
@@ -221,59 +243,64 @@ void save(Thread* thread, TTEntry* tte,
   {
      // Try to add to thread's send buffer
      {
-         std::lock_guard<Mutex> lk(thread->ttBuffer.mutex);
-         thread->ttBuffer.buffer.replace(KeyedTTEntry(k,*tte));
-         ++thread->ttBuffer.counter;
+         std::lock_guard<Mutex> lk(thread->ttCache.mutex);
+         thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
+         ++TTCacheCounter;
      }
 
+     size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
+
      // Communicate on main search thread
-     if (thread == Threads.main() && thread->ttBuffer.counter * Threads.size() > TTSendBufferSize)
+     if (thread == Threads.main() && TTCacheCounter > size() * recvBuffPerRankSize)
      {
-         static MPI_Request req = MPI_REQUEST_NULL;
-         static TTSendBuffer<TTSendBufferSize> send_buff = {};
-         int flag;
-
          // Test communication status
-         MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
+         int flag;
+         MPI_Test(&reqGather, &flag, MPI_STATUS_IGNORE);
 
          // Current communication is complete
          if (flag)
          {
-             // Save all received entries (except ours)
+             // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
              for (size_t irank = 0; irank < size_t(size()) ; ++irank)
              {
                  if (irank == size_t(rank()))
-                     continue;
-
-                 for (size_t i = irank * TTSendBufferSize ; i < (irank + 1) * TTSendBufferSize; ++i)
                  {
-                     auto&& e = TTBuff[i];
-                     bool found;
-                     TTEntry* replace_tte;
-                     replace_tte = TT.probe(e.first, found);
-                     replace_tte->save(e.first, e.second.value(), e.second.bound(), e.second.depth(),
-                                       e.second.move(), e.second.eval());
-                 }
-             }
+                    // Copy from the thread caches to the right spot in the buffer
+                    size_t i = irank * recvBuffPerRankSize;
+                    for (auto&& th : Threads)
+                    {
+                        std::lock_guard<Mutex> lk(th->ttCache.mutex);
 
-             // Reset send buffer
-             send_buff = {};
+                        for (auto&& e : th->ttCache.buffer)
+                            TTRecvBuff[i++] = e;
 
-             // Build up new send buffer: best 16 found across all threads
-             for (auto&& th : Threads)
-             {
-                 std::lock_guard<Mutex> lk(th->ttBuffer.mutex);
-                 for (auto&& e : th->ttBuffer.buffer)
-                     send_buff.replace(e);
-                 // Reset thread's send buffer
-                 th->ttBuffer.buffer = {};
-                 th->ttBuffer.counter = 0;
+                        // Reset thread's send buffer
+                        th->ttCache.buffer = {};
+                    }
+
+                    TTCacheCounter = 0;
+                 }
+                 else
+                    for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
+                    {
+                        auto&& e = TTRecvBuff[i];
+                        bool found;
+                        TTEntry* replace_tte;
+                        replace_tte = TT.probe(e.first, found);
+                        replace_tte->save(e.first, e.second.value(), e.second.bound(), e.second.depth(),
+                                          e.second.move(), e.second.eval());
+                    }
              }
 
              // Start next communication
-             MPI_Iallgather(send_buff.data(), send_buff.size() * sizeof(KeyedTTEntry), MPI_BYTE,
-                            TTBuff.data(), TTSendBufferSize * sizeof(KeyedTTEntry), MPI_BYTE,
-                            TTComm, &req);
+             MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
+                            TTRecvBuff.data(), recvBuffPerRankSize * sizeof(KeyedTTEntry), MPI_BYTE,
+                            TTComm, &reqGather);
+             ++gathersPosted;
+
+             // Force check of time on the next occasion.
+             static_cast<MainThread*>(thread)->callsCnt = 0;
+
          }
      }
   }
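
(Not part of the patch.) The gather above assumes a rank-major layout of TTRecvBuff: one contiguous slot per rank, holding the TTCacheSize entries of each local thread back to back. A hypothetical helper makes the offset arithmetic explicit:

#include <cstddef>

// Illustrative only: offset of thread t of rank r within TTRecvBuff, given
// nThreads threads per rank and cacheSize entries per thread cache.
constexpr std::size_t slot_offset(std::size_t r, std::size_t t,
                                  std::size_t nThreads, std::size_t cacheSize)
{
  return (r * nThreads + t) * cacheSize;
}

// With 2 ranks of 36 threads and TTCacheSize = 32, rank 1's slot starts
// immediately after rank 0's 36 * 32 entries.
static_assert(slot_offset(1, 0, 36, 32) == 36 * 32, "rank slots are contiguous");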
index 4b80107dc4068c8109f8f9bf0da8df64b0b24748..b4bc76494bc2540298a7925ccaf4a6bf979b1d5e 100644 (file)
@@ -42,8 +42,8 @@ struct MoveInfo {
 #ifdef USE_MPI
 using KeyedTTEntry = std::pair<Key, TTEntry>;
 
-constexpr std::size_t TTSendBufferSize = 32;
-template <std::size_t N> class TTSendBuffer : public std::array<KeyedTTEntry, N> {
+constexpr std::size_t TTCacheSize = 32;
+template <std::size_t N> class TTCache : public std::array<KeyedTTEntry, N> {
 
   struct Compare {
       inline bool operator()(const KeyedTTEntry& lhs, const KeyedTTEntry& rhs) {
@@ -74,6 +74,7 @@ int rank();
 inline bool is_root() { return rank() == 0; }
 void save(Thread* thread, TTEntry* tte, Key k, Value v, Bound b, Depth d, Move m, Value ev);
 void pick_moves(MoveInfo& mi);
+void ttRecvBuff_resize(size_t nThreads);
 uint64_t nodes_searched();
 uint64_t tb_hits();
 void signals_init();
@@ -90,6 +91,7 @@ constexpr int rank() { return 0; }
 constexpr bool is_root() { return true; }
 inline void save(Thread*, TTEntry* tte, Key k, Value v, Bound b, Depth d, Move m, Value ev) { tte->save(k, v, b, d, m, ev); }
 inline void pick_moves(MoveInfo&) { }
+inline void ttRecvBuff_resize(size_t) { }
 uint64_t nodes_searched();
 uint64_t tb_hits();
 inline void signals_init() { }
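
(Not part of the patch.) The hunk above truncates TTCache::replace(). As a hedged sketch, a fixed-capacity keep-best buffer of this shape can be kept heap-ordered with the weakest entry at the front as the eviction candidate; this is one plausible implementation, not necessarily the actual one:

#include <algorithm>
#include <array>
#include <cstddef>

template <typename Entry, std::size_t N, typename Compare>
struct KeepBest : public std::array<Entry, N> {

  Compare compare;

  // Insert value if it beats the weakest stored entry; return whether the
  // buffer changed. compare(a, b) is true when a is stronger than b, so the
  // std:: heap algorithms keep the weakest element at the front.
  bool replace(const Entry& value) {

    if (compare(value, this->front()))
    {
        std::pop_heap(this->begin(), this->end(), compare);
        this->back() = value;
        std::push_heap(this->begin(), this->end(), compare);
        return true;
    }
    return false;
  }
};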
index 18d7692b9416318fe0be60e0a2fd63a8524a7d50..932376a305c6bfb2db86b238492050cd5255d73f 100644 (file)
@@ -139,6 +139,9 @@ void ThreadPool::set(size_t requested) {
 
       // Reallocate the hash with the new threadpool size
       TT.resize(Options["Hash"]);
+
+      // Adjust cluster buffers
+      Cluster::ttRecvBuff_resize(requested);
   }
 }
 
index 4f34de514ea362c0a15542bf42c52a04005788ee..f7e88f05fbbf92a02f0f7857455d10f7ebc6e414 100644 (file)
@@ -78,9 +78,8 @@ public:
 #ifdef USE_MPI
   struct {
       Mutex mutex;
-      Cluster::TTSendBuffer<Cluster::TTSendBufferSize> buffer = {};
-      size_t counter = 0;
-  } ttBuffer;
+      Cluster::TTCache<Cluster::TTCacheSize> buffer = {};
+  } ttCache;
 #endif
 };