Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/vis15/src/lib/parallel-target/scheduler.cxx
ViewVC logotype

Annotation of /branches/vis15/src/lib/parallel-target/scheduler.cxx

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4865 - (view) (download) (as text)

1 : jhr 4180 /*! \file scheduler.cxx
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * This code is part of the Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     *
9 :     * COPYRIGHT (c) 2016 The University of Chicago
10 :     * All rights reserved.
11 :     */
12 :    
13 :     #include "diderot/parallel.hxx"
14 :    
15 :     #if defined (__APPLE__)
16 :     # include <sys/sysctl.h>
17 : jhr 4266 #elif defined(HAVE__PROC_CPUINFO)
18 :     # include <cstdio> // for sscanf
19 :     # include <fstream>
20 :     # ifdef HAVE_LIBNUMA
21 :     # include <numa.h>
22 :     # endif
23 : jhr 4180 #endif
24 :     #include <errno.h>
25 :    
/* IF_LOGGING(...) expands to its arguments when logging support is compiled in
 * (DIDEROT_ENABLE_LOGGING is defined) and to nothing otherwise.  It is used both
 * to wrap whole logging statements and to conditionally supply extra arguments
 * to calls.
 */
#if defined(DIDEROT_ENABLE_LOGGING)
#  define IF_LOGGING(...)       __VA_ARGS__
#else
#  define IF_LOGGING(...)
#endif
31 :    
32 : jhr 4180 namespace diderot {
33 :    
34 :     bool scheduler::get_cpu_info (world_base *wrld)
35 :     {
36 :     #if defined(HAVE__PROC_CPUINFO)
37 :     /* Get information from /proc/cpuinfo. The interesting
38 :     * fields are:
39 :     *
40 : jhr 4835 * processor : <id> # logical processor/thread id
41 : jhr 4180 * physical id : <id> # node id
42 :     * core id : <id> # core id (per node)
43 :     * cpu cores : <n> # number of cores per node
44 :     */
45 : jhr 4351 std::ifstream cpuinfo("/proc/cpuinfo");
46 :     if (cpuinfo.good()) {
47 :     int maxProcId = 0, maxNodeId = 0, maxCoreId = 0, nCores = 0;
48 :     char buf[1024];
49 :     cpuinfo.getline (buf, sizeof(buf));
50 :     while (cpuinfo.good()) {
51 : jhr 4860 int tmp = 0;
52 : jhr 4351 if (sscanf(buf, "processor : %d", &tmp) == 1) {
53 :     maxProcId = (tmp > maxProcId) ? tmp : maxProcId;
54 :     }
55 :     else if (sscanf(buf, "physical id : %d", &tmp) == 1) {
56 :     maxNodeId = (tmp > maxNodeId) ? tmp : maxNodeId;
57 :     }
58 :     else if (sscanf(buf, "core id : %d", &tmp) == 1) {
59 :     maxCoreId = (tmp > maxCoreId) ? tmp : maxCoreId;
60 :     }
61 :     else if (sscanf(buf, "cpu cores : %d", &tmp) == 1) {
62 : jhr 4860 if (tmp != 0) {
63 : jhr 4835 nCores = tmp;
64 :     } else if (tmp != nCores) {
65 :     wrld->error ("inconsistent core counts %d != %d\n",
66 :     nCores, tmp);
67 :     return true;
68 :     }
69 : jhr 4351 }
70 :     cpuinfo.getline (buf, sizeof(buf));
71 :     }
72 :     cpuinfo.close();
73 :     this->_numHWNodes = maxNodeId + 1;
74 : jhr 4861 this->_numHWCores = (maxCoreId + 1) * this->_numHWNodes;
75 : jhr 4351 this->_numHWThreads = maxProcId + 1;
76 :     this->_numCoresPerNode = nCores;
77 :     this->_numThdsPerCore = this->_numHWThreads / this->_numHWCores;
78 : jhr 4835 // check consistency
79 :     if (this->_numHWCores != this->_numHWNodes * nCores) {
80 :     wrld->error ("inconsistent core counts: %d cores != %d nodes * %d cores/node\n",
81 :     this->_numHWCores, this->_numHWNodes, nCores);
82 :     return true;
83 :     }
84 : jhr 4351 return false;
85 :     }
86 :     else {
87 :     wrld->error ("unable to determine the number of processors\n");
88 :     return true;
89 :     }
90 : jhr 4180 #elif defined(__APPLE__)
91 : jhr 4351 size_t len = sizeof(int);
92 : jhr 4180
93 :     /* the number of nodes */
94 : jhr 4351 if (sysctlbyname("hw.packages", &(this->_numHWNodes), &len, 0, 0) < 0) {
95 :     if (errno == ENOENT) {
96 :     // "hw.packages" is not known
97 :     this->_numHWNodes = 1;
98 :     }
99 :     else {
100 :     wrld->error ("unable to determine the number of nodes\n");
101 :     return true;
102 :     }
103 :     }
104 : jhr 4180
105 :     /* the number of cores */
106 : jhr 4351 if (sysctlbyname("hw.physicalcpu", &(this->_numHWCores), &len, 0, 0) < 0) {
107 :     wrld->error ("unable to determine the number of physical CPUs\n");
108 :     return true;
109 :     }
110 : jhr 4180
111 :     /* the number of hardware threads */
112 : jhr 4351 if (sysctlbyname("hw.logicalcpu", &(this->_numHWThreads), &len, 0, 0) < 0) {
113 :     if (errno == ENOENT) {
114 :     // "hw.packages" is not known
115 :     this->_numHWThreads = this->_numHWCores;
116 :     }
117 :     else {
118 :     wrld->error ("unable to determine the number of logical CPUs\n");
119 :     return true;
120 :     }
121 :     }
122 : jhr 4180
123 : jhr 4351 this->_numCoresPerNode = this->_numHWCores / this->_numHWNodes;
124 :     this->_numThdsPerCore = this->_numHWThreads / this->_numHWCores;
125 : jhr 4180
126 : jhr 4351 return false;
127 : jhr 4180 #else
128 : jhr 4351 return true;
129 : jhr 4180 #endif
130 :     }
131 :    
132 : jhr 4797 static void *worker_main (void *data)
133 :     {
134 : jhr 4835 auto *myInfo = static_cast<scheduler::worker_info *>(data);
135 :     scheduler *sched = myInfo->_sched;
136 : jhr 4797
137 : jhr 4855 IF_LOGGING( LogWorkerStart(myInfo->_wrld, myInfo->_id+1); )
138 : jhr 4844
139 : jhr 4864 // set thread affinity for worker (if supported)
140 :     #ifdef HAVE_SCHED_SETAFFINITY
141 :     cpu_set_t cpus;
142 :     CPU_ZERO(&cpus);
143 :     uint32_t cpuId = myInfo->_node;
144 :     cpuId = (sched->_numCoresPerNode * cpuId) + myInfo->_core;
145 :     cpuId = sched->_numThdsPerCore * cpuId;
146 :     // allow worker to run on any thead
147 : jhr 4865 for (int i = 0; i < sched->_numThdsPerCore; i++) {
148 : jhr 4864 CPU_SET(cpuId+i, &cpus);
149 :     }
150 :     if (sched_setaffinity (0, sizeof(cpu_set_t), &cpus) == -1) {
151 :     myInfo->_wrld->error("Warning: unable to set affinity for worker %d on cpu %d\n",
152 :     myInfo->_id, cpuId);
153 :     }
154 :     #elif HAVE_LIBNUMA
155 :     if (numa_run_on_node (myInfo->_node) == -1) {
156 :     myInfo->_wrld->error("Warning: unable to set affinity for worker %d on node %d\n",
157 :     myInfo->_id, myInfo->_node);
158 :     }
159 :     #endif
160 :    
161 : jhr 4835 while (true) {
162 :     // wait until all workers and the coordinator are ready
163 : jhr 4842 sched->_gate.worker_wait (IF_LOGGING( myInfo->_wrld, myInfo->_id ));
164 : jhr 4835 // check for termination
165 :     if (sched->_done) {
166 : jhr 4864 IF_LOGGING( LogWorkerExit(myInfo->_wrld, myInfo->_id+1); )
167 : jhr 4835 pthread_exit (nullptr);
168 :     }
169 :     // run the task
170 :     assert (sched->_task != nullptr);
171 :     sched->_task (myInfo->_data);
172 :     }
173 : jhr 4797
174 :     } // worker_main
175 :    
176 : jhr 4180 scheduler::scheduler ()
177 : jhr 4351 : _numHWNodes(0), _numHWCores(0), _numHWThreads(0), _numCoresPerNode(0),
178 : jhr 4797 _numThdsPerCore(0), _numWorkers(0), _workers(nullptr), _info(nullptr),
179 : jhr 4835 _task(nullptr), _workSize(0), _done(false)
180 : jhr 4180 {
181 : jhr 4799 pthread_mutex_init (&this->_prLock, nullptr);
182 : jhr 4180 }
183 :    
184 :     scheduler::~scheduler ()
185 :     {
186 : jhr 4835 pthread_mutex_destroy (&this->_prLock);
187 :     delete[] this->_workers;
188 :     delete[] this->_info;
189 : jhr 4180 }
190 :    
191 : jhr 4842 void scheduler::set_num_workers (uint32_t nw)
192 : jhr 4637 {
193 : jhr 4842 this->_numWorkers = (nw < 1) ? 1 : ((this->_numHWThreads < nw) ? this->_numHWThreads : nw);
194 :     }
195 :    
196 :     bool scheduler::create_workers (world_base *wrld)
197 :     {
198 :     if (this->_info != nullptr) {
199 : jhr 4835 wrld->error ("attempt to create workers when workers already exist\n");
200 :     return true;
201 :     }
202 : jhr 4864
203 : jhr 4856 IF_LOGGING( LogSchedulerStart(wrld, 0); )
204 : jhr 4864
205 : jhr 4797 // initialize the barriers
206 : jhr 4835 this->_gate.init (this->_numWorkers);
207 :     this->_bspBar.init (this->_numWorkers);
208 : jhr 4864
209 : jhr 4797 // allocate space for thread IDs and thread info
210 : jhr 4835 this->_workers = new pthread_t[this->_numWorkers];
211 :     this->_info = new worker_info[this->_numWorkers];
212 : jhr 4864
213 :     #if defined(HAVE_SCHED_SETAFFINITY) || defined(HAVE_LIBNUMA)
214 :     // set the desired worker locations
215 : jhr 4865 int workersPerNode = (this->_numWorkers + this->_numHWNodes - 1) / this->_numHWNodes;
216 : jhr 4864 uint32_t wid = 0;
217 : jhr 4865 for (uint32_t i = 0; i < this->_numHWNodes; i++) {
218 : jhr 4864 for (uint32_t j = 0; j < workersPerNode; j++) {
219 :     if (wid < this->_numWorkers) {
220 :     this->_info[wid]._node = i;
221 : jhr 4865 this->_info[wid]._core = j % this->_numCoresPerNode;
222 : jhr 4864 this->_info[wid]._thd = 0;
223 :     wid++;
224 :     }
225 :     else {
226 :     break;
227 :     }
228 :     }
229 :     }
230 :     #endif
231 :    
232 :     // start the worker threads
233 : jhr 4835 for (uint32_t i = 0; i < this->_numWorkers; i++) {
234 :     this->_info[i]._id = i;
235 :     this->_info[i]._wrld = wrld;
236 :     this->_info[i]._sched = this;
237 : jhr 4864 #if !(defined(HAVE_SCHED_SETAFFINITY) || defined(HAVE_LIBNUMA))
238 :     // no affinity support
239 :     this->_info[i]._node = 0;
240 :     this->_info[i]._core = 0;
241 :     this->_info[i]._thd = 0;
242 :     #endif
243 : jhr 4835 int sts = pthread_create (
244 :     &this->_workers[i],
245 :     nullptr,
246 :     worker_main,
247 :     &this->_info[i]);
248 :     if (sts != 0) {
249 :     wrld->error ("unable to create worker thread; err = %d\n", sts);
250 :     return true;
251 :     }
252 :     }
253 : jhr 4637
254 : jhr 4797 // wait for workers to be ready to go
255 : jhr 4842 this->_gate.controller_wait (IF_LOGGING( wrld ));
256 : jhr 4797
257 : jhr 4835 return false;
258 : jhr 4637 }
259 :    
260 : jhr 4842 void scheduler::shutdown (world_base *wrld)
261 : jhr 4797 {
262 : jhr 4835 this->_done = true;
263 : jhr 4797 // synchronize with workers
264 : jhr 4842 this->_gate.release_workers (IF_LOGGING( wrld ));
265 : jhr 4797 // wait for them to terminate
266 : jhr 4835 void *dummy;
267 :     for (uint32_t i = 0; i < this->_numWorkers; i++) {
268 :     int sts = pthread_join (this->_workers[i], &dummy);
269 :     if (sts != 0) {
270 :     std::cerr << "error attempting to join with worker " << i
271 :     << " (id: " << this->_workers[i] << "); error code = "
272 :     << sts << std::endl;
273 :     }
274 :     }
275 :     this->_numWorkers = 0;
276 : jhr 4860 IF_LOGGING( LogSchedulerShutdown(wrld, 0); )
277 : jhr 4797 }
278 :    
279 : jhr 4180 } // namespace diderot

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0