Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /trunk/src/lib/main-parallel.c
ViewVC logotype

Annotation of /trunk/src/lib/main-parallel.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1187 - (view) (download) (as text)

1 : nseltzer 1187 /*! \file main-parallel.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <string.h>
12 :     #include <stdio.h>
13 :     #include <assert.h>
14 :     #include <Diderot/diderot.h>
15 :     #include <pthread.h>
16 :     #include <semaphore.h>
17 :    
// The number of workers in the thread pool (may be overridden at build time,
// e.g. -DNTHREADS=8)
#ifndef NTHREADS
#define NTHREADS 3
#endif

// The number of *active* strands a worker claims for processing at one time;
// inactive strands in the claimed range are skipped and not counted
// (may be overridden at build time, e.g. -DBLOCK_SIZE=64)
#ifndef BLOCK_SIZE
#define BLOCK_SIZE 32
#endif

#if (NTHREADS < 1) || (BLOCK_SIZE < 1)
#error "NTHREADS and BLOCK_SIZE must both be positive"
#endif
23 :    
/* Shared state for one run of the program: the strand population plus the
 * bookkeeping that main and the worker threads use to step it in parallel.
 * The scheduling fields (numThreads, nWorkers, nextStrand) are protected by
 * `lock`.
 */
struct struct_world {
    bool isArray;            // is the initialization an array or collection?
    uint32_t nDims;          // depth of iteration nesting
    int32_t *base;           // nDims array of base indices
    uint32_t *size;          // nDims array of iteration sizes
    uint32_t numStrands;     // number of strands in the world
    sem_t numActive;         // number of active strands in the world (semaphore used as a counter)
    void **inState;          // per-strand "in" state buffers (swapped with outState each step)
    void **outState;         // per-strand "out" state buffers
    uint8_t *status;         // array of strand status flags
    uint32_t numThreads;     // number of worker threads
    pthread_mutex_t lock;    // big lock to protect wrld from multiple accesses
    pthread_cond_t workers;  // workers wait on this when they have no work
    uint32_t nWorkers;       // number of workers waiting for work to do; same width as
                             // numThreads, which it is compared against (uint8_t would wrap)
    pthread_cond_t main;     // used to signal main when the workers have finished
    sem_t stabilizing;       // number of strands stabilizing this step
    uint32_t nextStrand;     // the next strand to be processed this iteration
};
42 :    
/* Accessor for a strand's "outf" value, defined in another translation unit
 * (presumably the generated program code -- TODO confirm).  Function
 * declarations are implicitly extern, so no storage-class keyword is needed.
 * NOTE(review): not referenced by the code visible in this file. */
float getOutf (void *self);
44 :    
45 :     /* Function which processes active strands. */
46 :     void * worker_func(void *arg)
47 :     {
48 :     Diderot_World_t *wrld = (Diderot_World_t *) arg;
49 :    
50 :     pthread_detach(pthread_self());
51 :    
52 :     while(true)
53 :     {
54 :     pthread_mutex_lock(&wrld->lock);
55 :    
56 :     // If there is no more work to do this iteration, we wait
57 :     while(wrld->nextStrand == wrld->numStrands)
58 :     {
59 :     wrld->nWorkers++;
60 :     if(wrld->nWorkers == wrld->numThreads)
61 :     {
62 :     pthread_cond_signal(&wrld->main);
63 :     }
64 :     pthread_cond_wait(&wrld->workers, &wrld->lock);
65 :     int nActive = 0;
66 :     sem_getvalue(&wrld->numActive, &nActive);
67 :     if(nActive == 0)
68 :     {
69 :     pthread_mutex_unlock(&wrld->lock);
70 :     wrld->numThreads--;
71 :     if(wrld->nWorkers == wrld->numThreads)
72 :     {
73 :     pthread_cond_signal(&wrld->main);
74 :     }
75 :     pthread_exit(0);
76 :     }
77 :     }
78 :    
79 :     // Get BLOCK_SIZE active strands to process
80 :     uint32_t to_process = wrld->nextStrand;
81 :     uint32_t end = to_process;
82 :     uint32_t num = 0;
83 :     while(end != wrld->numStrands && num < BLOCK_SIZE)
84 :     {
85 :     if(! wrld->status[end]) num++;
86 :     end++;
87 :     }
88 :     wrld->nextStrand = end;
89 :    
90 :     pthread_mutex_unlock(&wrld->lock);
91 :    
92 :     // Update the strands in this block
93 :     for(int i = to_process; i < end; i++)
94 :     {
95 :     if (! wrld->status[i])
96 :     {
97 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
98 :     switch (sts)
99 :     {
100 :     case DIDEROT_STABILIZE:
101 :     wrld->status[i] = DIDEROT_STABILIZE;
102 :     sem_post(&wrld->stabilizing);
103 :     sem_wait(&wrld->numActive);
104 :     break;
105 :     case DIDEROT_DIE:
106 :     wrld->status[i] = DIDEROT_DIE;
107 :     sem_wait(&wrld->numActive);
108 :     break;
109 :     }
110 :     }
111 :     }
112 :     }
113 :     }
114 :    
115 :    
116 :     int main (int argc, const char **argv)
117 :     {
118 :     printf("initializing globals ...\n");
119 :     Diderot_InitGlobals ();
120 :    
121 :     printf("initializing strands ...\n");
122 :     Diderot_World_t *wrld = Diderot_Initially ();
123 :    
124 :     for (int i = 0; i < wrld->numStrands; i++)
125 :     {
126 :     // hack to make the invariant part of the state the same in both copies
127 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
128 :     }
129 :    
130 :     // Start worker threads
131 :     pthread_t workers[NTHREADS];
132 :    
133 :     wrld->numThreads = NTHREADS;
134 :     for (int i = 0; i < NTHREADS; i++)
135 :     {
136 :     pthread_create(&workers[i], NULL, worker_func, wrld);
137 :     }
138 :    
139 :     int nSteps = 0;
140 :     int nActive = 0;
141 :     sem_getvalue(&wrld->numActive, &nActive);
142 :     while(nActive > 0)
143 :     {
144 :     nSteps++;
145 :    
146 :     pthread_mutex_lock(&wrld->lock);
147 :    
148 :     // Wait until all workers have finished this iteration
149 :     while(wrld->nWorkers != wrld->numThreads)
150 :     {
151 :     pthread_cond_wait(&wrld->main, &wrld->lock);
152 :     }
153 :     wrld->nWorkers = 0;
154 :    
155 :     // Handle stabilizing strands
156 :     int numStabilizing = 0;
157 :     sem_getvalue(&wrld->stabilizing, &numStabilizing);
158 :     for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++)
159 :     {
160 :     if (wrld->status[i] == DIDEROT_STABILIZE)
161 :     {
162 :     sem_wait(&wrld->stabilizing);
163 :     memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
164 :     wrld->status[i] = DIDEROT_STABLE;
165 :     numStabilizing--;
166 :     }
167 :     }
168 :    
169 :     sem_getvalue(&wrld->numActive, &nActive);
170 :    
171 :     // Do any global update stuff that needs doing
172 :     void **tmp = wrld->inState;
173 :     wrld->inState = wrld->outState;
174 :     wrld->outState = tmp;
175 :    
176 :     wrld->nextStrand = 0;
177 :    
178 :     // Wake up all the workers to process the next step
179 :     pthread_cond_broadcast(&wrld->workers);
180 :     pthread_mutex_unlock(&wrld->lock);
181 :     }
182 :    
183 :     printf("done: %d steps\n", nSteps);
184 :     // here we have the final state of all of the strands in the "in" buffer
185 :     FILE *outS = fopen("mip.txt", "w");
186 :     if (outS == NULL) {
187 :     fprintf (stderr, "Cannot open output file\n");
188 :     exit (8);
189 :     }
190 :    
191 :     for (int i = 0; i < wrld->numStrands; i++) {
192 :     if (wrld->status[i] == DIDEROT_STABLE)
193 :     Diderot_Strands[0]->print (outS, wrld->inState[i]);
194 :     }
195 :     fclose (outS);
196 :    
197 :     return 0;
198 :    
199 :     }
200 :    
201 :    
202 :     // this should be the part of the scheduler
203 :     void *Diderot_AllocStrand (Strand_t *strand)
204 :     {
205 :     return malloc(strand->stateSzb);
206 :     }
207 :    
208 :     // block allocation of an initial collection of strands
209 :     Diderot_World_t *Diderot_AllocInitially (
210 :     Strand_t *strand, // the type of strands being allocated
211 :     bool isArray, // is the initialization an array or collection?
212 :     uint32_t nDims, // depth of iteration nesting
213 :     int32_t *base, // nDims array of base indices
214 :     uint32_t *size) // nDims array of iteration sizes
215 :     {
216 :     Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
217 :     if (wrld == 0) {
218 :     fprintf (stderr, "unable to allocate world\n");
219 :     exit (1);
220 :     }
221 :    
222 :     wrld->isArray = isArray;
223 :     wrld->nDims = nDims;
224 :     wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
225 :     wrld->size = (int32_t *) malloc (nDims * sizeof(int32_t));
226 :     size_t numStrands = 1;
227 :     for (int i = 0; i < wrld->nDims; i++) {
228 :     numStrands *= size[i];
229 :     wrld->base[i] = base[i];
230 :     wrld->size[i] = size[i];
231 :     }
232 :    
233 :     printf("AllocInitially: %d", size[0]);
234 :     for (int i = 1; i < nDims; i++) printf(" x %d", size[i]);
235 :     printf("\n");
236 :    
237 :     // allocate the strand state pointers
238 :     wrld->numStrands = numStrands;
239 :     sem_init(&wrld->numActive, 0, numStrands);
240 :     wrld->inState = (void **) malloc (numStrands * sizeof(void *));
241 :     wrld->outState = (void **) malloc (numStrands * sizeof(void *));
242 :     wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
243 :     if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) {
244 :     fprintf (stderr, "unable to allocate strand states\n");
245 :     exit (1);
246 :     }
247 :    
248 :     // initialize strand state pointers etc.
249 :     for (size_t i = 0; i < numStrands; i++) {
250 :     wrld->inState[i] = Diderot_AllocStrand (strand);
251 :     wrld->outState[i] = Diderot_AllocStrand (strand);
252 :     wrld->status[i] = DIDEROT_ACTIVE;
253 :     }
254 :    
255 :     wrld->numThreads = 0;
256 :     pthread_mutex_init(&wrld->lock, NULL);
257 :     pthread_cond_init(&wrld->workers, NULL);
258 :     pthread_cond_init(&wrld->main, NULL);
259 :     wrld->nWorkers = 0;
260 :     sem_init(&wrld->stabilizing, 0, 0);
261 :     wrld->nextStrand = 0;
262 :    
263 :     return wrld;
264 :    
265 :     }
266 :    
267 :     // get strand state pointers
268 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
269 :     {
270 :     assert (i < wrld->numStrands);
271 :     return wrld->inState[i];
272 :     }
273 :    
274 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
275 :     {
276 :     assert (i < wrld->numStrands);
277 :     return wrld->outState[i];
278 :     }
279 :    
280 :     bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
281 :     {
282 :     assert (i < wrld->numStrands);
283 :     return !wrld->status[i];
284 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0