SCM Repository
View of /branches/pure-cfg/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 1263 -
(download)
(as text)
(annotate)
Sun May 29 00:09:15 2011 UTC (11 years ago) by jhr
File size: 10095 byte(s)
Sun May 29 00:09:15 2011 UTC (11 years ago) by jhr
File size: 10095 byte(s)
Adding support for command-line arguments
/*! \file main.c * * \author John Reppy */ /* * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu) * All rights reserved. */ #include <string.h> #include <stdio.h> #include <assert.h> #include <Diderot/diderot.h> #include <pthread.h> // #ifdef HAVE_BUILTIN_ATOMIC_OPS // STATIC_INLINE uint32_t AtomicInc (uint32_t *x) // { // return __sync_add_and_fetch(x, 1); // } // STATIC_INLINE uint32_t AtomicDec (uint32_t *x) // { // return __sync_sub_and_fetch(x, 1); // } // #else // # error atomic operations not supported // #endif // The number of strands a worker will take for processing at one time #define BLOCK_SIZE 256 struct struct_world { bool isArray; // is the initialization an array or collection? uint32_t nDims; // depth of iteration nesting int32_t *base; // nDims array of base indices uint32_t *size; // nDims array of iteration sizes uint32_t numStrands; // number of strands in the world void **inState; void **outState; uint8_t *status; // array of strand status flags uint32_t numWorkers; // number of worker threads uint32_t nSteps; // number of super steps // synchronization state uint32_t nextStrand __attribute__((aligned(64))); // index of next strand to update uint32_t numActive __attribute__((aligned(64))); // # active strands uint32_t numAvail __attribute__((aligned(64))); // # unevaluated strands uint32_t numIdle __attribute__((aligned(64))); // # idle workers pthread_mutex_t lock; // big lock to protect wrld from multiple accesses pthread_cond_t barrier; // workers wait on this when they have no work pthread_cond_t mainWait; // used to signal main when the workers have finished }; typedef struct { int id; Diderot_World_t *wrld; } WorkerArg_t; // NOTE: we probably should put this in a file that supports runtime printing static bool VerboseFlg = false; /* Function which processes active strands. */ static void *Worker (void *arg) { WorkerArg_t *myArg = (WorkerArg_t *)arg; Diderot_World_t *wrld = myArg->wrld; bool existsStabilizing; while (true) { // barrier synchronization at start of super step pthread_mutex_lock (&wrld->lock); if (wrld->numIdle+1 < wrld->numWorkers) { wrld->numIdle++; pthread_cond_wait (&wrld->barrier, &wrld->lock); } else { // all other workers are idle, so we can proceed after some initialization wrld->numIdle = 0; wrld->numAvail = wrld->numStrands; // includes inactive strands wrld->nextStrand = 0; // swap in and out void **tmp = wrld->inState; wrld->inState = wrld->outState; wrld->outState = tmp; pthread_cond_broadcast (&wrld->barrier); } pthread_mutex_unlock (&wrld->lock); // if there are no active strands left, then we're done if (wrld->numActive == 0) { pthread_cond_signal (&wrld->mainWait); pthread_exit (0); } // iterate until there is no more work to do int blkStart, blkSize; existsStabilizing = false; int numDead = 0; do { // grab some work pthread_mutex_lock (&wrld->lock); blkStart = wrld->nextStrand; blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail; wrld->numAvail -= blkSize; wrld->nextStrand += blkSize; pthread_mutex_unlock (&wrld->lock); // update the strands for (int i = blkStart; i < blkStart+blkSize; i++) { if (! wrld->status[i]) { StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); switch (sts) { case DIDEROT_STABILIZE: wrld->status[i] = DIDEROT_STABILIZE; existsStabilizing = true; break; case DIDEROT_DIE: wrld->status[i] = DIDEROT_DIE; numDead++; break; default: break; } } else { assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE)); } } } while (blkSize > 0); // barrier synchronization pthread_mutex_lock (&wrld->lock); wrld->numActive -= numDead; if (wrld->numIdle+1 < wrld->numWorkers) { wrld->numIdle++; pthread_cond_wait (&wrld->barrier, &wrld->lock); } else { // all other workers are idle, so we can proceed wrld->numIdle = 0; pthread_cond_broadcast (&wrld->barrier); wrld->nSteps++; } pthread_mutex_unlock (&wrld->lock); /**** If there is a global computation phase, it goes here ****/ // stabilize any threads that need stabilization. Each worker is responsible for // a contiguous region of the strands if (existsStabilizing) { int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers; int start = myArg->id * nStrandsPerWorker; int limit = start + nStrandsPerWorker; if (limit > wrld->numStrands) limit = wrld->numStrands; int numStabilized = 0; for (int i = start; i < limit; i++) { if (wrld->status[i] == DIDEROT_STABILIZE) { // copy out to in so that both copies are the stable state memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb); wrld->status[i] = DIDEROT_STABLE; numStabilized++; } } // adjust the numActive count #if defined(HAVE_BUILTIN_ATOMIC_OPS) __sync_fetch_and_sub(&wrld->numActive, numStabilized); #else pthread_mutex_lock (&wrld->lock); wrld->numActive -= numStabilized; pthread_mutex_unlock (&wrld->lock); #endif } } // end while(true) } int main (int argc, const char **argv) { CPUInfo_t cpuInfo; if (! GetNumCPUs (&cpuInfo)) { fprintf(stderr, "unable to get number of processors\n"); exit (1); } Diderot_int_t np = cpuInfo.numHWCores; Diderot_Options_t *opts = Diderot_OptNew (); Diderot_OptAddFlag (opts, "verbose", "enable runtime-system messages", &VerboseFlg); Diderot_OptAddInt (opts, "np", "specify number of threads", &np, true); Diderot_RegisterGlobalOpts (opts); Diderot_OptProcess (opts, argc, argv); Diderot_OptFree (opts); if (VerboseFlg) fprintf (stderr, "initializing globals ...\n"); Diderot_InitGlobals (argc, argv); if (VerboseFlg) fprintf (stderr, "initializing strands ...\n"); Diderot_World_t *wrld = Diderot_Initially (); for (int i = 0; i < wrld->numStrands; i++) { // hack to make the invariant part of the state the same in both copies memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); } // Start worker threads int nWorkers = np; WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t)); if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers); wrld->numWorkers = nWorkers; wrld->numIdle = 0; for (int i = 0; i < nWorkers; i++) { pthread_t pid; args[i].wrld = wrld; args[i].id = i; if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) { fprintf (stderr, "unable to create worker thread\n"); exit (1); } pthread_detach (pid); } // wait for the computation to finish pthread_mutex_lock (&wrld->lock); pthread_cond_wait (&wrld->mainWait, &wrld->lock); pthread_mutex_unlock (&wrld->lock); if (VerboseFlg) fprintf (stderr, "done: %d steps\n", wrld->nSteps); // here we have the final state of all of the strands in the "in" buffer FILE *outS = fopen("mip.txt", "w"); if (outS == NULL) { fprintf (stderr, "Cannot open output file\n"); exit (8); } for (int i = 0; i < wrld->numStrands; i++) { if (wrld->status[i] == DIDEROT_STABLE) Diderot_Strands[0]->print (outS, wrld->inState[i]); } fclose (outS); Diderot_Shutdown (wrld); return 0; } // this should be the part of the scheduler void *Diderot_AllocStrand (Strand_t *strand) { return malloc(strand->stateSzb); } // block allocation of an initial collection of strands Diderot_World_t *Diderot_AllocInitially ( Strand_t *strand, // the type of strands being allocated bool isArray, // is the initialization an array or collection? uint32_t nDims, // depth of iteration nesting int32_t *base, // nDims array of base indices uint32_t *size) // nDims array of iteration sizes { Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t)); if (wrld == 0) { fprintf (stderr, "unable to allocate world\n"); exit (1); } wrld->isArray = isArray; wrld->nDims = nDims; wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t)); wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t)); size_t numStrands = 1; for (int i = 0; i < wrld->nDims; i++) { numStrands *= size[i]; wrld->base[i] = base[i]; wrld->size[i] = size[i]; } if (VerboseFlg) { printf("AllocInitially: %d", size[0]); for (int i = 1; i < nDims; i++) printf(" x %d", size[i]); printf("\n"); } // allocate the strand state pointers wrld->numStrands = numStrands; wrld->inState = (void **) malloc (numStrands * sizeof(void *)); wrld->outState = (void **) malloc (numStrands * sizeof(void *)); wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t)); if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) { fprintf (stderr, "unable to allocate strand states\n"); exit (1); } wrld->numActive = wrld->numStrands; wrld->nSteps = 0; wrld->numWorkers = 0; // initialize strand state pointers etc. for (size_t i = 0; i < numStrands; i++) { wrld->inState[i] = Diderot_AllocStrand (strand); wrld->outState[i] = Diderot_AllocStrand (strand); wrld->status[i] = DIDEROT_ACTIVE; } pthread_mutex_init (&wrld->lock, NULL); pthread_cond_init (&wrld->barrier, NULL); pthread_cond_init (&wrld->mainWait, NULL); return wrld; } // get strand state pointers void *Diderot_InState (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return wrld->inState[i]; } void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return wrld->outState[i]; } bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i) { assert (i < wrld->numStrands); return !wrld->status[i]; }
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |