Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1518 - (view) (download) (as text)

1 : jhr 1198 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <string.h>
12 :     #include <stdio.h>
13 :     #include <assert.h>
14 :     #include <Diderot/diderot.h>
15 :     #include <pthread.h>
16 : nseltzer 1450 #include <teem/nrrd.h>
17 : jhr 1198
18 : jhr 1215 // #ifdef HAVE_BUILTIN_ATOMIC_OPS
19 :     // STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
20 :     // {
21 :     // return __sync_add_and_fetch(x, 1);
22 :     // }
23 :     // STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
24 :     // {
25 :     // return __sync_sub_and_fetch(x, 1);
26 :     // }
27 :     // #else
28 :     // # error atomic operations not supported
29 :     // #endif
30 : jhr 1209
// Maximum number of strands that a worker claims from the shared pool in a
// single grab; the final grab of a super step may be smaller.
#define BLOCK_SIZE 256
33 : jhr 1198
/* The shared state of a running Diderot program.  The first group of fields
 * describes the strands; the second group is the scheduling state that the
 * worker threads share.  The four scheduling counters are each aligned to a
 * 64-byte boundary.
 */
struct struct_world {
  // program and strand description
    const char      *name;          // name of the program
    bool            isArray;        // true if the initialization is an array,
                                    // false if it is a collection
    uint32_t        nDims;          // nesting depth of the initialization iteration
    int32_t         *base;          // base indices (nDims elements)
    uint32_t        *size;          // iteration sizes (nDims elements)
    uint32_t        numStrands;     // total number of strands in the world
    void            **inState;      // per-strand state passed as the first argument
                                    // of the strand update function
    void            **outState;     // per-strand state passed as the second argument
                                    // of the strand update function
    uint8_t         *status;        // per-strand status flags
    uint32_t        numWorkers;     // number of worker threads
    uint32_t        nSteps;         // number of super steps taken
  // synchronization state
    uint32_t        nextStrand __attribute__((aligned(64)));    // index of the next strand to update
    uint32_t        numActive __attribute__((aligned(64)));     // number of active strands
    uint32_t        numAvail __attribute__((aligned(64)));      // number of strands not yet evaluated
    uint32_t        numIdle __attribute__((aligned(64)));       // number of idle workers
    pthread_mutex_t lock;           // big lock that protects the world from
                                    // concurrent access
    pthread_cond_t  barrier;        // workers wait here when they have no work
    pthread_cond_t  mainWait;       // signalled to tell main that the workers
                                    // have finished
};
55 :    
56 : jhr 1214 typedef struct {
57 : jhr 1268 int id;
58 :     Diderot_World_t *wrld;
59 : jhr 1277 } WorkerArg_t __attribute__((aligned(64)));
60 : jhr 1198
// Runtime switches; all are off unless enabled by a command-line option.
// NOTE: these probably belong in a file that supports runtime printing.
static bool VerboseFlg = false;     // print runtime-system messages
static bool TimingFlg = false;      // report execution time
static bool NrrdOutputFlg = false;  // produce nrrd output
65 : jhr 1262
66 : jhr 1198 /* Function which processes active strands. */
67 : jhr 1214 static void *Worker (void *arg)
68 : jhr 1198 {
69 : jhr 1268 WorkerArg_t *myArg = (WorkerArg_t *)arg;
70 :     Diderot_World_t *wrld = myArg->wrld;
71 : jhr 1198
72 : jhr 1277 int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
73 :     int start = myArg->id * nStrandsPerWorker;
74 :     int limit;
75 :     if (wrld->numWorkers-1 == myArg->id)
76 : jhr 1302 limit = wrld->numStrands;
77 : jhr 1277 else
78 : jhr 1302 limit = start + nStrandsPerWorker;
79 : jhr 1277
80 : jhr 1198 while (true) {
81 : jhr 1214 // barrier synchronization at start of super step
82 : jhr 1268 pthread_mutex_lock (&wrld->lock);
83 :     if (wrld->numIdle+1 < wrld->numWorkers) {
84 :     wrld->numIdle++;
85 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
86 :     }
87 :     else {
88 :     // all other workers are idle, so we can proceed after some initialization
89 :     wrld->numIdle = 0;
90 :     wrld->numAvail = wrld->numStrands; // includes inactive strands
91 :     wrld->nextStrand = 0;
92 :     // swap in and out
93 :     void **tmp = wrld->inState;
94 :     wrld->inState = wrld->outState;
95 :     wrld->outState = tmp;
96 :     pthread_cond_broadcast (&wrld->barrier);
97 :     }
98 :     pthread_mutex_unlock (&wrld->lock);
99 : jhr 1198
100 : jhr 1214 // if there are no active strands left, then we're done
101 : jhr 1268 if (wrld->numActive == 0) {
102 :     pthread_cond_signal (&wrld->mainWait);
103 :     pthread_exit (0);
104 :     }
105 : jhr 1198
106 : jhr 1214 // iterate until there is no more work to do
107 : jhr 1268 int blkStart, blkSize;
108 :     int numDead = 0;
109 :     do {
110 :     // grab some work
111 :     pthread_mutex_lock (&wrld->lock);
112 :     blkStart = wrld->nextStrand;
113 :     blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
114 :     wrld->numAvail -= blkSize;
115 :     wrld->nextStrand += blkSize;
116 :     pthread_mutex_unlock (&wrld->lock);
117 :     // update the strands
118 :     for (int i = blkStart; i < blkStart+blkSize; i++) {
119 :     if (! wrld->status[i]) {
120 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
121 :     switch (sts) {
122 :     case DIDEROT_STABILIZE:
123 : jhr 1494 Diderot_Strands[0]->stabilize(wrld->inState[i], wrld->outState[i]);
124 : jhr 1268 wrld->status[i] = DIDEROT_STABILIZE;
125 :     break;
126 :     case DIDEROT_DIE:
127 :     wrld->status[i] = DIDEROT_DIE;
128 :     numDead++;
129 :     break;
130 :     default:
131 :     break;
132 :     }
133 :     }
134 :     else {
135 :     assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
136 :     }
137 :     }
138 :     } while (blkSize > 0);
139 : jhr 1198
140 : jhr 1214 // barrier synchronization
141 : jhr 1268 pthread_mutex_lock (&wrld->lock);
142 :     wrld->numActive -= numDead;
143 :     if (wrld->numIdle+1 < wrld->numWorkers) {
144 :     wrld->numIdle++;
145 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
146 :     }
147 :     else {
148 :     // all other workers are idle, so we can proceed
149 :     wrld->numIdle = 0;
150 :     pthread_cond_broadcast (&wrld->barrier);
151 :     wrld->nSteps++;
152 :     }
153 :     pthread_mutex_unlock (&wrld->lock);
154 : jhr 1214
155 :     /**** If there is a global computation phase, it goes here ****/
156 :    
157 :     // stabilize any threads that need stabilization. Each worker is responsible for
158 :     // a contiguous region of the strands
159 : jhr 1277 // FIXME: once we switch to dynamic lists of strand blocks, then we use finer-grain
160 :     // tracking
161 : jhr 1302 int numStabilized = 0;
162 :     for (int i = start; i < limit; i++) {
163 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
164 :     // copy out to in so that both copies are the stable state
165 :     memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
166 :     wrld->status[i] = DIDEROT_STABLE;
167 :     numStabilized++;
168 :     }
169 :     }
170 : jhr 1277 // adjust the numActive count
171 : jhr 1216 #if defined(HAVE_BUILTIN_ATOMIC_OPS)
172 : jhr 1302 __sync_fetch_and_sub(&wrld->numActive, numStabilized);
173 : jhr 1216 #else
174 : jhr 1302 pthread_mutex_lock (&wrld->lock);
175 :     wrld->numActive -= numStabilized;
176 :     pthread_mutex_unlock (&wrld->lock);
177 : jhr 1216 #endif
178 : jhr 1215 } // end while(true)
179 : jhr 1214
180 : jhr 1198 }
181 :    
182 :    
183 :     int main (int argc, const char **argv)
184 :     {
185 : jhr 1262 CPUInfo_t cpuInfo;
186 :     if (! GetNumCPUs (&cpuInfo)) {
187 : jhr 1268 fprintf(stderr, "unable to get number of processors\n");
188 :     exit (1);
189 : jhr 1262 }
190 :    
191 :     Diderot_int_t np = cpuInfo.numHWCores;
192 :    
193 :     Diderot_Options_t *opts = Diderot_OptNew ();
194 :    
195 : jhr 1263 Diderot_OptAddFlag (opts, "verbose", "enable runtime-system messages", &VerboseFlg);
196 : jhr 1276 Diderot_OptAddFlag (opts, "timing", "enable execution timing", &TimingFlg);
197 : nseltzer 1450 Diderot_OptAddFlag (opts, "nrrd", "enable nrrd output", &NrrdOutputFlg);
198 : jhr 1262 Diderot_OptAddInt (opts, "np", "specify number of threads", &np, true);
199 :     Diderot_RegisterGlobalOpts (opts);
200 :     Diderot_OptProcess (opts, argc, argv);
201 :     Diderot_OptFree (opts);
202 :    
203 :     if (VerboseFlg) fprintf (stderr, "initializing globals ...\n");
204 : jhr 1276 Diderot_InitGlobals ();
205 : jhr 1198
206 : jhr 1262 if (VerboseFlg) fprintf (stderr, "initializing strands ...\n");
207 : jhr 1198 Diderot_World_t *wrld = Diderot_Initially ();
208 : lamonts 1462
209 : jhr 1198 for (int i = 0; i < wrld->numStrands; i++) {
210 :     // hack to make the invariant part of the state the same in both copies
211 : jhr 1268 memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
212 : jhr 1198 }
213 :    
214 : jhr 1214 // Start worker threads
215 : jhr 1262 int nWorkers = np;
216 : jhr 1214 WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
217 : jhr 1262 if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers);
218 : jhr 1276 double t0 = GetTime();
219 : jhr 1214 wrld->numWorkers = nWorkers;
220 :     wrld->numIdle = 0;
221 : jhr 1198 for (int i = 0; i < nWorkers; i++) {
222 : jhr 1268 pthread_t pid;
223 :     args[i].wrld = wrld;
224 :     args[i].id = i;
225 :     if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
226 :     fprintf (stderr, "unable to create worker thread\n");
227 :     exit (1);
228 :     }
229 :     pthread_detach (pid);
230 : jhr 1198 }
231 :    
232 : jhr 1214 // wait for the computation to finish
233 :     pthread_mutex_lock (&wrld->lock);
234 : jhr 1268 pthread_cond_wait (&wrld->mainWait, &wrld->lock);
235 : jhr 1214 pthread_mutex_unlock (&wrld->lock);
236 : jhr 1198
237 : jhr 1276 double totalTime = GetTime() - t0;
238 :    
239 :     if (VerboseFlg)
240 :     fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime);
241 :     else if (TimingFlg)
242 :     printf ("np=%d usr=%f\n", nWorkers, totalTime);
243 :    
244 : jhr 1482 // output the final strand states
245 :     Output_Args_t outArgs;
246 :     outArgs.name = wrld->name;
247 :     outArgs.isArray = wrld->isArray;
248 :     outArgs.numStrands = wrld->numStrands;
249 :     outArgs.status = wrld->status;
250 :     outArgs.inState = wrld->inState;
251 :     outArgs.nDims = wrld->nDims;
252 :     outArgs.size = wrld->size;
253 : lamonts 1513 outArgs.outputSzb = Diderot_Strands[0]->outputSzb;
254 : jhr 1482 if (NrrdOutputFlg)
255 :     Diderot_Output (&outArgs);
256 :     else
257 :     Diderot_Print (&outArgs);
258 : nseltzer 1450
259 : jhr 1214 Diderot_Shutdown (wrld);
260 :    
261 : jhr 1198 return 0;
262 :    
263 :     }
264 :    
265 :    
266 :     // this should be the part of the scheduler
267 :     void *Diderot_AllocStrand (Strand_t *strand)
268 :     {
269 : jhr 1472 return CheckedAlloc(strand->stateSzb);
270 : jhr 1198 }
271 :    
272 :     // block allocation of an initial collection of strands
273 :     Diderot_World_t *Diderot_AllocInitially (
274 : jhr 1280 const char *name, // the name of the program
275 : jhr 1276 Strand_t *strand, // the type of strands being allocated
276 :     bool isArray, // is the initialization an array or collection?
277 :     uint32_t nDims, // depth of iteration nesting
278 :     int32_t *base, // nDims array of base indices
279 :     uint32_t *size) // nDims array of iteration sizes
280 : jhr 1198 {
281 : jhr 1472 Diderot_World_t *wrld = NEW(Diderot_World_t);
282 : jhr 1198 if (wrld == 0) {
283 : jhr 1268 fprintf (stderr, "unable to allocate world\n");
284 :     exit (1);
285 : jhr 1198 }
286 :    
287 : jhr 1287 wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */
288 : jhr 1198 wrld->isArray = isArray;
289 :     wrld->nDims = nDims;
290 : jhr 1472 wrld->base = NEWVEC(int32_t, nDims);
291 :     wrld->size = NEWVEC(uint32_t, nDims);
292 : jhr 1198 size_t numStrands = 1;
293 :     for (int i = 0; i < wrld->nDims; i++) {
294 : jhr 1268 numStrands *= size[i];
295 :     wrld->base[i] = base[i];
296 :     wrld->size[i] = size[i];
297 : jhr 1198 }
298 :    
299 : jhr 1276 if (VerboseFlg) {
300 :     fprintf(stderr, "AllocInitially: %d", size[0]);
301 :     for (int i = 1; i < nDims; i++) fprintf(stderr, " x %d", size[i]);
302 :     fprintf(stderr, "\n");
303 :     }
304 : jhr 1198
305 :     // allocate the strand state pointers
306 :     wrld->numStrands = numStrands;
307 : jhr 1472 wrld->inState = NEWVEC(void *, numStrands);
308 :     wrld->outState = NEWVEC(void *, numStrands);
309 :     wrld->status = NEWVEC(uint8_t, numStrands);
310 : jhr 1214 wrld->numActive = wrld->numStrands;
311 :     wrld->nSteps = 0;
312 :     wrld->numWorkers = 0;
313 : jhr 1198
314 :     // initialize strand state pointers etc.
315 : jhr 1268 for (size_t i = 0; i < numStrands; i++) {
316 :     wrld->inState[i] = Diderot_AllocStrand (strand);
317 :     wrld->outState[i] = Diderot_AllocStrand (strand);
318 :     wrld->status[i] = DIDEROT_ACTIVE;
319 : jhr 1198 }
320 :    
321 : jhr 1214 pthread_mutex_init (&wrld->lock, NULL);
322 :     pthread_cond_init (&wrld->barrier, NULL);
323 :     pthread_cond_init (&wrld->mainWait, NULL);
324 : jhr 1198
325 :     return wrld;
326 :    
327 :     }
328 :    
329 :     // get strand state pointers
330 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
331 :     {
332 :     assert (i < wrld->numStrands);
333 :     return wrld->inState[i];
334 :     }
335 :    
336 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
337 :     {
338 :     assert (i < wrld->numStrands);
339 :     return wrld->outState[i];
340 :     }
341 :    
342 :     bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
343 :     {
344 :     assert (i < wrld->numStrands);
345 :     return !wrld->status[i];
346 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0