Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory | Revision Log


Revision 3349 - (view) (download) (as text)

1 : jhr 1198 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 : jhr 3349 * This code is part of the Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     *
9 :     * COPYRIGHT (c) 2015 The University of Chicago
10 : jhr 1198 * All rights reserved.
11 :     */
12 :    
13 :     #include <Diderot/diderot.h>
14 :     #include <pthread.h>
15 : nseltzer 1450 #include <teem/nrrd.h>
16 : jhr 1198
17 : jhr 1215 // #ifdef HAVE_BUILTIN_ATOMIC_OPS
18 :     // STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
19 :     // {
20 :     // return __sync_add_and_fetch(x, 1);
21 :     // }
22 :     // STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
23 :     // {
24 :     // return __sync_sub_and_fetch(x, 1);
25 :     // }
26 :     // #else
27 :     // # error atomic operations not supported
28 :     // #endif
29 : jhr 1209
/* Work-distribution granularity: how many strands a worker claims from the
 * shared pool (nextStrand/numAvail) in a single locked grab.
 */
enum { BLOCK_SIZE = 4096 };

/* Alignment attribute used to place hot synchronization fields on their own
 * cache lines (presumably to avoid false sharing between workers).  The value
 * assumes 64-byte lines, as on Xeon.
 */
#define CACHE_ALIGN __attribute__((aligned(64)))
35 :    
36 : jhr 1198 struct struct_world {
37 : jhr 1519 STRUCT_WORLD_PREFIX
38 : jhr 1268 void **inState;
39 :     void **outState;
40 :     uint32_t numWorkers; // number of worker threads
41 :     uint32_t nSteps; // number of super steps
42 : jhr 1214 // synchronization state
43 : jhr 1578 uint32_t nextStrand CACHE_ALIGN; // index of next strand to update
44 :     uint32_t numActive CACHE_ALIGN; // # active strands
45 :     uint32_t numAvail CACHE_ALIGN; // # unevaluated strands
46 :     uint32_t numIdle CACHE_ALIGN; // # idle workers
47 : jhr 1268 pthread_mutex_t lock; // big lock to protect wrld from multiple accesses
48 :     pthread_cond_t barrier; // workers wait on this when they have no work
49 :     pthread_cond_t mainWait; // used to signal main when the workers have finished
50 : jhr 1198 };
51 :    
52 : jhr 1214 typedef struct {
53 : jhr 1268 int id;
54 :     Diderot_World_t *wrld;
55 : jhr 1578 } WorkerArg_t CACHE_ALIGN;
56 : jhr 1198
57 : jhr 1649 static void *Worker (void *arg);
58 :    
59 :     int main (int argc, const char **argv)
60 :     {
61 :     CPUInfo_t cpuInfo;
62 :     if (! GetNumCPUs (&cpuInfo)) {
63 :     fprintf(stderr, "unable to get number of processors\n");
64 :     exit (1);
65 :     }
66 :    
67 :     Diderot_int_t np = cpuInfo.numHWCores;
68 :    
69 :     Diderot_Options_t *opts = Diderot_OptNew ();
70 :    
71 :     Diderot_OptAddInt (opts, "np", "specify number of worker threads", &np, true);
72 :     Diderot_RegisterGlobalOpts (opts);
73 :     Diderot_OptProcess (opts, argc, argv);
74 :     Diderot_OptFree (opts);
75 :    
76 :     if (VerboseFlg) fprintf (stderr, "initializing globals ...\n");
77 :     Diderot_InitGlobals ();
78 :    
79 :     if (VerboseFlg) fprintf (stderr, "initializing strands ...\n");
80 :     Diderot_World_t *wrld = Diderot_Initially ();
81 :    
82 :     for (int i = 0; i < wrld->numStrands; i++) {
83 :     // hack to make the invariant part of the state the same in both copies
84 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
85 :     }
86 :    
87 :     // Start worker threads
88 :     int nWorkers = np;
89 :     WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
90 :     if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers);
91 :     double t0 = airTime();
92 :     wrld->numWorkers = nWorkers;
93 :     wrld->numIdle = 0;
94 :     for (int i = 0; i < nWorkers; i++) {
95 :     pthread_t pid;
96 :     args[i].wrld = wrld;
97 :     args[i].id = i;
98 :     if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
99 :     fprintf (stderr, "unable to create worker thread\n");
100 :     exit (1);
101 :     }
102 :     pthread_detach (pid);
103 :     }
104 :    
105 :     // wait for the computation to finish
106 :     pthread_mutex_lock (&wrld->lock);
107 :     pthread_cond_wait (&wrld->mainWait, &wrld->lock);
108 :     pthread_mutex_unlock (&wrld->lock);
109 :    
110 :     double totalTime = airTime() - t0;
111 :    
112 :     if (VerboseFlg)
113 :     fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime);
114 :     else if (TimingFlg)
115 :     printf ("np=%d usr=%f\n", nWorkers, totalTime);
116 :    
117 :     // output the final strand states
118 :     if (NrrdOutputFlg)
119 :     Diderot_Output (wrld, Diderot_Strands[0]->outputSzb);
120 :     else
121 :     Diderot_Print (wrld);
122 :    
123 :     Diderot_Shutdown (wrld);
124 :    
125 :     return 0;
126 :    
127 :     }
128 :    
129 : jhr 1198 /* Function which processes active strands. */
130 : jhr 1214 static void *Worker (void *arg)
131 : jhr 1198 {
132 : jhr 1268 WorkerArg_t *myArg = (WorkerArg_t *)arg;
133 :     Diderot_World_t *wrld = myArg->wrld;
134 : jhr 1198
135 : jhr 1277 int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
136 :     int start = myArg->id * nStrandsPerWorker;
137 :     int limit;
138 :     if (wrld->numWorkers-1 == myArg->id)
139 : jhr 1302 limit = wrld->numStrands;
140 : jhr 1277 else
141 : jhr 1302 limit = start + nStrandsPerWorker;
142 : jhr 1277
143 : jhr 1198 while (true) {
144 : jhr 1214 // barrier synchronization at start of super step
145 : jhr 1268 pthread_mutex_lock (&wrld->lock);
146 :     if (wrld->numIdle+1 < wrld->numWorkers) {
147 :     wrld->numIdle++;
148 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
149 :     }
150 :     else {
151 :     // all other workers are idle, so we can proceed after some initialization
152 :     wrld->numIdle = 0;
153 :     wrld->numAvail = wrld->numStrands; // includes inactive strands
154 :     wrld->nextStrand = 0;
155 :     // swap in and out
156 :     void **tmp = wrld->inState;
157 :     wrld->inState = wrld->outState;
158 :     wrld->outState = tmp;
159 :     pthread_cond_broadcast (&wrld->barrier);
160 :     }
161 :     pthread_mutex_unlock (&wrld->lock);
162 : jhr 1198
163 : jhr 1214 // if there are no active strands left, then we're done
164 : jhr 1268 if (wrld->numActive == 0) {
165 :     pthread_cond_signal (&wrld->mainWait);
166 :     pthread_exit (0);
167 :     }
168 : jhr 1198
169 : jhr 1214 // iterate until there is no more work to do
170 : jhr 1268 int blkStart, blkSize;
171 :     int numDead = 0;
172 :     do {
173 :     // grab some work
174 :     pthread_mutex_lock (&wrld->lock);
175 :     blkStart = wrld->nextStrand;
176 :     blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
177 :     wrld->numAvail -= blkSize;
178 :     wrld->nextStrand += blkSize;
179 :     pthread_mutex_unlock (&wrld->lock);
180 :     // update the strands
181 :     for (int i = blkStart; i < blkStart+blkSize; i++) {
182 :     if (! wrld->status[i]) {
183 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
184 :     switch (sts) {
185 :     case DIDEROT_STABILIZE:
186 :     wrld->status[i] = DIDEROT_STABILIZE;
187 :     break;
188 :     case DIDEROT_DIE:
189 :     wrld->status[i] = DIDEROT_DIE;
190 :     numDead++;
191 :     break;
192 :     default:
193 :     break;
194 :     }
195 :     }
196 :     else {
197 :     assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
198 :     }
199 :     }
200 :     } while (blkSize > 0);
201 : jhr 1198
202 : jhr 1214 // barrier synchronization
203 : jhr 1268 pthread_mutex_lock (&wrld->lock);
204 :     wrld->numActive -= numDead;
205 :     if (wrld->numIdle+1 < wrld->numWorkers) {
206 :     wrld->numIdle++;
207 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
208 :     }
209 :     else {
210 :     // all other workers are idle, so we can proceed
211 :     wrld->numIdle = 0;
212 :     pthread_cond_broadcast (&wrld->barrier);
213 :     wrld->nSteps++;
214 :     }
215 :     pthread_mutex_unlock (&wrld->lock);
216 : jhr 1214
217 :     /**** If there is a global computation phase, it goes here ****/
218 :    
219 :     // stabilize any threads that need stabilization. Each worker is responsible for
220 :     // a contiguous region of the strands
221 : jhr 1519 // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking
222 : jhr 1302 int numStabilized = 0;
223 :     for (int i = start; i < limit; i++) {
224 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
225 : jhr 1519 // stabilize the strand's state. Note that the outState has been set by
226 :     // the last call to update, so we make the inState be the target of the
227 :     // stabilize method.
228 :     Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]);
229 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
230 : jhr 1302 wrld->status[i] = DIDEROT_STABLE;
231 :     numStabilized++;
232 :     }
233 :     }
234 : jhr 1277 // adjust the numActive count
235 : jhr 1216 #if defined(HAVE_BUILTIN_ATOMIC_OPS)
236 : jhr 1302 __sync_fetch_and_sub(&wrld->numActive, numStabilized);
237 : jhr 1216 #else
238 : jhr 1302 pthread_mutex_lock (&wrld->lock);
239 :     wrld->numActive -= numStabilized;
240 :     pthread_mutex_unlock (&wrld->lock);
241 : jhr 1216 #endif
242 : jhr 1215 } // end while(true)
243 : jhr 1214
244 : jhr 1198 }
245 :    
246 :     // block allocation of an initial collection of strands
247 :     Diderot_World_t *Diderot_AllocInitially (
248 : jhr 1280 const char *name, // the name of the program
249 : jhr 1276 Strand_t *strand, // the type of strands being allocated
250 :     bool isArray, // is the initialization an array or collection?
251 :     uint32_t nDims, // depth of iteration nesting
252 :     int32_t *base, // nDims array of base indices
253 :     uint32_t *size) // nDims array of iteration sizes
254 : jhr 1198 {
255 : jhr 1472 Diderot_World_t *wrld = NEW(Diderot_World_t);
256 : jhr 1198 if (wrld == 0) {
257 : jhr 1268 fprintf (stderr, "unable to allocate world\n");
258 :     exit (1);
259 : jhr 1198 }
260 :    
261 : jhr 1287 wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */
262 : jhr 1198 wrld->isArray = isArray;
263 :     wrld->nDims = nDims;
264 : jhr 1472 wrld->base = NEWVEC(int32_t, nDims);
265 :     wrld->size = NEWVEC(uint32_t, nDims);
266 : jhr 1198 size_t numStrands = 1;
267 :     for (int i = 0; i < wrld->nDims; i++) {
268 : jhr 1268 numStrands *= size[i];
269 :     wrld->base[i] = base[i];
270 :     wrld->size[i] = size[i];
271 : jhr 1198 }
272 :    
273 : jhr 1276 if (VerboseFlg) {
274 :     fprintf(stderr, "AllocInitially: %d", size[0]);
275 :     for (int i = 1; i < nDims; i++) fprintf(stderr, " x %d", size[i]);
276 :     fprintf(stderr, "\n");
277 :     }
278 : jhr 1198
279 :     // allocate the strand state pointers
280 :     wrld->numStrands = numStrands;
281 : jhr 1472 wrld->inState = NEWVEC(void *, numStrands);
282 :     wrld->outState = NEWVEC(void *, numStrands);
283 :     wrld->status = NEWVEC(uint8_t, numStrands);
284 : jhr 1214 wrld->numActive = wrld->numStrands;
285 :     wrld->nSteps = 0;
286 :     wrld->numWorkers = 0;
287 : jhr 1198
288 :     // initialize strand state pointers etc.
289 : jhr 1268 for (size_t i = 0; i < numStrands; i++) {
290 : jhr 1519 wrld->inState[i] = CheckedAlloc (strand->stateSzb);
291 :     wrld->outState[i] = CheckedAlloc (strand->stateSzb);
292 : jhr 1268 wrld->status[i] = DIDEROT_ACTIVE;
293 : jhr 1198 }
294 :    
295 : jhr 1214 pthread_mutex_init (&wrld->lock, NULL);
296 :     pthread_cond_init (&wrld->barrier, NULL);
297 :     pthread_cond_init (&wrld->mainWait, NULL);
298 : jhr 1198
299 :     return wrld;
300 :    
301 :     }
302 :    
303 :     // get strand state pointers
304 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
305 :     {
306 :     assert (i < wrld->numStrands);
307 :     return wrld->inState[i];
308 :     }
309 :    
310 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
311 :     {
312 :     assert (i < wrld->numStrands);
313 :     return wrld->outState[i];
314 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0