SCM Repository
Annotation of /branches/charisee/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 2377 - (view) (download) (as text)
1 : | jhr | 1232 | /*! \file main.c |
2 : | * | ||
3 : | * \author John Reppy | ||
4 : | */ | ||
5 : | |||
6 : | /* | ||
7 : | * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu) | ||
8 : | * All rights reserved. | ||
9 : | */ | ||
10 : | |||
11 : | #include <Diderot/diderot.h> | ||
12 : | #include <pthread.h> | ||
13 : | jhr | 1640 | #include <teem/nrrd.h> |
14 : | jhr | 1232 | |
// Maximum number of strands a worker claims from the shared pool at one time.
#define BLOCK_SIZE 4096

// Cache-line alignment (Xeon has 64-byte lines).  Applied to individually
// updated fields so that each one starts on its own line.
#define CACHE_ALIGN __attribute__((aligned(64)))
33 : | |||
34 : | jhr | 1232 | struct struct_world { |
35 : | jhr | 1640 | STRUCT_WORLD_PREFIX |
36 : | jhr | 1301 | void **inState; |
37 : | void **outState; | ||
38 : | uint32_t numWorkers; // number of worker threads | ||
39 : | uint32_t nSteps; // number of super steps | ||
40 : | jhr | 1232 | // synchronization state |
41 : | jhr | 1640 | uint32_t nextStrand CACHE_ALIGN; // index of next strand to update |
42 : | uint32_t numActive CACHE_ALIGN; // # active strands | ||
43 : | uint32_t numAvail CACHE_ALIGN; // # unevaluated strands | ||
44 : | uint32_t numIdle CACHE_ALIGN; // # idle workers | ||
45 : | jhr | 1301 | pthread_mutex_t lock; // big lock to protect wrld from multiple accesses |
46 : | pthread_cond_t barrier; // workers wait on this when they have no work | ||
47 : | pthread_cond_t mainWait; // used to signal main when the workers have finished | ||
48 : | jhr | 1232 | }; |
49 : | |||
50 : | typedef struct { | ||
51 : | jhr | 1301 | int id; |
52 : | Diderot_World_t *wrld; | ||
53 : | jhr | 1640 | } WorkerArg_t CACHE_ALIGN; |
54 : | jhr | 1232 | |
55 : | jhr | 1671 | static void *Worker (void *arg); |
56 : | jhr | 1301 | |
57 : | jhr | 1671 | int main (int argc, const char **argv) |
58 : | { | ||
59 : | CPUInfo_t cpuInfo; | ||
60 : | if (! GetNumCPUs (&cpuInfo)) { | ||
61 : | fprintf(stderr, "unable to get number of processors\n"); | ||
62 : | exit (1); | ||
63 : | } | ||
64 : | |||
65 : | Diderot_int_t np = cpuInfo.numHWCores; | ||
66 : | |||
67 : | Diderot_Options_t *opts = Diderot_OptNew (); | ||
68 : | |||
69 : | Diderot_OptAddInt (opts, "np", "specify number of worker threads", &np, true); | ||
70 : | Diderot_RegisterGlobalOpts (opts); | ||
71 : | Diderot_OptProcess (opts, argc, argv); | ||
72 : | Diderot_OptFree (opts); | ||
73 : | |||
74 : | if (VerboseFlg) fprintf (stderr, "initializing globals ...\n"); | ||
75 : | Diderot_InitGlobals (); | ||
76 : | |||
77 : | if (VerboseFlg) fprintf (stderr, "initializing strands ...\n"); | ||
78 : | Diderot_World_t *wrld = Diderot_Initially (); | ||
79 : | |||
80 : | for (int i = 0; i < wrld->numStrands; i++) { | ||
81 : | // hack to make the invariant part of the state the same in both copies | ||
82 : | memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); | ||
83 : | } | ||
84 : | |||
85 : | // Start worker threads | ||
86 : | int nWorkers = np; | ||
87 : | jhr | 1825 | WorkerArg_t *args = NEWVEC(WorkerArg_t, nWorkers); |
88 : | jhr | 1671 | if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers); |
89 : | double t0 = airTime(); | ||
90 : | wrld->numWorkers = nWorkers; | ||
91 : | wrld->numIdle = 0; | ||
92 : | for (int i = 0; i < nWorkers; i++) { | ||
93 : | pthread_t pid; | ||
94 : | args[i].wrld = wrld; | ||
95 : | args[i].id = i; | ||
96 : | if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) { | ||
97 : | fprintf (stderr, "unable to create worker thread\n"); | ||
98 : | exit (1); | ||
99 : | } | ||
100 : | pthread_detach (pid); | ||
101 : | } | ||
102 : | |||
103 : | // wait for the computation to finish | ||
104 : | pthread_mutex_lock (&wrld->lock); | ||
105 : | pthread_cond_wait (&wrld->mainWait, &wrld->lock); | ||
106 : | pthread_mutex_unlock (&wrld->lock); | ||
107 : | |||
108 : | double totalTime = airTime() - t0; | ||
109 : | |||
110 : | if (VerboseFlg) | ||
111 : | fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime); | ||
112 : | else if (TimingFlg) | ||
113 : | printf ("np=%d usr=%f\n", nWorkers, totalTime); | ||
114 : | |||
115 : | // output the final strand states | ||
116 : | if (NrrdOutputFlg) | ||
117 : | Diderot_Output (wrld, Diderot_Strands[0]->outputSzb); | ||
118 : | else | ||
119 : | Diderot_Print (wrld); | ||
120 : | |||
121 : | Diderot_Shutdown (wrld); | ||
122 : | |||
123 : | return 0; | ||
124 : | |||
125 : | } | ||
126 : | |||
127 : | jhr | 1232 | /* Function which processes active strands. */ |
128 : | static void *Worker (void *arg) | ||
129 : | { | ||
130 : | jhr | 1301 | WorkerArg_t *myArg = (WorkerArg_t *)arg; |
131 : | Diderot_World_t *wrld = myArg->wrld; | ||
132 : | jhr | 1232 | |
133 : | jhr | 1301 | int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers; |
134 : | int start = myArg->id * nStrandsPerWorker; | ||
135 : | int limit; | ||
136 : | if (wrld->numWorkers-1 == myArg->id) | ||
137 : | limit = wrld->numStrands; | ||
138 : | else | ||
139 : | limit = start + nStrandsPerWorker; | ||
140 : | |||
141 : | jhr | 1232 | while (true) { |
142 : | // barrier synchronization at start of super step | ||
143 : | jhr | 1301 | pthread_mutex_lock (&wrld->lock); |
144 : | if (wrld->numIdle+1 < wrld->numWorkers) { | ||
145 : | wrld->numIdle++; | ||
146 : | pthread_cond_wait (&wrld->barrier, &wrld->lock); | ||
147 : | } | ||
148 : | else { | ||
149 : | // all other workers are idle, so we can proceed after some initialization | ||
150 : | wrld->numIdle = 0; | ||
151 : | wrld->numAvail = wrld->numStrands; // includes inactive strands | ||
152 : | wrld->nextStrand = 0; | ||
153 : | // swap in and out | ||
154 : | void **tmp = wrld->inState; | ||
155 : | wrld->inState = wrld->outState; | ||
156 : | wrld->outState = tmp; | ||
157 : | pthread_cond_broadcast (&wrld->barrier); | ||
158 : | } | ||
159 : | pthread_mutex_unlock (&wrld->lock); | ||
160 : | jhr | 1232 | |
161 : | // if there are no active strands left, then we're done | ||
162 : | jhr | 1301 | if (wrld->numActive == 0) { |
163 : | pthread_cond_signal (&wrld->mainWait); | ||
164 : | pthread_exit (0); | ||
165 : | } | ||
166 : | jhr | 1232 | |
167 : | // iterate until there is no more work to do | ||
168 : | jhr | 1301 | int blkStart, blkSize; |
169 : | int numDead = 0; | ||
170 : | do { | ||
171 : | // grab some work | ||
172 : | pthread_mutex_lock (&wrld->lock); | ||
173 : | blkStart = wrld->nextStrand; | ||
174 : | blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail; | ||
175 : | wrld->numAvail -= blkSize; | ||
176 : | wrld->nextStrand += blkSize; | ||
177 : | pthread_mutex_unlock (&wrld->lock); | ||
178 : | // update the strands | ||
179 : | for (int i = blkStart; i < blkStart+blkSize; i++) { | ||
180 : | if (! wrld->status[i]) { | ||
181 : | StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); | ||
182 : | switch (sts) { | ||
183 : | case DIDEROT_STABILIZE: | ||
184 : | wrld->status[i] = DIDEROT_STABILIZE; | ||
185 : | break; | ||
186 : | case DIDEROT_DIE: | ||
187 : | wrld->status[i] = DIDEROT_DIE; | ||
188 : | numDead++; | ||
189 : | break; | ||
190 : | default: | ||
191 : | break; | ||
192 : | } | ||
193 : | } | ||
194 : | else { | ||
195 : | assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE)); | ||
196 : | } | ||
197 : | } | ||
198 : | } while (blkSize > 0); | ||
199 : | jhr | 1232 | |
200 : | // barrier synchronization | ||
201 : | jhr | 1301 | pthread_mutex_lock (&wrld->lock); |
202 : | wrld->numActive -= numDead; | ||
203 : | if (wrld->numIdle+1 < wrld->numWorkers) { | ||
204 : | wrld->numIdle++; | ||
205 : | pthread_cond_wait (&wrld->barrier, &wrld->lock); | ||
206 : | } | ||
207 : | else { | ||
208 : | // all other workers are idle, so we can proceed | ||
209 : | wrld->numIdle = 0; | ||
210 : | pthread_cond_broadcast (&wrld->barrier); | ||
211 : | wrld->nSteps++; | ||
212 : | } | ||
213 : | pthread_mutex_unlock (&wrld->lock); | ||
214 : | jhr | 1232 | |
215 : | /**** If there is a global computation phase, it goes here ****/ | ||
216 : | |||
217 : | // stabilize any threads that need stabilization. Each worker is responsible for | ||
218 : | // a contiguous region of the strands | ||
219 : | jhr | 1640 | // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking |
220 : | jhr | 1301 | int numStabilized = 0; |
221 : | for (int i = start; i < limit; i++) { | ||
222 : | if (wrld->status[i] == DIDEROT_STABILIZE) { | ||
223 : | jhr | 1640 | // stabilize the strand's state. Note that the outState has been set by |
224 : | // the last call to update, so we make the inState be the target of the | ||
225 : | // stabilize method. | ||
226 : | Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]); | ||
227 : | memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); | ||
228 : | jhr | 1301 | wrld->status[i] = DIDEROT_STABLE; |
229 : | numStabilized++; | ||
230 : | } | ||
231 : | } | ||
232 : | // adjust the numActive count | ||
233 : | jhr | 1232 | #if defined(HAVE_BUILTIN_ATOMIC_OPS) |
234 : | jhr | 1301 | __sync_fetch_and_sub(&wrld->numActive, numStabilized); |
235 : | jhr | 1232 | #else |
236 : | jhr | 1301 | pthread_mutex_lock (&wrld->lock); |
237 : | wrld->numActive -= numStabilized; | ||
238 : | pthread_mutex_unlock (&wrld->lock); | ||
239 : | jhr | 1232 | #endif |
240 : | } // end while(true) | ||
241 : | |||
242 : | } | ||
243 : | |||
244 : | // block allocation of an initial collection of strands | ||
245 : | Diderot_World_t *Diderot_AllocInitially ( | ||
246 : | jhr | 1301 | const char *name, // the name of the program |
247 : | Strand_t *strand, // the type of strands being allocated | ||
248 : | bool isArray, // is the initialization an array or collection? | ||
249 : | uint32_t nDims, // depth of iteration nesting | ||
250 : | int32_t *base, // nDims array of base indices | ||
251 : | uint32_t *size) // nDims array of iteration sizes | ||
252 : | jhr | 1232 | { |
253 : | jhr | 1640 | Diderot_World_t *wrld = NEW(Diderot_World_t); |
254 : | jhr | 1232 | if (wrld == 0) { |
255 : | jhr | 1301 | fprintf (stderr, "unable to allocate world\n"); |
256 : | exit (1); | ||
257 : | jhr | 1232 | } |
258 : | |||
259 : | jhr | 1301 | wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */ |
260 : | jhr | 1232 | wrld->isArray = isArray; |
261 : | wrld->nDims = nDims; | ||
262 : | jhr | 1640 | wrld->base = NEWVEC(int32_t, nDims); |
263 : | wrld->size = NEWVEC(uint32_t, nDims); | ||
264 : | jhr | 1232 | size_t numStrands = 1; |
265 : | for (int i = 0; i < wrld->nDims; i++) { | ||
266 : | jhr | 1301 | numStrands *= size[i]; |
267 : | wrld->base[i] = base[i]; | ||
268 : | wrld->size[i] = size[i]; | ||
269 : | jhr | 1232 | } |
270 : | |||
271 : | jhr | 1301 | if (VerboseFlg) { |
272 : | fprintf(stderr, "AllocInitially: %d", size[0]); | ||
273 : | for (int i = 1; i < nDims; i++) fprintf(stderr, " x %d", size[i]); | ||
274 : | fprintf(stderr, "\n"); | ||
275 : | } | ||
276 : | jhr | 1232 | |
277 : | // allocate the strand state pointers | ||
278 : | wrld->numStrands = numStrands; | ||
279 : | jhr | 1640 | wrld->inState = NEWVEC(void *, numStrands); |
280 : | wrld->outState = NEWVEC(void *, numStrands); | ||
281 : | wrld->status = NEWVEC(uint8_t, numStrands); | ||
282 : | jhr | 1232 | wrld->numActive = wrld->numStrands; |
283 : | wrld->nSteps = 0; | ||
284 : | wrld->numWorkers = 0; | ||
285 : | |||
286 : | // initialize strand state pointers etc. | ||
287 : | jhr | 1301 | for (size_t i = 0; i < numStrands; i++) { |
288 : | jhr | 1640 | wrld->inState[i] = CheckedAlloc (strand->stateSzb); |
289 : | wrld->outState[i] = CheckedAlloc (strand->stateSzb); | ||
290 : | jhr | 1301 | wrld->status[i] = DIDEROT_ACTIVE; |
291 : | jhr | 1232 | } |
292 : | |||
293 : | pthread_mutex_init (&wrld->lock, NULL); | ||
294 : | pthread_cond_init (&wrld->barrier, NULL); | ||
295 : | pthread_cond_init (&wrld->mainWait, NULL); | ||
296 : | |||
297 : | return wrld; | ||
298 : | |||
299 : | } | ||
300 : | |||
301 : | // get strand state pointers | ||
302 : | void *Diderot_InState (Diderot_World_t *wrld, uint32_t i) | ||
303 : | { | ||
304 : | assert (i < wrld->numStrands); | ||
305 : | return wrld->inState[i]; | ||
306 : | } | ||
307 : | |||
308 : | void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i) | ||
309 : | { | ||
310 : | assert (i < wrld->numStrands); | ||
311 : | return wrld->outState[i]; | ||
312 : | } |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |