SCM Repository
Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 3349 - (view) (download) (as text)
1 : | jhr | 1198 | /*! \file main.c |
2 : | * | ||
3 : | * \author John Reppy | ||
4 : | */ | ||
5 : | |||
6 : | /* | ||
7 : | jhr | 3349 | * This code is part of the Diderot Project (http://diderot-language.cs.uchicago.edu) |
8 : | * | ||
9 : | * COPYRIGHT (c) 2015 The University of Chicago | ||
10 : | jhr | 1198 | * All rights reserved. |
11 : | */ | ||
12 : | |||
13 : | #include <Diderot/diderot.h> | ||
14 : | #include <pthread.h> | ||
15 : | nseltzer | 1450 | #include <teem/nrrd.h> |
16 : | jhr | 1198 | |
17 : | jhr | 1215 | // #ifdef HAVE_BUILTIN_ATOMIC_OPS |
18 : | // STATIC_INLINE uint32_t AtomicInc (uint32_t *x) | ||
19 : | // { | ||
20 : | // return __sync_add_and_fetch(x, 1); | ||
21 : | // } | ||
22 : | // STATIC_INLINE uint32_t AtomicDec (uint32_t *x) | ||
23 : | // { | ||
24 : | // return __sync_sub_and_fetch(x, 1); | ||
25 : | // } | ||
26 : | // #else | ||
27 : | // # error atomic operations not supported | ||
28 : | // #endif | ||
29 : | jhr | 1209 | |
// Workers claim strands from the shared pool in chunks of this many strands
#define BLOCK_SIZE      4096

// Attribute that aligns a field or type to a 64-byte boundary, i.e., to a
// cache line (Xeon has 64-byte lines)
#define CACHE_ALIGN     __attribute__((aligned(64)))

35 : | |||
36 : | jhr | 1198 | struct struct_world { |
37 : | jhr | 1519 | STRUCT_WORLD_PREFIX |
38 : | jhr | 1268 | void **inState; |
39 : | void **outState; | ||
40 : | uint32_t numWorkers; // number of worker threads | ||
41 : | uint32_t nSteps; // number of super steps | ||
42 : | jhr | 1214 | // synchronization state |
43 : | jhr | 1578 | uint32_t nextStrand CACHE_ALIGN; // index of next strand to update |
44 : | uint32_t numActive CACHE_ALIGN; // # active strands | ||
45 : | uint32_t numAvail CACHE_ALIGN; // # unevaluated strands | ||
46 : | uint32_t numIdle CACHE_ALIGN; // # idle workers | ||
47 : | jhr | 1268 | pthread_mutex_t lock; // big lock to protect wrld from multiple accesses |
48 : | pthread_cond_t barrier; // workers wait on this when they have no work | ||
49 : | pthread_cond_t mainWait; // used to signal main when the workers have finished | ||
50 : | jhr | 1198 | }; |
51 : | |||
52 : | jhr | 1214 | typedef struct { |
53 : | jhr | 1268 | int id; |
54 : | Diderot_World_t *wrld; | ||
55 : | jhr | 1578 | } WorkerArg_t CACHE_ALIGN; |
56 : | jhr | 1198 | |
57 : | jhr | 1649 | static void *Worker (void *arg); |
58 : | |||
59 : | int main (int argc, const char **argv) | ||
60 : | { | ||
61 : | CPUInfo_t cpuInfo; | ||
62 : | if (! GetNumCPUs (&cpuInfo)) { | ||
63 : | fprintf(stderr, "unable to get number of processors\n"); | ||
64 : | exit (1); | ||
65 : | } | ||
66 : | |||
67 : | Diderot_int_t np = cpuInfo.numHWCores; | ||
68 : | |||
69 : | Diderot_Options_t *opts = Diderot_OptNew (); | ||
70 : | |||
71 : | Diderot_OptAddInt (opts, "np", "specify number of worker threads", &np, true); | ||
72 : | Diderot_RegisterGlobalOpts (opts); | ||
73 : | Diderot_OptProcess (opts, argc, argv); | ||
74 : | Diderot_OptFree (opts); | ||
75 : | |||
76 : | if (VerboseFlg) fprintf (stderr, "initializing globals ...\n"); | ||
77 : | Diderot_InitGlobals (); | ||
78 : | |||
79 : | if (VerboseFlg) fprintf (stderr, "initializing strands ...\n"); | ||
80 : | Diderot_World_t *wrld = Diderot_Initially (); | ||
81 : | |||
82 : | for (int i = 0; i < wrld->numStrands; i++) { | ||
83 : | // hack to make the invariant part of the state the same in both copies | ||
84 : | memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); | ||
85 : | } | ||
86 : | |||
87 : | // Start worker threads | ||
88 : | int nWorkers = np; | ||
89 : | WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t)); | ||
90 : | if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers); | ||
91 : | double t0 = airTime(); | ||
92 : | wrld->numWorkers = nWorkers; | ||
93 : | wrld->numIdle = 0; | ||
94 : | for (int i = 0; i < nWorkers; i++) { | ||
95 : | pthread_t pid; | ||
96 : | args[i].wrld = wrld; | ||
97 : | args[i].id = i; | ||
98 : | if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) { | ||
99 : | fprintf (stderr, "unable to create worker thread\n"); | ||
100 : | exit (1); | ||
101 : | } | ||
102 : | pthread_detach (pid); | ||
103 : | } | ||
104 : | |||
105 : | // wait for the computation to finish | ||
106 : | pthread_mutex_lock (&wrld->lock); | ||
107 : | pthread_cond_wait (&wrld->mainWait, &wrld->lock); | ||
108 : | pthread_mutex_unlock (&wrld->lock); | ||
109 : | |||
110 : | double totalTime = airTime() - t0; | ||
111 : | |||
112 : | if (VerboseFlg) | ||
113 : | fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime); | ||
114 : | else if (TimingFlg) | ||
115 : | printf ("np=%d usr=%f\n", nWorkers, totalTime); | ||
116 : | |||
117 : | // output the final strand states | ||
118 : | if (NrrdOutputFlg) | ||
119 : | Diderot_Output (wrld, Diderot_Strands[0]->outputSzb); | ||
120 : | else | ||
121 : | Diderot_Print (wrld); | ||
122 : | |||
123 : | Diderot_Shutdown (wrld); | ||
124 : | |||
125 : | return 0; | ||
126 : | |||
127 : | } | ||
128 : | |||
129 : | jhr | 1198 | /* Function which processes active strands. */ |
130 : | jhr | 1214 | static void *Worker (void *arg) |
131 : | jhr | 1198 | { |
132 : | jhr | 1268 | WorkerArg_t *myArg = (WorkerArg_t *)arg; |
133 : | Diderot_World_t *wrld = myArg->wrld; | ||
134 : | jhr | 1198 | |
135 : | jhr | 1277 | int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers; |
136 : | int start = myArg->id * nStrandsPerWorker; | ||
137 : | int limit; | ||
138 : | if (wrld->numWorkers-1 == myArg->id) | ||
139 : | jhr | 1302 | limit = wrld->numStrands; |
140 : | jhr | 1277 | else |
141 : | jhr | 1302 | limit = start + nStrandsPerWorker; |
142 : | jhr | 1277 | |
143 : | jhr | 1198 | while (true) { |
144 : | jhr | 1214 | // barrier synchronization at start of super step |
145 : | jhr | 1268 | pthread_mutex_lock (&wrld->lock); |
146 : | if (wrld->numIdle+1 < wrld->numWorkers) { | ||
147 : | wrld->numIdle++; | ||
148 : | pthread_cond_wait (&wrld->barrier, &wrld->lock); | ||
149 : | } | ||
150 : | else { | ||
151 : | // all other workers are idle, so we can proceed after some initialization | ||
152 : | wrld->numIdle = 0; | ||
153 : | wrld->numAvail = wrld->numStrands; // includes inactive strands | ||
154 : | wrld->nextStrand = 0; | ||
155 : | // swap in and out | ||
156 : | void **tmp = wrld->inState; | ||
157 : | wrld->inState = wrld->outState; | ||
158 : | wrld->outState = tmp; | ||
159 : | pthread_cond_broadcast (&wrld->barrier); | ||
160 : | } | ||
161 : | pthread_mutex_unlock (&wrld->lock); | ||
162 : | jhr | 1198 | |
163 : | jhr | 1214 | // if there are no active strands left, then we're done |
164 : | jhr | 1268 | if (wrld->numActive == 0) { |
165 : | pthread_cond_signal (&wrld->mainWait); | ||
166 : | pthread_exit (0); | ||
167 : | } | ||
168 : | jhr | 1198 | |
169 : | jhr | 1214 | // iterate until there is no more work to do |
170 : | jhr | 1268 | int blkStart, blkSize; |
171 : | int numDead = 0; | ||
172 : | do { | ||
173 : | // grab some work | ||
174 : | pthread_mutex_lock (&wrld->lock); | ||
175 : | blkStart = wrld->nextStrand; | ||
176 : | blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail; | ||
177 : | wrld->numAvail -= blkSize; | ||
178 : | wrld->nextStrand += blkSize; | ||
179 : | pthread_mutex_unlock (&wrld->lock); | ||
180 : | // update the strands | ||
181 : | for (int i = blkStart; i < blkStart+blkSize; i++) { | ||
182 : | if (! wrld->status[i]) { | ||
183 : | StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); | ||
184 : | switch (sts) { | ||
185 : | case DIDEROT_STABILIZE: | ||
186 : | wrld->status[i] = DIDEROT_STABILIZE; | ||
187 : | break; | ||
188 : | case DIDEROT_DIE: | ||
189 : | wrld->status[i] = DIDEROT_DIE; | ||
190 : | numDead++; | ||
191 : | break; | ||
192 : | default: | ||
193 : | break; | ||
194 : | } | ||
195 : | } | ||
196 : | else { | ||
197 : | assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE)); | ||
198 : | } | ||
199 : | } | ||
200 : | } while (blkSize > 0); | ||
201 : | jhr | 1198 | |
202 : | jhr | 1214 | // barrier synchronization |
203 : | jhr | 1268 | pthread_mutex_lock (&wrld->lock); |
204 : | wrld->numActive -= numDead; | ||
205 : | if (wrld->numIdle+1 < wrld->numWorkers) { | ||
206 : | wrld->numIdle++; | ||
207 : | pthread_cond_wait (&wrld->barrier, &wrld->lock); | ||
208 : | } | ||
209 : | else { | ||
210 : | // all other workers are idle, so we can proceed | ||
211 : | wrld->numIdle = 0; | ||
212 : | pthread_cond_broadcast (&wrld->barrier); | ||
213 : | wrld->nSteps++; | ||
214 : | } | ||
215 : | pthread_mutex_unlock (&wrld->lock); | ||
216 : | jhr | 1214 | |
217 : | /**** If there is a global computation phase, it goes here ****/ | ||
218 : | |||
219 : | // stabilize any threads that need stabilization. Each worker is responsible for | ||
220 : | // a contiguous region of the strands | ||
221 : | jhr | 1519 | // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking |
222 : | jhr | 1302 | int numStabilized = 0; |
223 : | for (int i = start; i < limit; i++) { | ||
224 : | if (wrld->status[i] == DIDEROT_STABILIZE) { | ||
225 : | jhr | 1519 | // stabilize the strand's state. Note that the outState has been set by |
226 : | // the last call to update, so we make the inState be the target of the | ||
227 : | // stabilize method. | ||
228 : | Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]); | ||
229 : | memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); | ||
230 : | jhr | 1302 | wrld->status[i] = DIDEROT_STABLE; |
231 : | numStabilized++; | ||
232 : | } | ||
233 : | } | ||
234 : | jhr | 1277 | // adjust the numActive count |
235 : | jhr | 1216 | #if defined(HAVE_BUILTIN_ATOMIC_OPS) |
236 : | jhr | 1302 | __sync_fetch_and_sub(&wrld->numActive, numStabilized); |
237 : | jhr | 1216 | #else |
238 : | jhr | 1302 | pthread_mutex_lock (&wrld->lock); |
239 : | wrld->numActive -= numStabilized; | ||
240 : | pthread_mutex_unlock (&wrld->lock); | ||
241 : | jhr | 1216 | #endif |
242 : | jhr | 1215 | } // end while(true) |
243 : | jhr | 1214 | |
244 : | jhr | 1198 | } |
245 : | |||
246 : | // block allocation of an initial collection of strands | ||
247 : | Diderot_World_t *Diderot_AllocInitially ( | ||
248 : | jhr | 1280 | const char *name, // the name of the program |
249 : | jhr | 1276 | Strand_t *strand, // the type of strands being allocated |
250 : | bool isArray, // is the initialization an array or collection? | ||
251 : | uint32_t nDims, // depth of iteration nesting | ||
252 : | int32_t *base, // nDims array of base indices | ||
253 : | uint32_t *size) // nDims array of iteration sizes | ||
254 : | jhr | 1198 | { |
255 : | jhr | 1472 | Diderot_World_t *wrld = NEW(Diderot_World_t); |
256 : | jhr | 1198 | if (wrld == 0) { |
257 : | jhr | 1268 | fprintf (stderr, "unable to allocate world\n"); |
258 : | exit (1); | ||
259 : | jhr | 1198 | } |
260 : | |||
261 : | jhr | 1287 | wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */ |
262 : | jhr | 1198 | wrld->isArray = isArray; |
263 : | wrld->nDims = nDims; | ||
264 : | jhr | 1472 | wrld->base = NEWVEC(int32_t, nDims); |
265 : | wrld->size = NEWVEC(uint32_t, nDims); | ||
266 : | jhr | 1198 | size_t numStrands = 1; |
267 : | for (int i = 0; i < wrld->nDims; i++) { | ||
268 : | jhr | 1268 | numStrands *= size[i]; |
269 : | wrld->base[i] = base[i]; | ||
270 : | wrld->size[i] = size[i]; | ||
271 : | jhr | 1198 | } |
272 : | |||
273 : | jhr | 1276 | if (VerboseFlg) { |
274 : | fprintf(stderr, "AllocInitially: %d", size[0]); | ||
275 : | for (int i = 1; i < nDims; i++) fprintf(stderr, " x %d", size[i]); | ||
276 : | fprintf(stderr, "\n"); | ||
277 : | } | ||
278 : | jhr | 1198 | |
279 : | // allocate the strand state pointers | ||
280 : | wrld->numStrands = numStrands; | ||
281 : | jhr | 1472 | wrld->inState = NEWVEC(void *, numStrands); |
282 : | wrld->outState = NEWVEC(void *, numStrands); | ||
283 : | wrld->status = NEWVEC(uint8_t, numStrands); | ||
284 : | jhr | 1214 | wrld->numActive = wrld->numStrands; |
285 : | wrld->nSteps = 0; | ||
286 : | wrld->numWorkers = 0; | ||
287 : | jhr | 1198 | |
288 : | // initialize strand state pointers etc. | ||
289 : | jhr | 1268 | for (size_t i = 0; i < numStrands; i++) { |
290 : | jhr | 1519 | wrld->inState[i] = CheckedAlloc (strand->stateSzb); |
291 : | wrld->outState[i] = CheckedAlloc (strand->stateSzb); | ||
292 : | jhr | 1268 | wrld->status[i] = DIDEROT_ACTIVE; |
293 : | jhr | 1198 | } |
294 : | |||
295 : | jhr | 1214 | pthread_mutex_init (&wrld->lock, NULL); |
296 : | pthread_cond_init (&wrld->barrier, NULL); | ||
297 : | pthread_cond_init (&wrld->mainWait, NULL); | ||
298 : | jhr | 1198 | |
299 : | return wrld; | ||
300 : | |||
301 : | } | ||
302 : | |||
303 : | // get strand state pointers | ||
304 : | void *Diderot_InState (Diderot_World_t *wrld, uint32_t i) | ||
305 : | { | ||
306 : | assert (i < wrld->numStrands); | ||
307 : | return wrld->inState[i]; | ||
308 : | } | ||
309 : | |||
310 : | void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i) | ||
311 : | { | ||
312 : | assert (i < wrld->numStrands); | ||
313 : | return wrld->outState[i]; | ||
314 : | } |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |