SCM Repository
Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 1198 - (view) (download) (as text)
1 : | jhr | 1198 | /*! \file main.c |
2 : | * | ||
3 : | * \author John Reppy | ||
4 : | */ | ||
5 : | |||
6 : | /* | ||
7 : | * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu) | ||
8 : | * All rights reserved. | ||
9 : | */ | ||
10 : | |||
11 : | #include <string.h> | ||
12 : | #include <stdio.h> | ||
13 : | #include <assert.h> | ||
14 : | #include <Diderot/diderot.h> | ||
15 : | #include <pthread.h> | ||
16 : | #include <semaphore.h> | ||
17 : | |||
18 : | // The number of strands a worker will take for processing at one time | ||
19 : | #define BLOCK_SIZE 32 | ||
20 : | |||
/*! \brief Shared state of the parallel scheduler.
 *
 * One instance is allocated by Diderot_AllocInitially and handed to every
 * worker thread.  The fields from numThreads through nextStrand are read and
 * written by several threads; the code in this file accesses them while
 * holding `lock` (except where noted in the functions themselves).
 */
struct struct_world {
    bool            isArray;        // is the initialization an array or collection?
    uint32_t        nDims;          // depth of iteration nesting
    int32_t         *base;          // nDims array of base indices
    uint32_t        *size;          // nDims array of iteration sizes
    uint32_t        numStrands;     // number of strands in the world
    sem_t           numActive;      // number of active strands in the world
    void            **inState;      // per-strand state passed as first argument to update
    void            **outState;     // per-strand state passed as second argument to update
    uint8_t         *status;        // array of strand status flags
    uint32_t        numThreads;     // number of worker threads
    pthread_mutex_t lock;           // big lock to protect wrld from multiple accesses
    pthread_cond_t  workers;        // workers wait on this when they have no work
    // number of workers waiting for work to do; this is compared for equality
    // with numThreads, so it has the same width (an 8-bit counter would wrap
    // with more than 255 workers and the comparison would never succeed)
    uint32_t        nWorkers;
    pthread_cond_t  main;           // used to signal main when the workers have finished
    sem_t           stabilizing;    // number of strands stabilizing this step
    uint32_t        nextStrand;     // the next strand to be processed this iteration
};
39 : | |||
40 : | extern float getOutf (void *self); | ||
41 : | |||
42 : | /* Function which processes active strands. */ | ||
43 : | void *worker_func (void *arg) | ||
44 : | { | ||
45 : | Diderot_World_t *wrld = (Diderot_World_t *) arg; | ||
46 : | |||
47 : | pthread_detach(pthread_self()); | ||
48 : | |||
49 : | while (true) { | ||
50 : | pthread_mutex_lock(&wrld->lock); | ||
51 : | |||
52 : | // If there is no more work to do this iteration, we wait | ||
53 : | while(wrld->nextStrand == wrld->numStrands) | ||
54 : | { | ||
55 : | wrld->nWorkers++; | ||
56 : | if (wrld->nWorkers == wrld->numThreads) { | ||
57 : | pthread_cond_signal(&wrld->main); | ||
58 : | } | ||
59 : | pthread_cond_wait(&wrld->workers, &wrld->lock); | ||
60 : | int nActive = 0; | ||
61 : | sem_getvalue(&wrld->numActive, &nActive); | ||
62 : | if (nActive == 0) { | ||
63 : | pthread_mutex_unlock(&wrld->lock); | ||
64 : | wrld->numThreads--; | ||
65 : | if (wrld->nWorkers == wrld->numThreads) { | ||
66 : | pthread_cond_signal(&wrld->main); | ||
67 : | } | ||
68 : | pthread_exit(0); | ||
69 : | } | ||
70 : | } | ||
71 : | |||
72 : | // Get BLOCK_SIZE active strands to process | ||
73 : | uint32_t to_process = wrld->nextStrand; | ||
74 : | uint32_t end = to_process; | ||
75 : | uint32_t num = 0; | ||
76 : | while (end != wrld->numStrands && num < BLOCK_SIZE) { | ||
77 : | if(! wrld->status[end]) num++; | ||
78 : | end++; | ||
79 : | } | ||
80 : | wrld->nextStrand = end; | ||
81 : | |||
82 : | pthread_mutex_unlock(&wrld->lock); | ||
83 : | |||
84 : | // Update the strands in this block | ||
85 : | for (int i = to_process; i < end; i++) { | ||
86 : | if (! wrld->status[i]) { | ||
87 : | StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); | ||
88 : | switch (sts) { | ||
89 : | case DIDEROT_STABILIZE: | ||
90 : | wrld->status[i] = DIDEROT_STABILIZE; | ||
91 : | sem_post(&wrld->stabilizing); | ||
92 : | sem_wait(&wrld->numActive); | ||
93 : | break; | ||
94 : | case DIDEROT_DIE: | ||
95 : | wrld->status[i] = DIDEROT_DIE; | ||
96 : | sem_wait(&wrld->numActive); | ||
97 : | break; | ||
98 : | default: | ||
99 : | break; | ||
100 : | } | ||
101 : | } | ||
102 : | } | ||
103 : | } | ||
104 : | } | ||
105 : | |||
106 : | |||
107 : | int main (int argc, const char **argv) | ||
108 : | { | ||
109 : | printf ("initializing globals ...\n"); | ||
110 : | Diderot_InitGlobals (); | ||
111 : | |||
112 : | printf ("initializing strands ...\n"); | ||
113 : | Diderot_World_t *wrld = Diderot_Initially (); | ||
114 : | |||
115 : | for (int i = 0; i < wrld->numStrands; i++) { | ||
116 : | // hack to make the invariant part of the state the same in both copies | ||
117 : | memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); | ||
118 : | } | ||
119 : | |||
120 : | CPUInfo_t cpuInfo; | ||
121 : | if (! GetNumCPUs (&cpuInfo)) { | ||
122 : | fprintf(stderr, "unable to get number of processors\n"); | ||
123 : | exit (1); | ||
124 : | } | ||
125 : | // Start worker threads | ||
126 : | int nWorkers = cpuInfo.numHWCores; | ||
127 : | pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t)); | ||
128 : | |||
129 : | wrld->numThreads = nWorkers; | ||
130 : | for (int i = 0; i < nWorkers; i++) { | ||
131 : | pthread_create (&workers[i], NULL, worker_func, wrld); | ||
132 : | } | ||
133 : | |||
134 : | int nSteps = 0; | ||
135 : | int nActive = 0; | ||
136 : | sem_getvalue(&wrld->numActive, &nActive); | ||
137 : | while(nActive > 0) { | ||
138 : | nSteps++; | ||
139 : | |||
140 : | pthread_mutex_lock(&wrld->lock); | ||
141 : | |||
142 : | // Wait until all workers have finished this iteration | ||
143 : | while(wrld->nWorkers != wrld->numThreads) { | ||
144 : | pthread_cond_wait(&wrld->main, &wrld->lock); | ||
145 : | } | ||
146 : | wrld->nWorkers = 0; | ||
147 : | |||
148 : | // Handle stabilizing strands | ||
149 : | int numStabilizing = 0; | ||
150 : | sem_getvalue(&wrld->stabilizing, &numStabilizing); | ||
151 : | for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++) { | ||
152 : | if (wrld->status[i] == DIDEROT_STABILIZE) { | ||
153 : | sem_wait(&wrld->stabilizing); | ||
154 : | memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb); | ||
155 : | wrld->status[i] = DIDEROT_STABLE; | ||
156 : | numStabilizing--; | ||
157 : | } | ||
158 : | } | ||
159 : | |||
160 : | sem_getvalue(&wrld->numActive, &nActive); | ||
161 : | |||
162 : | // Do any global update stuff that needs doing | ||
163 : | void **tmp = wrld->inState; | ||
164 : | wrld->inState = wrld->outState; | ||
165 : | wrld->outState = tmp; | ||
166 : | |||
167 : | wrld->nextStrand = 0; | ||
168 : | |||
169 : | // Wake up all the workers to process the next step | ||
170 : | pthread_cond_broadcast(&wrld->workers); | ||
171 : | pthread_mutex_unlock(&wrld->lock); | ||
172 : | } | ||
173 : | |||
174 : | printf("done: %d steps\n", nSteps); | ||
175 : | // here we have the final state of all of the strands in the "in" buffer | ||
176 : | FILE *outS = fopen("mip.txt", "w"); | ||
177 : | if (outS == NULL) { | ||
178 : | fprintf (stderr, "Cannot open output file\n"); | ||
179 : | exit (8); | ||
180 : | } | ||
181 : | |||
182 : | for (int i = 0; i < wrld->numStrands; i++) { | ||
183 : | if (wrld->status[i] == DIDEROT_STABLE) | ||
184 : | Diderot_Strands[0]->print (outS, wrld->inState[i]); | ||
185 : | } | ||
186 : | fclose (outS); | ||
187 : | |||
188 : | return 0; | ||
189 : | |||
190 : | } | ||
191 : | |||
192 : | |||
193 : | // this should be the part of the scheduler | ||
194 : | void *Diderot_AllocStrand (Strand_t *strand) | ||
195 : | { | ||
196 : | return malloc(strand->stateSzb); | ||
197 : | } | ||
198 : | |||
199 : | // block allocation of an initial collection of strands | ||
200 : | Diderot_World_t *Diderot_AllocInitially ( | ||
201 : | Strand_t *strand, // the type of strands being allocated | ||
202 : | bool isArray, // is the initialization an array or collection? | ||
203 : | uint32_t nDims, // depth of iteration nesting | ||
204 : | int32_t *base, // nDims array of base indices | ||
205 : | uint32_t *size) // nDims array of iteration sizes | ||
206 : | { | ||
207 : | Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t)); | ||
208 : | if (wrld == 0) { | ||
209 : | fprintf (stderr, "unable to allocate world\n"); | ||
210 : | exit (1); | ||
211 : | } | ||
212 : | |||
213 : | wrld->isArray = isArray; | ||
214 : | wrld->nDims = nDims; | ||
215 : | wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t)); | ||
216 : | wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t)); | ||
217 : | size_t numStrands = 1; | ||
218 : | for (int i = 0; i < wrld->nDims; i++) { | ||
219 : | numStrands *= size[i]; | ||
220 : | wrld->base[i] = base[i]; | ||
221 : | wrld->size[i] = size[i]; | ||
222 : | } | ||
223 : | |||
224 : | printf("AllocInitially: %d", size[0]); | ||
225 : | for (int i = 1; i < nDims; i++) printf(" x %d", size[i]); | ||
226 : | printf("\n"); | ||
227 : | |||
228 : | // allocate the strand state pointers | ||
229 : | wrld->numStrands = numStrands; | ||
230 : | sem_init (&wrld->numActive, 0, numStrands); | ||
231 : | wrld->inState = (void **) malloc (numStrands * sizeof(void *)); | ||
232 : | wrld->outState = (void **) malloc (numStrands * sizeof(void *)); | ||
233 : | wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t)); | ||
234 : | if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) { | ||
235 : | fprintf (stderr, "unable to allocate strand states\n"); | ||
236 : | exit (1); | ||
237 : | } | ||
238 : | |||
239 : | // initialize strand state pointers etc. | ||
240 : | for (size_t i = 0; i < numStrands; i++) { | ||
241 : | wrld->inState[i] = Diderot_AllocStrand (strand); | ||
242 : | wrld->outState[i] = Diderot_AllocStrand (strand); | ||
243 : | wrld->status[i] = DIDEROT_ACTIVE; | ||
244 : | } | ||
245 : | |||
246 : | wrld->numThreads = 0; | ||
247 : | pthread_mutex_init(&wrld->lock, NULL); | ||
248 : | pthread_cond_init(&wrld->workers, NULL); | ||
249 : | pthread_cond_init(&wrld->main, NULL); | ||
250 : | wrld->nWorkers = 0; | ||
251 : | sem_init(&wrld->stabilizing, 0, 0); | ||
252 : | wrld->nextStrand = 0; | ||
253 : | |||
254 : | return wrld; | ||
255 : | |||
256 : | } | ||
257 : | |||
258 : | // get strand state pointers | ||
259 : | void *Diderot_InState (Diderot_World_t *wrld, uint32_t i) | ||
260 : | { | ||
261 : | assert (i < wrld->numStrands); | ||
262 : | return wrld->inState[i]; | ||
263 : | } | ||
264 : | |||
265 : | void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i) | ||
266 : | { | ||
267 : | assert (i < wrld->numStrands); | ||
268 : | return wrld->outState[i]; | ||
269 : | } | ||
270 : | |||
271 : | bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i) | ||
272 : | { | ||
273 : | assert (i < wrld->numStrands); | ||
274 : | return !wrld->status[i]; | ||
275 : | } |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |