Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory | Revision Log


Revision 1214 - (view) (download) (as text)

1 : jhr 1198 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <string.h>
12 :     #include <stdio.h>
13 :     #include <assert.h>
14 :     #include <Diderot/diderot.h>
15 :     #include <pthread.h>
16 :    
17 : jhr 1209 #ifdef HAVE_BUILTIN_ATOMIC_OPS
18 :     STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
19 :     {
20 :     return __sync_add_and_fetch(x, 1);
21 :     }
22 :     STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
23 :     {
24 :     return __sync_sub_and_fetch(x, 1);
25 :     }
26 :     #else
27 :     # error atomic operations not supported
28 :     #endif
29 :    
/* How many strand slots a worker claims from the shared pool in one grab. */
#define BLOCK_SIZE (256)
32 : jhr 1198
33 :     struct struct_world {
34 : jhr 1214 bool isArray; // is the initialization an array or collection?
35 :     uint32_t nDims; // depth of iteration nesting
36 :     int32_t *base; // nDims array of base indices
37 :     uint32_t *size; // nDims array of iteration sizes
38 :     uint32_t numStrands; // number of strands in the world
39 :     void **inState;
40 :     void **outState;
41 :     uint8_t *status; // array of strand status flags
42 :     uint32_t numWorkers; // number of worker threads
43 :     uint32_t nSteps; // number of super steps
44 :     // synchronization state
45 :     uint32_t nextStrand __attribute__((aligned(64))); // index of next strand to update
46 :     uint32_t numActive __attribute__((aligned(64))); // # active strands
47 :     uint32_t numAvail __attribute__((aligned(64))); // # unevaluated strands
48 :     uint32_t numIdle __attribute__((aligned(64))); // # idle workers
49 :     pthread_mutex_t lock; // big lock to protect wrld from multiple accesses
50 :     pthread_cond_t barrier; // workers wait on this when they have no work
51 :     pthread_cond_t mainWait; // used to signal main when the workers have finished
52 : jhr 1198 };
53 :    
54 : jhr 1214 typedef struct {
55 :     int id;
56 :     Diderot_World_t *wrld;
57 :     } WorkerArg_t;
58 : jhr 1198
59 :     /* Function which processes active strands. */
60 : jhr 1214 static void *Worker (void *arg)
61 : jhr 1198 {
62 : jhr 1214 WorkerArg_t *myArg = (WorkerArg_t *)arg;
63 :     Diderot_World_t *wrld = myArg->wrld;
64 :     bool existsStabilizing;
65 : jhr 1198
66 :     while (true) {
67 : jhr 1214 // barrier synchronization at start of super step
68 :     pthread_mutex_lock (&wrld->lock);
69 :     if (wrld->numIdle+1 < wrld->numWorkers) {
70 :     wrld->numIdle++;
71 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
72 : jhr 1198 }
73 : jhr 1214 else {
74 :     // all other workers are idle, so we can proceed after some initialization
75 :     wrld->numIdle = 0;
76 :     wrld->numAvail = wrld->numActive;
77 :     wrld->nextStrand = 0;
78 :     // swap in and out
79 :     void **tmp = wrld->inState;
80 :     wrld->inState = wrld->outState;
81 :     wrld->outState = tmp;
82 :     pthread_cond_broadcast (&wrld->barrier);
83 : jhr 1198 }
84 : jhr 1214 pthread_mutex_unlock (&wrld->lock);
85 : jhr 1198
86 : jhr 1214 // if there are no active strands left, then we're done
87 :     if (wrld->numActive == 0) {
88 :     pthread_cond_signal (&wrld->mainWait);
89 :     pthread_exit (0);
90 : jhr 1198 }
91 :    
92 : jhr 1214 // iterate until there is no more work to do
93 :     int blkStart, blkSize;
94 :     existsStabilizing = false;
95 :     do {
96 :     // grab some work
97 :     pthread_mutex_lock (&wrld->lock);
98 :     if (wrld->numAvail > 0) {
99 :     blkStart = wrld->nextStrand;
100 :     blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
101 :     wrld->numAvail -= blkSize;
102 :     }
103 :     pthread_mutex_unlock (&wrld->lock);
104 :     // update the strands
105 :     for (int i = blkStart; i < blkStart+blkSize; i++) {
106 :     if (! wrld->status[i]) {
107 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
108 :     switch (sts) {
109 :     case DIDEROT_STABILIZE:
110 :     wrld->status[i] = DIDEROT_STABILIZE;
111 :     existsStabilizing = true;
112 :     break;
113 :     case DIDEROT_DIE:
114 :     wrld->status[i] = DIDEROT_DIE;
115 :     AtomicDec(&wrld->numActive);
116 :     break;
117 :     default:
118 :     break;
119 :     }
120 :     }
121 :     }
122 :     } while (blkSize > 0);
123 : jhr 1198
124 : jhr 1214 // barrier synchronization
125 :     pthread_mutex_lock (&wrld->lock);
126 :     if (wrld->numIdle+1 < wrld->numWorkers) {
127 :     wrld->numIdle++;
128 :     pthread_cond_wait (&wrld->barrier, &wrld->lock);
129 :     }
130 :     else {
131 :     // all other workers are idle, so we can proceed
132 :     wrld->numIdle = 0;
133 :     wrld->numAvail = wrld->numActive;
134 :     pthread_cond_broadcast (&wrld->barrier);
135 :     }
136 :     pthread_mutex_unlock (&wrld->lock);
137 :    
138 :     /**** If there is a global computation phase, it goes here ****/
139 :    
140 :     // stabilize any threads that need stabilization. Each worker is responsible for
141 :     // a contiguous region of the strands
142 :     if (existsStabilizing) {
143 :     int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
144 :     int start = myArg->id * nStrandsPerWorker;
145 :     int limit = start + nStrandsPerWorker;
146 :     if (limit > wrld->numStrands) limit = wrld->numStrands;
147 :     for (int i = start; i < limit; i++) {
148 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
149 :     // copy out to in so that both copies are the stable state
150 :     memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
151 :     wrld->status[i] = DIDEROT_STABLE;
152 :     AtomicDec (&wrld->numActive);
153 : jhr 1198 }
154 :     }
155 :     }
156 :     }
157 : jhr 1214
158 : jhr 1198 }
159 :    
160 :    
161 :     int main (int argc, const char **argv)
162 :     {
163 : jhr 1214 fprintf (stderr, "initializing globals ...\n");
164 :     Diderot_InitGlobals (argc, argv);
165 : jhr 1198
166 : jhr 1214 fprintf (stderr, "initializing strands ...\n");
167 : jhr 1198 Diderot_World_t *wrld = Diderot_Initially ();
168 :     for (int i = 0; i < wrld->numStrands; i++) {
169 :     // hack to make the invariant part of the state the same in both copies
170 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
171 :     }
172 :    
173 :     CPUInfo_t cpuInfo;
174 :     if (! GetNumCPUs (&cpuInfo)) {
175 :     fprintf(stderr, "unable to get number of processors\n");
176 :     exit (1);
177 :     }
178 : jhr 1214
179 :     // Start worker threads
180 : jhr 1198 int nWorkers = cpuInfo.numHWCores;
181 : jhr 1214 WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
182 : jhr 1209 printf ("initializing %d workers ...\n", nWorkers);
183 : jhr 1214 wrld->numWorkers = nWorkers;
184 :     wrld->numIdle = 0;
185 : jhr 1198 for (int i = 0; i < nWorkers; i++) {
186 : jhr 1214 pthread_t pid;
187 :     args[i].wrld = wrld;
188 :     args[i].id = i;
189 :     if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
190 :     fprintf (stderr, "unable to create worker thread\n");
191 :     exit (1);
192 :     }
193 :     pthread_detach (pid);
194 : jhr 1198 }
195 :    
196 : jhr 1214 // wait for the computation to finish
197 :     pthread_mutex_lock (&wrld->lock);
198 :     pthread_cond_wait (&wrld->mainWait, &wrld->lock);
199 :     pthread_mutex_unlock (&wrld->lock);
200 : jhr 1198
201 : jhr 1214 fprintf (stderr, "done: %d steps\n", wrld->nSteps);
202 : jhr 1198 // here we have the final state of all of the strands in the "in" buffer
203 :     FILE *outS = fopen("mip.txt", "w");
204 :     if (outS == NULL) {
205 :     fprintf (stderr, "Cannot open output file\n");
206 :     exit (8);
207 :     }
208 :    
209 :     for (int i = 0; i < wrld->numStrands; i++) {
210 :     if (wrld->status[i] == DIDEROT_STABLE)
211 :     Diderot_Strands[0]->print (outS, wrld->inState[i]);
212 :     }
213 :     fclose (outS);
214 :    
215 : jhr 1214 Diderot_Shutdown (wrld);
216 :    
217 : jhr 1198 return 0;
218 :    
219 :     }
220 :    
221 :    
222 :     // this should be the part of the scheduler
223 :     void *Diderot_AllocStrand (Strand_t *strand)
224 :     {
225 :     return malloc(strand->stateSzb);
226 :     }
227 :    
228 :     // block allocation of an initial collection of strands
229 :     Diderot_World_t *Diderot_AllocInitially (
230 : jhr 1209 Strand_t *strand, // the type of strands being allocated
231 : jhr 1198 bool isArray, // is the initialization an array or collection?
232 :     uint32_t nDims, // depth of iteration nesting
233 :     int32_t *base, // nDims array of base indices
234 :     uint32_t *size) // nDims array of iteration sizes
235 :     {
236 :     Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
237 :     if (wrld == 0) {
238 :     fprintf (stderr, "unable to allocate world\n");
239 :     exit (1);
240 :     }
241 :    
242 :     wrld->isArray = isArray;
243 :     wrld->nDims = nDims;
244 :     wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
245 :     wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t));
246 :     size_t numStrands = 1;
247 :     for (int i = 0; i < wrld->nDims; i++) {
248 :     numStrands *= size[i];
249 :     wrld->base[i] = base[i];
250 :     wrld->size[i] = size[i];
251 :     }
252 :    
253 : jhr 1214 fprintf(stderr, "AllocInitially: %d", size[0]);
254 :     for (int i = 1; i < nDims; i++)
255 :     fprintf(stderr, " x %d", size[i]);
256 :     fprintf(stderr, "\n");
257 : jhr 1198
258 :     // allocate the strand state pointers
259 :     wrld->numStrands = numStrands;
260 :     wrld->inState = (void **) malloc (numStrands * sizeof(void *));
261 :     wrld->outState = (void **) malloc (numStrands * sizeof(void *));
262 :     wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
263 :     if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) {
264 :     fprintf (stderr, "unable to allocate strand states\n");
265 :     exit (1);
266 :     }
267 : jhr 1214 wrld->numActive = wrld->numStrands;
268 :     wrld->nSteps = 0;
269 :     wrld->numWorkers = 0;
270 : jhr 1198
271 :     // initialize strand state pointers etc.
272 :     for (size_t i = 0; i < numStrands; i++) {
273 :     wrld->inState[i] = Diderot_AllocStrand (strand);
274 :     wrld->outState[i] = Diderot_AllocStrand (strand);
275 :     wrld->status[i] = DIDEROT_ACTIVE;
276 :     }
277 :    
278 : jhr 1214 pthread_mutex_init (&wrld->lock, NULL);
279 :     pthread_cond_init (&wrld->barrier, NULL);
280 :     pthread_cond_init (&wrld->mainWait, NULL);
281 : jhr 1198
282 :     return wrld;
283 :    
284 :     }
285 :    
286 :     // get strand state pointers
287 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
288 :     {
289 :     assert (i < wrld->numStrands);
290 :     return wrld->inState[i];
291 :     }
292 :    
293 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
294 :     {
295 :     assert (i < wrld->numStrands);
296 :     return wrld->outState[i];
297 :     }
298 :    
299 :     bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
300 :     {
301 :     assert (i < wrld->numStrands);
302 :     return !wrld->status[i];
303 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0