SCM Repository
View of /branches/pure-cfg/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 1198 -
(download)
(as text)
(annotate)
Thu May 12 04:18:38 2011 UTC (11 years, 1 month ago) by jhr
File size: 7910 byte(s)
Thu May 12 04:18:38 2011 UTC (11 years, 1 month ago) by jhr
File size: 7910 byte(s)
Add parallel runtime support
/*! \file main.c * * \author John Reppy */ /* * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu) * All rights reserved. */ #include <string.h> #include <stdio.h> #include <assert.h> #include <Diderot/diderot.h> #include <pthread.h> #include <semaphore.h> // The number of strands a worker will take for processing at one time #define BLOCK_SIZE 32 struct struct_world { bool isArray; // is the initialization an array or collection? uint32_t nDims; // depth of iteration nesting int32_t *base; // nDims array of base indices uint32_t *size; // nDims array of iteration sizes uint32_t numStrands; // number of strands in the world sem_t numActive; // number of active strands in the world void **inState; void **outState; uint8_t *status; // array of strand status flags uint32_t numThreads; // number of worker threads pthread_mutex_t lock; // big lock to protect wrld from multiple accesses pthread_cond_t workers; // workers wait on this when they have no work uint8_t nWorkers; // number of workers waiting for work to do pthread_cond_t main; // used to signal main when the workers have finished sem_t stabilizing;// number of strands stabilizing this step uint32_t nextStrand; // the next strand to be processed this iteration }; extern float getOutf (void *self); /* Function which processes active strands. */ void *worker_func (void *arg) { Diderot_World_t *wrld = (Diderot_World_t *) arg; pthread_detach(pthread_self()); while (true) { pthread_mutex_lock(&wrld->lock); // If there is no more work to do this iteration, we wait while(wrld->nextStrand == wrld->numStrands) { wrld->nWorkers++; if (wrld->nWorkers == wrld->numThreads) { pthread_cond_signal(&wrld->main); } pthread_cond_wait(&wrld->workers, &wrld->lock); int nActive = 0; sem_getvalue(&wrld->numActive, &nActive); if (nActive == 0) { pthread_mutex_unlock(&wrld->lock); wrld->numThreads--; if (wrld->nWorkers == wrld->numThreads) { pthread_cond_signal(&wrld->main); } pthread_exit(0); } } // Get BLOCK_SIZE active strands to process uint32_t to_process = wrld->nextStrand; uint32_t end = to_process; uint32_t num = 0; while (end != wrld->numStrands && num < BLOCK_SIZE) { if(! wrld->status[end]) num++; end++; } wrld->nextStrand = end; pthread_mutex_unlock(&wrld->lock); // Update the strands in this block for (int i = to_process; i < end; i++) { if (! wrld->status[i]) { StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); switch (sts) { case DIDEROT_STABILIZE: wrld->status[i] = DIDEROT_STABILIZE; sem_post(&wrld->stabilizing); sem_wait(&wrld->numActive); break; case DIDEROT_DIE: wrld->status[i] = DIDEROT_DIE; sem_wait(&wrld->numActive); break; default: break; } } } } } int main (int argc, const char **argv) { printf ("initializing globals ...\n"); Diderot_InitGlobals (); printf ("initializing strands ...\n"); Diderot_World_t *wrld = Diderot_Initially (); for (int i = 0; i < wrld->numStrands; i++) { // hack to make the invariant part of the state the same in both copies memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); } CPUInfo_t cpuInfo; if (! GetNumCPUs (&cpuInfo)) { fprintf(stderr, "unable to get number of processors\n"); exit (1); } // Start worker threads int nWorkers = cpuInfo.numHWCores; pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t)); wrld->numThreads = nWorkers; for (int i = 0; i < nWorkers; i++) { pthread_create (&workers[i], NULL, worker_func, wrld); } int nSteps = 0; int nActive = 0; sem_getvalue(&wrld->numActive, &nActive); while(nActive > 0) { nSteps++; pthread_mutex_lock(&wrld->lock); // Wait until all workers have finished this iteration while(wrld->nWorkers != wrld->numThreads) { pthread_cond_wait(&wrld->main, &wrld->lock); } wrld->nWorkers = 0; // Handle stabilizing strands int numStabilizing = 0; sem_getvalue(&wrld->stabilizing, &numStabilizing); for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++) { if (wrld->status[i] == DIDEROT_STABILIZE) { sem_wait(&wrld->stabilizing); memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb); wrld->status[i] = DIDEROT_STABLE; numStabilizing--; } } sem_getvalue(&wrld->numActive, &nActive); // Do any global update stuff that needs doing void **tmp = wrld->inState; wrld->inState = wrld->outState; wrld->outState = tmp; wrld->nextStrand = 0; // Wake up all the workers to process the next step pthread_cond_broadcast(&wrld->workers); pthread_mutex_unlock(&wrld->lock); } printf("done: %d steps\n", nSteps); // here we have the final state of all of the strands in the "in" buffer FILE *outS = fopen("mip.txt", "w"); if (outS == NULL) { fprintf (stderr, "Cannot open output file\n"); exit (8); } for (int i = 0; i < wrld->numStrands; i++) { if (wrld->status[i] == DIDEROT_STABLE) Diderot_Strands[0]->print (outS, wrld->inState[i]); } fclose (outS); return 0; } // this should be the part of the scheduler void *Diderot_AllocStrand (Strand_t *strand) { return malloc(strand->stateSzb); } // block allocation of an initial collection of strands Diderot_World_t *Diderot_AllocInitially ( Strand_t *strand, // the type of strands being allocated bool isArray, // is the initialization an array or collection? uint32_t nDims, // depth of iteration nesting int32_t *base, // nDims array of base indices uint32_t *size) // nDims array of iteration sizes { Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t)); if (wrld == 0) { fprintf (stderr, "unable to allocate world\n"); exit (1); } wrld->isArray = isArray; wrld->nDims = nDims; wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t)); wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t)); size_t numStrands = 1; for (int i = 0; i < wrld->nDims; i++) { numStrands *= size[i]; wrld->base[i] = base[i]; wrld->size[i] = size[i]; } printf("AllocInitially: %d", size[0]); for (int i = 1; i < nDims; i++) printf(" x %d", size[i]); printf("\n"); // allocate the strand state pointers wrld->numStrands = numStrands; sem_init (&wrld->numActive, 0, numStrands); wrld->inState = (void **) malloc (numStrands * sizeof(void *)); wrld->outState = (void **) malloc (numStrands * sizeof(void *)); wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t)); if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) { fprintf (stderr, "unable to allocate strand states\n"); exit (1); } // initialize strand state pointers etc. for (size_t i = 0; i < numStrands; i++) { wrld->inState[i] = Diderot_AllocStrand (strand); wrld->outState[i] = Diderot_AllocStrand (strand); wrld->status[i] = DIDEROT_ACTIVE; } wrld->numThreads = 0; pthread_mutex_init(&wrld->lock, NULL); pthread_cond_init(&wrld->workers, NULL); pthread_cond_init(&wrld->main, NULL); wrld->nWorkers = 0; sem_init(&wrld->stabilizing, 0, 0); wrld->nextStrand = 0; return wrld; } // get strand state pointers void *Diderot_InState (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return wrld->inState[i]; } void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return wrld->outState[i]; } bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return !wrld->status[i]; }
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |