Line data Source code
1 : #include "threading.hpp"
2 :
3 : #include "com.hpp"
4 : #include "distributed.h"
5 : #include "dynamicConfig.hpp"
6 : #include "logging.hpp"
7 : #include "searcher.hpp"
8 :
9 : namespace {
10 : std::unique_ptr<ThreadPool> _pool = nullptr;
11 : }
12 :
13 33339 : ThreadPool& ThreadPool::instance() {
14 33339 : if (!_pool) _pool.reset(new ThreadPool);
15 33339 : return *_pool;
16 : }
17 :
18 22 : ThreadPool::~ThreadPool() {
19 : // risk of static variable dependency fiasco ... ??
20 22 : Logging::LogIt(Logging::logInfo) << "... ok threadPool deleted";
21 22 : }
22 :
23 0 : void ThreadPool::initPawnTables(){
24 0 : for (const auto& s : instance()) {
25 0 : (*s).initPawnTable();
26 : }
27 0 : }
28 :
29 23 : void ThreadPool::setup() {
30 23 : assert(DynamicConfig::threads > 0);
31 23 : Logging::LogIt(Logging::logInfo) << "Using " << DynamicConfig::threads << " threads";
32 23 : resize(0);
33 : #ifdef LIMIT_THREADS_TO_PHYSICAL_CORES
34 : unsigned int maxThreads = std::max(1u, std::thread::hardware_concurrency());
35 : if (DynamicConfig::threads > maxThreads) {
36 : Logging::LogIt(Logging::logWarn) << "Trying to use more threads than hardware physical cores";
37 : Logging::LogIt(Logging::logWarn) << "I don't like that and will use only " << maxThreads << " threads";
38 : DynamicConfig::threads = maxThreads;
39 : }
40 : #endif
41 : // init other threads (for main see below)
42 49 : while (size() < DynamicConfig::threads) {
43 52 : push_back(std::unique_ptr<Searcher>(new Searcher(size())));
44 26 : back()->initPawnTable();
45 26 : back()->clearGame();
46 : }
47 46 : Logging::LogIt(Logging::logInfo) << "Total size of Pawn TTs " << back()->ttSizePawn * DynamicConfig::threads * sizeof(Searcher::PawnEntry) / 1024 << "Kb";
48 23 : }
49 :
50 15609 : Searcher& ThreadPool::main() { return *(front()); }
51 :
52 229 : void ThreadPool::wait(bool otherOnly) const {
53 229 : Logging::LogIt(Logging::logInfo) << "Wait for workers to be ready";
54 476 : for (const auto& s : *this) {
55 247 : if (!otherOnly || !(*s).isMainThread()) (*s).wait();
56 : }
57 229 : Logging::LogIt(Logging::logInfo) << "...ok";
58 229 : }
59 :
60 208 : void ThreadPool::distributeData(const ThreadData& data) const {
61 : // send input data and time control to all threads
62 208 : ThreadData dataOther = data;
63 208 : dataOther.depth = MAX_DEPTH; // helper threads go for infinite search (need to receive stopFlag to stop)
64 431 : for (const auto& s : *this) {
65 238 : (*s).setData((*s).isMainThread() ? data : dataOther); // this is a copy
66 223 : (*s).currentMoveMs = currentMoveMs; // propagate time control from Threadpool to each Searcher
67 : }
68 208 : }
69 :
70 : // distribute data and call main thread search (this is a non-blocking function)
71 208 : void ThreadPool::startSearch(const ThreadData& data) {
72 208 : Logging::LogIt(Logging::logInfo) << "Search Sync";
73 208 : main().wait();
74 : // COM state must be updated quite late, when all threads or done
75 413 : COM::state = data.isAnalysis ? COM::st_analyzing : data.isPondering ? COM::st_pondering : COM::st_searching; // update COM state
76 416 : Logging::LogIt(Logging::logInfo) << "state set to " << static_cast<int>(COM::state);
77 208 : Logging::LogIt(Logging::logInfo) << "Locking other threads";
78 : Searcher::startLock.store(true);
79 208 : distributeData(data);
80 208 : Logging::LogIt(Logging::logInfo) << "Calling main thread startThread";
81 208 : main().startThread(); // non blocking call
82 208 : }
83 :
84 208 : void ThreadPool::startOthers() const {
85 431 : for (const auto& s : *this)
86 223 : if (!(*s).isMainThread()) (*s).startThread();
87 208 : }
88 :
89 26 : void ThreadPool::clearGame() const {
90 26 : TT::clearTT();
91 52 : for (const auto& s : *this) (*s).clearGame();
92 26 : }
93 :
94 208 : void ThreadPool::clearSearch() const {
95 : #ifdef REPRODUCTIBLE_RESULTS
96 : TT::clearTT();
97 : #endif
98 431 : for (const auto& s : *this) (*s).clearSearch();
99 : Distributed::initStat();
100 208 : }
101 :
102 242 : void ThreadPool::stop() const {
103 242 : Logging::LogIt(Logging::logInfo) << "Setting stopflag to true on every threads";
104 511 : for (const auto& s : *this) (*s).stopFlag = true;
105 242 : }
106 :
107 208 : void ThreadPool::displayStats() const {
108 208 : if (DynamicConfig::minOutputLevel > Logging::logInfo) return;
109 22939 : for (size_t k = 0; k < Stats::sid_maxid; ++k) { Logging::LogIt(Logging::logInfo) << Stats::Names[k] << " " << counter((Stats::StatId)k); }
110 : }
111 :
112 15112 : Counter ThreadPool::counter(Stats::StatId id, bool forceLocal) const {
113 : if (!forceLocal && Distributed::moreThanOneProcess()) { return Distributed::counter(id); }
114 : else {
115 : Counter n = 0;
116 77622 : for (const auto& it : *this) { n += it->stats.counters[id]; }
117 : return n;
118 : }
119 : }
120 :
121 0 : std::ostream& operator<<(std::ostream& of, const ThreadData& d) {
122 0 : of << GetFEN(d.p) << " " << d.score << " " << ToString(d.best) << " " << ToString(d.pv);
123 0 : return of;
124 : }
|