Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Diff of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

Diff of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1213, Fri May 13 14:29:16 2011 UTC revision 1214, Sat May 14 04:19:19 2011 UTC
# Line 28  Line 28 
28  #endif  #endif
29    
30  // The number of strands a worker will take for processing at one time  // The number of strands a worker will take for processing at one time
31  #define BLOCK_SIZE 32  #define BLOCK_SIZE 256
32    
33  struct struct_world {  struct struct_world {
34      bool            isArray;    // is the initialization an array or collection?      bool            isArray;    // is the initialization an array or collection?
# Line 36  Line 36 
36      int32_t         *base;      // nDims array of base indices      int32_t         *base;      // nDims array of base indices
37      uint32_t        *size;      // nDims array of iteration sizes      uint32_t        *size;      // nDims array of iteration sizes
38      uint32_t        numStrands; // number of strands in the world      uint32_t        numStrands; // number of strands in the world
     uint32_t        numActive;  // number of active strands in the world  
39      void            **inState;      void            **inState;
40      void            **outState;      void            **outState;
41      uint8_t         *status;    // array of strand status flags      uint8_t         *status;    // array of strand status flags
42      uint32_t        numThreads; // number of worker threads      uint32_t            numWorkers;     // number of worker threads
43        uint32_t            nSteps;         // number of super steps
44      // synchronization state
45        uint32_t            nextStrand __attribute__((aligned(64))); // index of next strand to update
46        uint32_t            numActive __attribute__((aligned(64))); // # active strands
47        uint32_t            numAvail __attribute__((aligned(64))); // # unevaluated strands
48        uint32_t            numIdle __attribute__((aligned(64)));   // # idle workers
49      pthread_mutex_t lock;       // big lock to protect wrld from multiple accesses      pthread_mutex_t lock;       // big lock to protect wrld from multiple accesses
50      pthread_cond_t  workers;    // workers wait on this when they have no work      pthread_cond_t      barrier;        // workers wait on this when they have no work
51      uint8_t         nWorkers;   // number of workers waiting for work to do      pthread_cond_t      mainWait;       // used to signal main when the workers have finished
     pthread_cond_t  main;       // used to signal main when the workers have finished  
     sem_t           stabilizing;// number of strands stabilizing this step  
     uint32_t        nextStrand; // the next strand to be processed this iteration  
52  };  };
53    
54  extern float getOutf (void *self);  typedef struct {
55        int                 id;
56        Diderot_World_t     *wrld;
57    } WorkerArg_t;
58    
59  /* Function which processes active strands. */  /* Function which processes active strands. */
60  void *worker_func (void *arg)  static void *Worker (void *arg)
61  {  {
62      Diderot_World_t *wrld = (Diderot_World_t *) arg;      WorkerArg_t         *myArg = (WorkerArg_t *)arg;
63        Diderot_World_t     *wrld = myArg->wrld;
64      pthread_detach(pthread_self());      bool                existsStabilizing;
65    
66      while (true) {      while (true) {
67          // barrier synchronization at start of super step
68          pthread_mutex_lock(&wrld->lock);          pthread_mutex_lock(&wrld->lock);
69                if (wrld->numIdle+1 < wrld->numWorkers) {
70        // If there is no more work to do this iteration, we wait                  wrld->numIdle++;
71          while(wrld->nextStrand == wrld->numStrands) {                  pthread_cond_wait (&wrld->barrier, &wrld->lock);
72              wrld->nWorkers++;              }
73              if (wrld->nWorkers == wrld->numThreads) {              else {
74                  pthread_cond_signal (&wrld->main);                // all other workers are idle, so we can proceed after some initialization
75              }                  wrld->numIdle = 0;
76              pthread_cond_wait (&wrld->workers, &wrld->lock);                  wrld->numAvail = wrld->numActive;
77              int nActive = 0;                  wrld->nextStrand = 0;
78              sem_getvalue(&wrld->numActive, &nActive);                // swap in and out
79              if (nActive == 0) {                  void **tmp = wrld->inState;
80                  pthread_mutex_unlock (&wrld->lock);                  wrld->inState = wrld->outState;
81                  wrld->numThreads--;                  wrld->outState = tmp;
82                  if (wrld->nWorkers == wrld->numThreads) {                  pthread_cond_broadcast (&wrld->barrier);
                     pthread_cond_signal (&wrld->main);  
83                  }                  }
84            pthread_mutex_unlock (&wrld->lock);
85    
86          // if there are no active strands left, then we're done
87            if (wrld->numActive == 0) {
88                pthread_cond_signal (&wrld->mainWait);
89                  pthread_exit(0);                  pthread_exit(0);
90              }              }
         }  
91    
92          // Get BLOCK_SIZE active strands to process        // iterate until there is no more work to do
93          uint32_t to_process = wrld->nextStrand;          int blkStart, blkSize;
94          uint32_t end = to_process;          existsStabilizing = false;
95          uint32_t num = 0;          do {
96          while (end != wrld->numStrands && num < BLOCK_SIZE) {            // grab some work
97              if(! wrld->status[end]) num++;              pthread_mutex_lock (&wrld->lock);
98              end++;                  if (wrld->numAvail > 0) {
99                        blkStart = wrld->nextStrand;
100                        blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
101                        wrld->numAvail -= blkSize;
102          }          }
         wrld->nextStrand = end;  
   
103          pthread_mutex_unlock(&wrld->lock);          pthread_mutex_unlock(&wrld->lock);
104              // update the strands
105          // Update the strands in this block              for (int i = blkStart;  i < blkStart+blkSize;  i++) {
         for (int i = to_process; i < end; i++) {  
106              if (! wrld->status[i]) {              if (! wrld->status[i]) {
107                  StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);                  StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
108                  switch (sts) {                  switch (sts) {
109                    case DIDEROT_STABILIZE:                    case DIDEROT_STABILIZE:
110                      wrld->status[i] = DIDEROT_STABILIZE;                      wrld->status[i] = DIDEROT_STABILIZE;
111                      sem_post(&wrld->stabilizing);                          existsStabilizing = true;
                     sem_wait(&wrld->numActive);  
112                      break;                      break;
113                    case DIDEROT_DIE:                    case DIDEROT_DIE:
114                      wrld->status[i] = DIDEROT_DIE;                      wrld->status[i] = DIDEROT_DIE;
115                      sem_wait(&wrld->numActive);                          AtomicDec(&wrld->numActive);
116                      break;                      break;
117                    default:                    default:
118                      break;                      break;
119                  }                  }
120              }              }
121          }          }
122            } while (blkSize > 0);
123    
124          // barrier synchronization
125            pthread_mutex_lock (&wrld->lock);
126                if (wrld->numIdle+1 < wrld->numWorkers) {
127                    wrld->numIdle++;
128                    pthread_cond_wait (&wrld->barrier, &wrld->lock);
129                }
130                else {
131                  // all other workers are idle, so we can proceed
132                    wrld->numIdle = 0;
133                    wrld->numAvail = wrld->numActive;
134                    pthread_cond_broadcast (&wrld->barrier);
135                }
136            pthread_mutex_unlock (&wrld->lock);
137    
138    /**** If there is a global computation phase, it goes here ****/
139    
140          // stabilize any threads that need stabilization.  Each worker is responsible for
141          // a contiguous region of the strands
142            if (existsStabilizing) {
143                int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
144                int start = myArg->id * nStrandsPerWorker;
145                int limit = start + nStrandsPerWorker;
146                if (limit > wrld->numStrands) limit = wrld->numStrands;
147                for (int i = start;  i < limit;  i++) {
148                    if (wrld->status[i] == DIDEROT_STABILIZE) {
149                      // copy out to in so that both copies are the stable state
150                        memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
151                        wrld->status[i] = DIDEROT_STABLE;
152                        AtomicDec (&wrld->numActive);
153                    }
154                }
155      }      }
156  }  }
157    
158    }
159    
160    
161  int main (int argc, const char **argv)  int main (int argc, const char **argv)
162  {  {
163      printf ("initializing globals ...\n");      fprintf (stderr, "initializing globals ...\n");
164      Diderot_InitGlobals ();      Diderot_InitGlobals (argc, argv);
165    
166      printf ("initializing strands ...\n");      fprintf (stderr, "initializing strands ...\n");
167      Diderot_World_t *wrld = Diderot_Initially ();      Diderot_World_t *wrld = Diderot_Initially ();
   
168      for (int i = 0;  i < wrld->numStrands;  i++) {      for (int i = 0;  i < wrld->numStrands;  i++) {
169        // hack to make the invariant part of the state the same in both copies        // hack to make the invariant part of the state the same in both copies
170          memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);          memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
# Line 133  Line 175 
175          fprintf(stderr, "unable to get number of processors\n");          fprintf(stderr, "unable to get number of processors\n");
176          exit (1);          exit (1);
177      }      }
178    
179      // Start worker threads      // Start worker threads
180      int nWorkers = cpuInfo.numHWCores;      int nWorkers = cpuInfo.numHWCores;
181      pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t));      WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
   
182      printf ("initializing %d workers ...\n", nWorkers);      printf ("initializing %d workers ...\n", nWorkers);
183      wrld->numThreads = nWorkers;      wrld->numWorkers = nWorkers;
184        wrld->numIdle = 0;
185      for (int i = 0; i < nWorkers; i++) {      for (int i = 0; i < nWorkers; i++) {
186          pthread_create (&workers[i], NULL, worker_func, wrld);          pthread_t pid;
187      }          args[i].wrld = wrld;
188            args[i].id = i;
189      int nSteps = 0;          if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
190      int nActive = 0;              fprintf (stderr, "unable to create worker thread\n");
191      sem_getvalue (&wrld->numActive, &nActive);              exit (1);
     while (nActive > 0) {  
         nSteps++;  
   
         pthread_mutex_lock(&wrld->lock);  
   
       // Wait until all workers have finished this iteration  
         while (wrld->nWorkers != wrld->numThreads) {  
             pthread_cond_wait(&wrld->main, &wrld->lock);  
         }  
         wrld->nWorkers = 0;  
   
       // Handle stabilizing strands  
         int numStabilizing = 0;  
         sem_getvalue (&wrld->stabilizing, &numStabilizing);  
         for (int i = 0;  i < wrld->numStrands && numStabilizing > 0;  i++) {  
             if (wrld->status[i] == DIDEROT_STABILIZE) {  
                 sem_wait(&wrld->stabilizing);  
                 memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);  
                 wrld->status[i] = DIDEROT_STABLE;  
                 numStabilizing--;  
192              }              }
193            pthread_detach (pid);
194          }          }
195    
196          sem_getvalue(&wrld->numActive, &nActive);    // wait for the computation to finish
197        pthread_mutex_lock (&wrld->lock);
198          // Do any global update stuff that needs doing          pthread_cond_wait (&wrld->mainWait, &wrld->lock);
         void **tmp = wrld->inState;  
         wrld->inState = wrld->outState;  
         wrld->outState = tmp;  
   
         wrld->nextStrand = 0;  
   
         // Wake up all the workers to process the next step  
         pthread_cond_broadcast(&wrld->workers);  
199          pthread_mutex_unlock(&wrld->lock);          pthread_mutex_unlock(&wrld->lock);
     }  
200    
201  printf("done: %d steps\n", nSteps);  fprintf (stderr, "done: %d steps\n", wrld->nSteps);
202    // here we have the final state of all of the strands in the "in" buffer    // here we have the final state of all of the strands in the "in" buffer
203      FILE *outS = fopen("mip.txt", "w");      FILE *outS = fopen("mip.txt", "w");
204      if (outS == NULL) {      if (outS == NULL) {
# Line 197  Line 212 
212      }      }
213      fclose (outS);      fclose (outS);
214    
215        Diderot_Shutdown (wrld);
216    
217      return 0;      return 0;
218    
219  }  }
# Line 233  Line 250 
250          wrld->size[i] = size[i];          wrld->size[i] = size[i];
251      }      }
252    
253  printf("AllocInitially: %d", size[0]);  fprintf(stderr, "AllocInitially: %d", size[0]);
254  for (int i = 1;  i < nDims;  i++) printf(" x %d", size[i]);  for (int i = 1;  i < nDims;  i++)
255  printf("\n");      fprintf(stderr, " x %d", size[i]);
256    fprintf(stderr, "\n");
257    
258    // allocate the strand state pointers    // allocate the strand state pointers
259      wrld->numStrands = numStrands;      wrld->numStrands = numStrands;
     sem_init (&wrld->numActive, 0, numStrands);  
260      wrld->inState = (void **) malloc (numStrands * sizeof(void *));      wrld->inState = (void **) malloc (numStrands * sizeof(void *));
261      wrld->outState = (void **) malloc (numStrands * sizeof(void *));      wrld->outState = (void **) malloc (numStrands * sizeof(void *));
262      wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));      wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
# Line 247  Line 264 
264          fprintf (stderr, "unable to allocate strand states\n");          fprintf (stderr, "unable to allocate strand states\n");
265          exit (1);          exit (1);
266      }      }
267        wrld->numActive = wrld->numStrands;
268        wrld->nSteps = 0;
269        wrld->numWorkers = 0;
270    
271    // initialize strand state pointers etc.    // initialize strand state pointers etc.
272      for (size_t i = 0;  i < numStrands;  i++) {      for (size_t i = 0;  i < numStrands;  i++) {
# Line 255  Line 275 
275          wrld->status[i] = DIDEROT_ACTIVE;          wrld->status[i] = DIDEROT_ACTIVE;
276      }      }
277    
     wrld->numThreads = 0;  
278      pthread_mutex_init(&wrld->lock, NULL);      pthread_mutex_init(&wrld->lock, NULL);
279      pthread_cond_init(&wrld->workers, NULL);      pthread_cond_init (&wrld->barrier, NULL);
280      pthread_cond_init(&wrld->main, NULL);      pthread_cond_init (&wrld->mainWait, NULL);
     wrld->nWorkers = 0;  
     sem_init(&wrld->stabilizing, 0, 0);  
     wrld->nextStrand = 0;  
281    
282      return wrld;      return wrld;
283    

Legend:
Removed from v.1213  
changed lines
  Added in v.1214

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0