Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Annotation of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1198 - (view) (download) (as text)

1 : jhr 1198 /*! \file main.c
2 :     *
3 :     * \author John Reppy
4 :     */
5 :    
6 :     /*
7 :     * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
8 :     * All rights reserved.
9 :     */
10 :    
11 :     #include <string.h>
12 :     #include <stdio.h>
13 :     #include <assert.h>
14 :     #include <Diderot/diderot.h>
15 :     #include <pthread.h>
16 :     #include <semaphore.h>
17 :    
/* Upper bound on the number of active strands that a worker claims
 * in a single trip through the scheduler lock.
 */
enum { BLOCK_SIZE = 32 };
20 :    
/*! \brief Shared scheduler state for the parallel target.
 *
 * One instance is allocated by Diderot_AllocInitially and shared between
 * the main thread and every worker thread.
 */
struct struct_world {
    bool            isArray;        // is the initialization an array or collection?
    uint32_t        nDims;          // depth of iteration nesting
    int32_t         *base;          // nDims array of base indices
    uint32_t        *size;          // nDims array of iteration sizes
    uint32_t        numStrands;     // number of strands in the world
    sem_t           numActive;      // number of active strands in the world
    void            **inState;      // per-strand state passed as the first argument of update
    void            **outState;     // per-strand state passed as the second argument of update
    uint8_t         *status;        // array of strand status flags
    uint32_t        numThreads;     // number of worker threads
    pthread_mutex_t lock;           // big lock to protect wrld from multiple accesses
    pthread_cond_t  workers;        // workers wait on this when they have no work
    // Same width as numThreads: the two are compared for equality, so a
    // narrower counter would wrap and never match on machines with more
    // than 255 worker threads.
    uint32_t        nWorkers;       // number of workers waiting for work to do
    pthread_cond_t  main;           // used to signal main when the workers have finished
    sem_t           stabilizing;    // number of strands stabilizing this step
    uint32_t        nextStrand;     // the next strand to be processed this iteration
};
39 :    
40 :     extern float getOutf (void *self);
41 :    
42 :     /* Function which processes active strands. */
43 :     void *worker_func (void *arg)
44 :     {
45 :     Diderot_World_t *wrld = (Diderot_World_t *) arg;
46 :    
47 :     pthread_detach(pthread_self());
48 :    
49 :     while (true) {
50 :     pthread_mutex_lock(&wrld->lock);
51 :    
52 :     // If there is no more work to do this iteration, we wait
53 :     while(wrld->nextStrand == wrld->numStrands)
54 :     {
55 :     wrld->nWorkers++;
56 :     if (wrld->nWorkers == wrld->numThreads) {
57 :     pthread_cond_signal(&wrld->main);
58 :     }
59 :     pthread_cond_wait(&wrld->workers, &wrld->lock);
60 :     int nActive = 0;
61 :     sem_getvalue(&wrld->numActive, &nActive);
62 :     if (nActive == 0) {
63 :     pthread_mutex_unlock(&wrld->lock);
64 :     wrld->numThreads--;
65 :     if (wrld->nWorkers == wrld->numThreads) {
66 :     pthread_cond_signal(&wrld->main);
67 :     }
68 :     pthread_exit(0);
69 :     }
70 :     }
71 :    
72 :     // Get BLOCK_SIZE active strands to process
73 :     uint32_t to_process = wrld->nextStrand;
74 :     uint32_t end = to_process;
75 :     uint32_t num = 0;
76 :     while (end != wrld->numStrands && num < BLOCK_SIZE) {
77 :     if(! wrld->status[end]) num++;
78 :     end++;
79 :     }
80 :     wrld->nextStrand = end;
81 :    
82 :     pthread_mutex_unlock(&wrld->lock);
83 :    
84 :     // Update the strands in this block
85 :     for (int i = to_process; i < end; i++) {
86 :     if (! wrld->status[i]) {
87 :     StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
88 :     switch (sts) {
89 :     case DIDEROT_STABILIZE:
90 :     wrld->status[i] = DIDEROT_STABILIZE;
91 :     sem_post(&wrld->stabilizing);
92 :     sem_wait(&wrld->numActive);
93 :     break;
94 :     case DIDEROT_DIE:
95 :     wrld->status[i] = DIDEROT_DIE;
96 :     sem_wait(&wrld->numActive);
97 :     break;
98 :     default:
99 :     break;
100 :     }
101 :     }
102 :     }
103 :     }
104 :     }
105 :    
106 :    
107 :     int main (int argc, const char **argv)
108 :     {
109 :     printf ("initializing globals ...\n");
110 :     Diderot_InitGlobals ();
111 :    
112 :     printf ("initializing strands ...\n");
113 :     Diderot_World_t *wrld = Diderot_Initially ();
114 :    
115 :     for (int i = 0; i < wrld->numStrands; i++) {
116 :     // hack to make the invariant part of the state the same in both copies
117 :     memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
118 :     }
119 :    
120 :     CPUInfo_t cpuInfo;
121 :     if (! GetNumCPUs (&cpuInfo)) {
122 :     fprintf(stderr, "unable to get number of processors\n");
123 :     exit (1);
124 :     }
125 :     // Start worker threads
126 :     int nWorkers = cpuInfo.numHWCores;
127 :     pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t));
128 :    
129 :     wrld->numThreads = nWorkers;
130 :     for (int i = 0; i < nWorkers; i++) {
131 :     pthread_create (&workers[i], NULL, worker_func, wrld);
132 :     }
133 :    
134 :     int nSteps = 0;
135 :     int nActive = 0;
136 :     sem_getvalue(&wrld->numActive, &nActive);
137 :     while(nActive > 0) {
138 :     nSteps++;
139 :    
140 :     pthread_mutex_lock(&wrld->lock);
141 :    
142 :     // Wait until all workers have finished this iteration
143 :     while(wrld->nWorkers != wrld->numThreads) {
144 :     pthread_cond_wait(&wrld->main, &wrld->lock);
145 :     }
146 :     wrld->nWorkers = 0;
147 :    
148 :     // Handle stabilizing strands
149 :     int numStabilizing = 0;
150 :     sem_getvalue(&wrld->stabilizing, &numStabilizing);
151 :     for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++) {
152 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
153 :     sem_wait(&wrld->stabilizing);
154 :     memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
155 :     wrld->status[i] = DIDEROT_STABLE;
156 :     numStabilizing--;
157 :     }
158 :     }
159 :    
160 :     sem_getvalue(&wrld->numActive, &nActive);
161 :    
162 :     // Do any global update stuff that needs doing
163 :     void **tmp = wrld->inState;
164 :     wrld->inState = wrld->outState;
165 :     wrld->outState = tmp;
166 :    
167 :     wrld->nextStrand = 0;
168 :    
169 :     // Wake up all the workers to process the next step
170 :     pthread_cond_broadcast(&wrld->workers);
171 :     pthread_mutex_unlock(&wrld->lock);
172 :     }
173 :    
174 :     printf("done: %d steps\n", nSteps);
175 :     // here we have the final state of all of the strands in the "in" buffer
176 :     FILE *outS = fopen("mip.txt", "w");
177 :     if (outS == NULL) {
178 :     fprintf (stderr, "Cannot open output file\n");
179 :     exit (8);
180 :     }
181 :    
182 :     for (int i = 0; i < wrld->numStrands; i++) {
183 :     if (wrld->status[i] == DIDEROT_STABLE)
184 :     Diderot_Strands[0]->print (outS, wrld->inState[i]);
185 :     }
186 :     fclose (outS);
187 :    
188 :     return 0;
189 :    
190 :     }
191 :    
192 :    
193 :     // this should be the part of the scheduler
194 :     void *Diderot_AllocStrand (Strand_t *strand)
195 :     {
196 :     return malloc(strand->stateSzb);
197 :     }
198 :    
199 :     // block allocation of an initial collection of strands
200 :     Diderot_World_t *Diderot_AllocInitially (
201 :     Strand_t *strand, // the type of strands being allocated
202 :     bool isArray, // is the initialization an array or collection?
203 :     uint32_t nDims, // depth of iteration nesting
204 :     int32_t *base, // nDims array of base indices
205 :     uint32_t *size) // nDims array of iteration sizes
206 :     {
207 :     Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
208 :     if (wrld == 0) {
209 :     fprintf (stderr, "unable to allocate world\n");
210 :     exit (1);
211 :     }
212 :    
213 :     wrld->isArray = isArray;
214 :     wrld->nDims = nDims;
215 :     wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
216 :     wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t));
217 :     size_t numStrands = 1;
218 :     for (int i = 0; i < wrld->nDims; i++) {
219 :     numStrands *= size[i];
220 :     wrld->base[i] = base[i];
221 :     wrld->size[i] = size[i];
222 :     }
223 :    
224 :     printf("AllocInitially: %d", size[0]);
225 :     for (int i = 1; i < nDims; i++) printf(" x %d", size[i]);
226 :     printf("\n");
227 :    
228 :     // allocate the strand state pointers
229 :     wrld->numStrands = numStrands;
230 :     sem_init (&wrld->numActive, 0, numStrands);
231 :     wrld->inState = (void **) malloc (numStrands * sizeof(void *));
232 :     wrld->outState = (void **) malloc (numStrands * sizeof(void *));
233 :     wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
234 :     if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) {
235 :     fprintf (stderr, "unable to allocate strand states\n");
236 :     exit (1);
237 :     }
238 :    
239 :     // initialize strand state pointers etc.
240 :     for (size_t i = 0; i < numStrands; i++) {
241 :     wrld->inState[i] = Diderot_AllocStrand (strand);
242 :     wrld->outState[i] = Diderot_AllocStrand (strand);
243 :     wrld->status[i] = DIDEROT_ACTIVE;
244 :     }
245 :    
246 :     wrld->numThreads = 0;
247 :     pthread_mutex_init(&wrld->lock, NULL);
248 :     pthread_cond_init(&wrld->workers, NULL);
249 :     pthread_cond_init(&wrld->main, NULL);
250 :     wrld->nWorkers = 0;
251 :     sem_init(&wrld->stabilizing, 0, 0);
252 :     wrld->nextStrand = 0;
253 :    
254 :     return wrld;
255 :    
256 :     }
257 :    
258 :     // get strand state pointers
259 :     void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
260 :     {
261 :     assert (i < wrld->numStrands);
262 :     return wrld->inState[i];
263 :     }
264 :    
265 :     void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
266 :     {
267 :     assert (i < wrld->numStrands);
268 :     return wrld->outState[i];
269 :     }
270 :    
271 :     bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
272 :     {
273 :     assert (i < wrld->numStrands);
274 :     return !wrld->status[i];
275 :     }

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0