Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1209 - (view) (download) (as text)

1 : jhr 1198 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <string.h>
12 :     #include <stdio.h>
13 :     #include <assert.h>
14 :     #include <Diderot/diderot.h>
15 :     #include <pthread.h>
16 :    
17 : jhr 1209 #ifdef HAVE_BUILTIN_ATOMIC_OPS
18 :     STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
19 :     {
20 :     return __sync_add_and_fetch(x, 1);
21 :     }
22 :     STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
23 :     {
24 :     return __sync_sub_and_fetch(x, 1);
25 :     }
26 :     #else
27 :     # error atomic operations not supported
28 :     #endif
29 :    
30 : jhr 1198 // The number of strands a worker will take for processing at one time
31 :     #define BLOCK_SIZE 32
32 :    
33 :     struct struct_world {
34 :     bool isArray; // is the initialization an array or collection?
35 :     uint32_t nDims; // depth of iteration nesting
36 :     int32_t *base; // nDims array of base indices
37 :     uint32_t *size; // nDims array of iteration sizes
38 :     uint32_t numStrands; // number of strands in the world
39 : jhr 1209 uint32_t numActive; // number of active strands in the world
40 : jhr 1198 void **inState;
41 :     void **outState;
42 :     uint8_t *status; // array of strand status flags
43 :     uint32_t numThreads; // number of worker threads
44 :     pthread_mutex_t lock; // big lock to protect wrld from multiple accesses
45 :     pthread_cond_t workers; // workers wait on this when they have no work
46 :     uint8_t nWorkers; // number of workers waiting for work to do
47 :     pthread_cond_t main; // used to signal main when the workers have finished
48 :     sem_t stabilizing;// number of strands stabilizing this step
49 :     uint32_t nextStrand; // the next strand to be processed this iteration
50 :     };
51 :    
52 :     extern float getOutf (void *self);
53 :    
54 :     /* Function which processes active strands. */
55 :     void *worker_func (void *arg)
56 :     {
57 :     Diderot_World_t *wrld = (Diderot_World_t *) arg;
58 :    
59 :     pthread_detach(pthread_self());
60 :    
61 :     while (true) {
62 :     pthread_mutex_lock(&wrld->lock);
63 :    
64 : jhr 1209 // If there is no more work to do this iteration, we wait
65 :     while(wrld->nextStrand == wrld->numStrands) {
66 : jhr 1198 wrld->nWorkers++;
67 :     if (wrld->nWorkers == wrld->numThreads) {
68 : jhr 1209 pthread_cond_signal (&wrld->main);
69 : jhr 1198 }
70 : jhr 1209 pthread_cond_wait (&wrld->workers, &wrld->lock);
71 : jhr 1198 int nActive = 0;
72 :     sem_getvalue(&wrld->numActive, &nActive);
73 :     if (nActive == 0) {
74 : jhr 1209 pthread_mutex_unlock (&wrld->lock);
75 : jhr 1198 wrld->numThreads--;
76 :     if (wrld->nWorkers == wrld->numThreads) {
77 : jhr 1209 pthread_cond_signal (&wrld->main);
78 : jhr 1198 }
79 :     pthread_exit(0);
80 :     }
81 :     }
82 :    
83 :     // Get BLOCK_SIZE active strands to process
84 :     uint32_t to_process = wrld->nextStrand;
85 :     uint32_t end = to_process;
86 :     uint32_t num = 0;
87 :     while (end != wrld->numStrands && num < BLOCK_SIZE) {
88 :     if(! wrld->status[end]) num++;
89 :     end++;
90 :     }
91 :     wrld->nextStrand = end;
92 :    
93 :     pthread_mutex_unlock(&wrld->lock);
94 :    
95 :     // Update the strands in this block
96 :     for (int i = to_process; i < end; i++) {
97 :     if (! wrld->status[i]) {
98 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
99 :     switch (sts) {
100 :     case DIDEROT_STABILIZE:
101 :     wrld->status[i] = DIDEROT_STABILIZE;
102 :     sem_post(&wrld->stabilizing);
103 :     sem_wait(&wrld->numActive);
104 :     break;
105 :     case DIDEROT_DIE:
106 :     wrld->status[i] = DIDEROT_DIE;
107 :     sem_wait(&wrld->numActive);
108 :     break;
109 :     default:
110 :     break;
111 :     }
112 :     }
113 :     }
114 :     }
115 :     }
116 :    
117 :    
118 :     int main (int argc, const char **argv)
119 :     {
120 :     printf ("initializing globals ...\n");
121 :     Diderot_InitGlobals ();
122 :    
123 :     printf ("initializing strands ...\n");
124 :     Diderot_World_t *wrld = Diderot_Initially ();
125 :    
126 :     for (int i = 0; i < wrld->numStrands; i++) {
127 :     // hack to make the invariant part of the state the same in both copies
128 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
129 :     }
130 :    
131 :     CPUInfo_t cpuInfo;
132 :     if (! GetNumCPUs (&cpuInfo)) {
133 :     fprintf(stderr, "unable to get number of processors\n");
134 :     exit (1);
135 :     }
136 :     // Start worker threads
137 :     int nWorkers = cpuInfo.numHWCores;
138 :     pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t));
139 :    
140 : jhr 1209 printf ("initializing %d workers ...\n", nWorkers);
141 : jhr 1198 wrld->numThreads = nWorkers;
142 :     for (int i = 0; i < nWorkers; i++) {
143 :     pthread_create (&workers[i], NULL, worker_func, wrld);
144 :     }
145 :    
146 :     int nSteps = 0;
147 :     int nActive = 0;
148 : jhr 1209 sem_getvalue (&wrld->numActive, &nActive);
149 :     while (nActive > 0) {
150 : jhr 1198 nSteps++;
151 :    
152 :     pthread_mutex_lock(&wrld->lock);
153 :    
154 : jhr 1209 // Wait until all workers have finished this iteration
155 :     while (wrld->nWorkers != wrld->numThreads) {
156 : jhr 1198 pthread_cond_wait(&wrld->main, &wrld->lock);
157 :     }
158 :     wrld->nWorkers = 0;
159 :    
160 : jhr 1209 // Handle stabilizing strands
161 : jhr 1198 int numStabilizing = 0;
162 : jhr 1209 sem_getvalue (&wrld->stabilizing, &numStabilizing);
163 : jhr 1198 for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++) {
164 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
165 :     sem_wait(&wrld->stabilizing);
166 :     memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
167 :     wrld->status[i] = DIDEROT_STABLE;
168 :     numStabilizing--;
169 :     }
170 :     }
171 :    
172 :     sem_getvalue(&wrld->numActive, &nActive);
173 :    
174 :     // Do any global update stuff that needs doing
175 :     void **tmp = wrld->inState;
176 :     wrld->inState = wrld->outState;
177 :     wrld->outState = tmp;
178 :    
179 :     wrld->nextStrand = 0;
180 :    
181 :     // Wake up all the workers to process the next step
182 :     pthread_cond_broadcast(&wrld->workers);
183 :     pthread_mutex_unlock(&wrld->lock);
184 :     }
185 :    
186 :     printf("done: %d steps\n", nSteps);
187 :     // here we have the final state of all of the strands in the "in" buffer
188 :     FILE *outS = fopen("mip.txt", "w");
189 :     if (outS == NULL) {
190 :     fprintf (stderr, "Cannot open output file\n");
191 :     exit (8);
192 :     }
193 :    
194 :     for (int i = 0; i < wrld->numStrands; i++) {
195 :     if (wrld->status[i] == DIDEROT_STABLE)
196 :     Diderot_Strands[0]->print (outS, wrld->inState[i]);
197 :     }
198 :     fclose (outS);
199 :    
200 :     return 0;
201 :    
202 :     }
203 :    
204 :    
205 :     // this should be the part of the scheduler
206 :     void *Diderot_AllocStrand (Strand_t *strand)
207 :     {
208 :     return malloc(strand->stateSzb);
209 :     }
210 :    
211 :     // block allocation of an initial collection of strands
212 :     Diderot_World_t *Diderot_AllocInitially (
213 : jhr 1209 Strand_t *strand, // the type of strands being allocated
214 : jhr 1198 bool isArray, // is the initialization an array or collection?
215 :     uint32_t nDims, // depth of iteration nesting
216 :     int32_t *base, // nDims array of base indices
217 :     uint32_t *size) // nDims array of iteration sizes
218 :     {
219 :     Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
220 :     if (wrld == 0) {
221 :     fprintf (stderr, "unable to allocate world\n");
222 :     exit (1);
223 :     }
224 :    
225 :     wrld->isArray = isArray;
226 :     wrld->nDims = nDims;
227 :     wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
228 :     wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t));
229 :     size_t numStrands = 1;
230 :     for (int i = 0; i < wrld->nDims; i++) {
231 :     numStrands *= size[i];
232 :     wrld->base[i] = base[i];
233 :     wrld->size[i] = size[i];
234 :     }
235 :    
236 :     printf("AllocInitially: %d", size[0]);
237 :     for (int i = 1; i < nDims; i++) printf(" x %d", size[i]);
238 :     printf("\n");
239 :    
240 :     // allocate the strand state pointers
241 :     wrld->numStrands = numStrands;
242 :     sem_init (&wrld->numActive, 0, numStrands);
243 :     wrld->inState = (void **) malloc (numStrands * sizeof(void *));
244 :     wrld->outState = (void **) malloc (numStrands * sizeof(void *));
245 :     wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
246 :     if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) {
247 :     fprintf (stderr, "unable to allocate strand states\n");
248 :     exit (1);
249 :     }
250 :    
251 :     // initialize strand state pointers etc.
252 :     for (size_t i = 0; i < numStrands; i++) {
253 :     wrld->inState[i] = Diderot_AllocStrand (strand);
254 :     wrld->outState[i] = Diderot_AllocStrand (strand);
255 :     wrld->status[i] = DIDEROT_ACTIVE;
256 :     }
257 :    
258 :     wrld->numThreads = 0;
259 :     pthread_mutex_init(&wrld->lock, NULL);
260 :     pthread_cond_init(&wrld->workers, NULL);
261 :     pthread_cond_init(&wrld->main, NULL);
262 :     wrld->nWorkers = 0;
263 :     sem_init(&wrld->stabilizing, 0, 0);
264 :     wrld->nextStrand = 0;
265 :    
266 :     return wrld;
267 :    
268 :     }
269 :    
270 :     // get strand state pointers
271 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
272 :     {
273 :     assert (i < wrld->numStrands);
274 :     return wrld->inState[i];
275 :     }
276 :    
277 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
278 :     {
279 :     assert (i < wrld->numStrands);
280 :     return wrld->outState[i];
281 :     }
282 :    
283 :     bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
284 :     {
285 :     assert (i < wrld->numStrands);
286 :     return !wrld->status[i];
287 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0