Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /trunk/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /trunk/src/lib/parallel-target/main.c

Parent Directory | Revision Log


Revision 1640 - (view) (download) (as text)

1 : jhr 1232 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <Diderot/diderot.h>
12 :     #include <pthread.h>
13 : jhr 1640 #include <teem/nrrd.h>
14 : jhr 1232
15 :     // #ifdef HAVE_BUILTIN_ATOMIC_OPS
16 :     // STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
17 :     // {
18 :     // return __sync_add_and_fetch(x, 1);
19 :     // }
20 :     // STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
21 :     // {
22 :     // return __sync_sub_and_fetch(x, 1);
23 :     // }
24 :     // #else
25 :     // # error atomic operations not supported
26 :     // #endif
27 :    
/* Work granularity: a worker claims up to BLOCK_SIZE consecutive strands
 * each time it takes the world lock to grab more work.
 */
#define BLOCK_SIZE      4096

/* Attribute that aligns an object on a 64-byte boundary, which is the
 * cache-line size on Xeon processors.
 */
#define CACHE_ALIGN     __attribute__((aligned(64)))
33 :    
34 : jhr 1232 struct struct_world {
35 : jhr 1640 STRUCT_WORLD_PREFIX
36 : jhr 1301 void **inState;
37 :     void **outState;
38 :     uint32_t numWorkers; // number of worker threads
39 :     uint32_t nSteps; // number of super steps
40 : jhr 1232 // synchronization state
41 : jhr 1640 uint32_t nextStrand CACHE_ALIGN; // index of next strand to update
42 :     uint32_t numActive CACHE_ALIGN; // # active strands
43 :     uint32_t numAvail CACHE_ALIGN; // # unevaluated strands
44 :     uint32_t numIdle CACHE_ALIGN; // # idle workers
45 : jhr 1301 pthread_mutex_t lock; // big lock to protect wrld from multiple accesses
46 :     pthread_cond_t barrier; // workers wait on this when they have no work
47 :     pthread_cond_t mainWait; // used to signal main when the workers have finished
48 : jhr 1232 };
49 :    
50 :     typedef struct {
51 : jhr 1301 int id;
52 :     Diderot_World_t *wrld;
53 : jhr 1640 } WorkerArg_t CACHE_ALIGN;
54 : jhr 1232
/* Runtime flags, set from the command line by the option parser in main.
 * Objects with static storage duration are zero-initialized, so every flag
 * starts out false.
 * NOTE: these probably belong in a file that supports runtime printing.
 */
static bool VerboseFlg;         /* "verbose": runtime-system messages */
static bool TimingFlg;          /* "timing": report the execution time */
static bool NrrdOutputFlg;      /* "nrrd": produce nrrd output */
59 : jhr 1301
60 : jhr 1232 /* Function which processes active strands. */
61 :     static void *Worker (void *arg)
62 :     {
63 : jhr 1301 WorkerArg_t *myArg = (WorkerArg_t *)arg;
64 :     Diderot_World_t *wrld = myArg->wrld;
65 : jhr 1232
66 : jhr 1301 int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
67 :     int start = myArg->id * nStrandsPerWorker;
68 :     int limit;
69 :     if (wrld->numWorkers-1 == myArg->id)
70 :     limit = wrld->numStrands;
71 :     else
72 :     limit = start + nStrandsPerWorker;
73 :    
74 : jhr 1232 while (true) {
75 :     // barrier synchronization at start of super step
76 : jhr 1301 pthread_mutex_lock (&wrld->lock);
77 :     if (wrld->numIdle+1 < wrld->numWorkers) {
78 :     wrld->numIdle++;
79 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
80 :     }
81 :     else {
82 :     // all other workers are idle, so we can proceed after some initialization
83 :     wrld->numIdle = 0;
84 :     wrld->numAvail = wrld->numStrands; // includes inactive strands
85 :     wrld->nextStrand = 0;
86 :     // swap in and out
87 :     void **tmp = wrld->inState;
88 :     wrld->inState = wrld->outState;
89 :     wrld->outState = tmp;
90 :     pthread_cond_broadcast (&wrld->barrier);
91 :     }
92 :     pthread_mutex_unlock (&wrld->lock);
93 : jhr 1232
94 :     // if there are no active strands left, then we're done
95 : jhr 1301 if (wrld->numActive == 0) {
96 :     pthread_cond_signal (&wrld->mainWait);
97 :     pthread_exit (0);
98 :     }
99 : jhr 1232
100 :     // iterate until there is no more work to do
101 : jhr 1301 int blkStart, blkSize;
102 :     int numDead = 0;
103 :     do {
104 :     // grab some work
105 :     pthread_mutex_lock (&wrld->lock);
106 :     blkStart = wrld->nextStrand;
107 :     blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
108 :     wrld->numAvail -= blkSize;
109 :     wrld->nextStrand += blkSize;
110 :     pthread_mutex_unlock (&wrld->lock);
111 :     // update the strands
112 :     for (int i = blkStart; i < blkStart+blkSize; i++) {
113 :     if (! wrld->status[i]) {
114 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
115 :     switch (sts) {
116 :     case DIDEROT_STABILIZE:
117 :     wrld->status[i] = DIDEROT_STABILIZE;
118 :     break;
119 :     case DIDEROT_DIE:
120 :     wrld->status[i] = DIDEROT_DIE;
121 :     numDead++;
122 :     break;
123 :     default:
124 :     break;
125 :     }
126 :     }
127 :     else {
128 :     assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
129 :     }
130 :     }
131 :     } while (blkSize > 0);
132 : jhr 1232
133 :     // barrier synchronization
134 : jhr 1301 pthread_mutex_lock (&wrld->lock);
135 :     wrld->numActive -= numDead;
136 :     if (wrld->numIdle+1 < wrld->numWorkers) {
137 :     wrld->numIdle++;
138 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
139 :     }
140 :     else {
141 :     // all other workers are idle, so we can proceed
142 :     wrld->numIdle = 0;
143 :     pthread_cond_broadcast (&wrld->barrier);
144 :     wrld->nSteps++;
145 :     }
146 :     pthread_mutex_unlock (&wrld->lock);
147 : jhr 1232
148 :     /**** If there is a global computation phase, it goes here ****/
149 :    
150 :     // stabilize any threads that need stabilization. Each worker is responsible for
151 :     // a contiguous region of the strands
152 : jhr 1640 // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking
153 : jhr 1301 int numStabilized = 0;
154 :     for (int i = start; i < limit; i++) {
155 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
156 : jhr 1640 // stabilize the strand's state. Note that the outState has been set by
157 :     // the last call to update, so we make the inState be the target of the
158 :     // stabilize method.
159 :     Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]);
160 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
161 : jhr 1301 wrld->status[i] = DIDEROT_STABLE;
162 :     numStabilized++;
163 :     }
164 :     }
165 :     // adjust the numActive count
166 : jhr 1232 #if defined(HAVE_BUILTIN_ATOMIC_OPS)
167 : jhr 1301 __sync_fetch_and_sub(&wrld->numActive, numStabilized);
168 : jhr 1232 #else
169 : jhr 1301 pthread_mutex_lock (&wrld->lock);
170 :     wrld->numActive -= numStabilized;
171 :     pthread_mutex_unlock (&wrld->lock);
172 : jhr 1232 #endif
173 :     } // end while(true)
174 :    
175 :     }
176 :    
177 :    
178 :     int main (int argc, const char **argv)
179 :     {
180 : jhr 1301 CPUInfo_t cpuInfo;
181 :     if (! GetNumCPUs (&cpuInfo)) {
182 :     fprintf(stderr, "unable to get number of processors\n");
183 :     exit (1);
184 :     }
185 : jhr 1232
186 : jhr 1301 Diderot_int_t np = cpuInfo.numHWCores;
187 :    
188 :     Diderot_Options_t *opts = Diderot_OptNew ();
189 :    
190 :     Diderot_OptAddFlag (opts, "verbose", "enable runtime-system messages", &VerboseFlg);
191 :     Diderot_OptAddFlag (opts, "timing", "enable execution timing", &TimingFlg);
192 : jhr 1640 Diderot_OptAddFlag (opts, "nrrd", "enable nrrd output", &NrrdOutputFlg);
193 : jhr 1301 Diderot_OptAddInt (opts, "np", "specify number of threads", &np, true);
194 :     Diderot_RegisterGlobalOpts (opts);
195 :     Diderot_OptProcess (opts, argc, argv);
196 :     Diderot_OptFree (opts);
197 :    
198 :     if (VerboseFlg) fprintf (stderr, "initializing globals ...\n");
199 :     Diderot_InitGlobals ();
200 :    
201 :     if (VerboseFlg) fprintf (stderr, "initializing strands ...\n");
202 : jhr 1232 Diderot_World_t *wrld = Diderot_Initially ();
203 : jhr 1640
204 : jhr 1232 for (int i = 0; i < wrld->numStrands; i++) {
205 :     // hack to make the invariant part of the state the same in both copies
206 : jhr 1301 memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
207 : jhr 1232 }
208 :    
209 :     // Start worker threads
210 : jhr 1301 int nWorkers = np;
211 : jhr 1232 WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
212 : jhr 1301 if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers);
213 : jhr 1640 double t0 = airTime();
214 : jhr 1232 wrld->numWorkers = nWorkers;
215 :     wrld->numIdle = 0;
216 :     for (int i = 0; i < nWorkers; i++) {
217 : jhr 1301 pthread_t pid;
218 :     args[i].wrld = wrld;
219 :     args[i].id = i;
220 :     if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
221 :     fprintf (stderr, "unable to create worker thread\n");
222 :     exit (1);
223 :     }
224 :     pthread_detach (pid);
225 : jhr 1232 }
226 :    
227 :     // wait for the computation to finish
228 :     pthread_mutex_lock (&wrld->lock);
229 : jhr 1301 pthread_cond_wait (&wrld->mainWait, &wrld->lock);
230 : jhr 1232 pthread_mutex_unlock (&wrld->lock);
231 :    
232 : jhr 1640 double totalTime = airTime() - t0;
233 : jhr 1301
234 :     if (VerboseFlg)
235 :     fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime);
236 :     else if (TimingFlg)
237 :     printf ("np=%d usr=%f\n", nWorkers, totalTime);
238 :    
239 : jhr 1640 // output the final strand states
240 :     if (NrrdOutputFlg)
241 :     Diderot_Output (wrld, Diderot_Strands[0]->outputSzb);
242 :     else
243 :     Diderot_Print (wrld);
244 : jhr 1232
245 :     Diderot_Shutdown (wrld);
246 :    
247 :     return 0;
248 :    
249 :     }
250 :    
251 :     // block allocation of an initial collection of strands
252 :     Diderot_World_t *Diderot_AllocInitially (
253 : jhr 1301 const char *name, // the name of the program
254 :     Strand_t *strand, // the type of strands being allocated
255 :     bool isArray, // is the initialization an array or collection?
256 :     uint32_t nDims, // depth of iteration nesting
257 :     int32_t *base, // nDims array of base indices
258 :     uint32_t *size) // nDims array of iteration sizes
259 : jhr 1232 {
260 : jhr 1640 Diderot_World_t *wrld = NEW(Diderot_World_t);
261 : jhr 1232 if (wrld == 0) {
262 : jhr 1301 fprintf (stderr, "unable to allocate world\n");
263 :     exit (1);
264 : jhr 1232 }
265 :    
266 : jhr 1301 wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */
267 : jhr 1232 wrld->isArray = isArray;
268 :     wrld->nDims = nDims;
269 : jhr 1640 wrld->base = NEWVEC(int32_t, nDims);
270 :     wrld->size = NEWVEC(uint32_t, nDims);
271 : jhr 1232 size_t numStrands = 1;
272 :     for (int i = 0; i < wrld->nDims; i++) {
273 : jhr 1301 numStrands *= size[i];
274 :     wrld->base[i] = base[i];
275 :     wrld->size[i] = size[i];
276 : jhr 1232 }
277 :    
278 : jhr 1301 if (VerboseFlg) {
279 :     fprintf(stderr, "AllocInitially: %d", size[0]);
280 :     for (int i = 1; i < nDims; i++) fprintf(stderr, " x %d", size[i]);
281 :     fprintf(stderr, "\n");
282 :     }
283 : jhr 1232
284 :     // allocate the strand state pointers
285 :     wrld->numStrands = numStrands;
286 : jhr 1640 wrld->inState = NEWVEC(void *, numStrands);
287 :     wrld->outState = NEWVEC(void *, numStrands);
288 :     wrld->status = NEWVEC(uint8_t, numStrands);
289 : jhr 1232 wrld->numActive = wrld->numStrands;
290 :     wrld->nSteps = 0;
291 :     wrld->numWorkers = 0;
292 :    
293 :     // initialize strand state pointers etc.
294 : jhr 1301 for (size_t i = 0; i < numStrands; i++) {
295 : jhr 1640 wrld->inState[i] = CheckedAlloc (strand->stateSzb);
296 :     wrld->outState[i] = CheckedAlloc (strand->stateSzb);
297 : jhr 1301 wrld->status[i] = DIDEROT_ACTIVE;
298 : jhr 1232 }
299 :    
300 :     pthread_mutex_init (&wrld->lock, NULL);
301 :     pthread_cond_init (&wrld->barrier, NULL);
302 :     pthread_cond_init (&wrld->mainWait, NULL);
303 :    
304 :     return wrld;
305 :    
306 :     }
307 :    
308 :     // get strand state pointers
309 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
310 :     {
311 :     assert (i < wrld->numStrands);
312 :     return wrld->inState[i];
313 :     }
314 :    
315 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
316 :     {
317 :     assert (i < wrld->numStrands);
318 :     return wrld->outState[i];
319 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0