SCM Repository
View of /trunk/src/lib/parallel-target/main.c
Parent Directory
|
Revision Log
Revision 1825 -
(download)
(as text)
(annotate)
Thu Apr 12 13:57:45 2012 UTC (10 years, 2 months ago) by jhr
File size: 10650 byte(s)
Thu Apr 12 13:57:45 2012 UTC (10 years, 2 months ago) by jhr
File size: 10650 byte(s)
Use NEWVEC macro to allocate worker argument array
/*! \file main.c
 *
 * \author John Reppy
 *
 * Parallel (pthreads) runtime driver: initializes the world, runs the strands
 * in super steps on a pool of worker threads, and outputs the final strand
 * states.
 */

/*
 * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
 * All rights reserved.
 */

#include <Diderot/diderot.h>
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <teem/nrrd.h>

// The number of strands a worker will take for processing at one time
#define BLOCK_SIZE 4096

// cache-line alignment (Xeon has 64-byte lines)
#define CACHE_ALIGN __attribute__((aligned(64)))

struct struct_world {
    STRUCT_WORLD_PREFIX
    void **inState;
    void **outState;
    uint32_t numWorkers;             // number of worker threads
    uint32_t nSteps;                 // number of super steps
  // synchronization state
    uint32_t nextStrand CACHE_ALIGN; // index of next strand to update
    uint32_t numActive CACHE_ALIGN;  // # active strands
    uint32_t numAvail CACHE_ALIGN;   // # unevaluated strands
    uint32_t numIdle CACHE_ALIGN;    // # idle workers
    pthread_mutex_t lock;            // big lock to protect wrld from multiple accesses
    pthread_cond_t barrier;          // workers wait on this when they have no work
    pthread_cond_t mainWait;         // still initialized, but main now joins the workers instead
    bool done;                       // set (under lock) by the last worker into a
                                     // start-of-step barrier when no active strands remain
    uint32_t barrierGen;             // bumped each time a barrier opens; lets waiters tell
                                     // a real release from a spurious wakeup
};

typedef struct {
    int                 id;
    Diderot_World_t     *wrld;
} WorkerArg_t CACHE_ALIGN;

static void *Worker (void *arg);

/* Program entry point: process the command-line options, create the initial
 * strands, run them to completion on np worker threads (default: the number
 * of hardware cores), and then output the final strand states.
 */
int main (int argc, const char **argv)
{
    CPUInfo_t cpuInfo;
    if (! GetNumCPUs (&cpuInfo)) {
        fprintf(stderr, "unable to get number of processors\n");
        exit (1);
    }

    Diderot_int_t np = cpuInfo.numHWCores;

    Diderot_Options_t *opts = Diderot_OptNew ();
    Diderot_OptAddInt (opts, "np", "specify number of worker threads", &np, true);
    Diderot_RegisterGlobalOpts (opts);
    Diderot_OptProcess (opts, argc, argv);
    Diderot_OptFree (opts);

  // with zero (or a negative number of) workers nothing would ever run the strands
    if (np < 1) {
        fprintf (stderr, "number of worker threads must be at least 1\n");
        exit (1);
    }

    if (VerboseFlg) fprintf (stderr, "initializing globals ...\n");
    Diderot_InitGlobals ();

    if (VerboseFlg) fprintf (stderr, "initializing strands ...\n");
    Diderot_World_t *wrld = Diderot_Initially ();
    for (int i = 0;  i < wrld->numStrands;  i++) {
      // hack to make the invariant part of the state the same in both copies
        memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
    }

  // Start worker threads
    int nWorkers = np;
    WorkerArg_t *args = NEWVEC(WorkerArg_t, nWorkers);
    pthread_t *workers = NEWVEC(pthread_t, nWorkers);
  // verbose progress goes to stderr, like the other verbose messages, so it
  // cannot mix with the strand output printed to stdout
    if (VerboseFlg) fprintf (stderr, "initializing %d workers ...\n", nWorkers);
    double t0 = airTime();
    wrld->numWorkers = nWorkers;
    wrld->numIdle = 0;
    wrld->done = false;
    wrld->barrierGen = 0;
    for (int i = 0;  i < nWorkers;  i++) {
        args[i].wrld = wrld;
        args[i].id = i;
        if (pthread_create (&workers[i], NULL, Worker, (void *)&(args[i])) != 0) {
            fprintf (stderr, "unable to create worker thread\n");
            exit (1);
        }
    }

  // wait for the computation to finish.  Joining (rather than waiting on a
  // condition variable) cannot miss a wakeup, and it guarantees that no worker
  // still touches wrld once we start output and shutdown.
    for (int i = 0;  i < nWorkers;  i++) {
        if (pthread_join (workers[i], NULL) != 0) {
            fprintf (stderr, "unable to join worker thread\n");
            exit (1);
        }
    }
    double totalTime = airTime() - t0;

    if (VerboseFlg)
        fprintf (stderr, "done: %u steps, in %f seconds\n", (unsigned)wrld->nSteps, totalTime);
    else if (TimingFlg)
        printf ("np=%d usr=%f\n", nWorkers, totalTime);

  // output the final strand states
    if (NrrdOutputFlg)
        Diderot_Output (wrld, Diderot_Strands[0]->outputSzb);
    else
        Diderot_Print (wrld);

    Diderot_Shutdown (wrld);

    return 0;
}

/* BarrierWait:
 * Rendezvous of all wrld->numWorkers workers.  Must be called with wrld->lock
 * held; returns with it still held.  Exactly one caller (the last to arrive)
 * gets true.  Because it still holds the lock, it can update shared state
 * before any released worker can observe it.  All other callers get false.
 */
static bool BarrierWait (Diderot_World_t *wrld)
{
    if (wrld->numIdle + 1 < wrld->numWorkers) {
        uint32_t myGen = wrld->barrierGen;
        wrld->numIdle++;
      // POSIX permits spurious wakeups, so keep waiting until this barrier opens
        do {
            pthread_cond_wait (&wrld->barrier, &wrld->lock);
        } while (myGen == wrld->barrierGen);
        return false;
    }
    else {
        wrld->numIdle = 0;
        wrld->barrierGen++;
        pthread_cond_broadcast (&wrld->barrier);
        return true;
    }
}

/* Worker:
 * Thread body that processes active strands; arg points to this worker's
 * WorkerArg_t.  Each iteration of the outer loop is one super step:
 *   1. barrier; the last worker to arrive resets the work counters, swaps the
 *      in/out state arrays, and decides whether any active strands remain;
 *   2. claim blocks of up to BLOCK_SIZE strands and call update on each;
 *   3. barrier (the last worker to arrive counts the step);
 *   4. stabilize strands in this worker's fixed [start, limit) range.
 * Returns NULL once a super step begins with no active strands.
 */
static void *Worker (void *arg)
{
    WorkerArg_t *myArg = (WorkerArg_t *)arg;
    Diderot_World_t *wrld = myArg->wrld;

  // fixed partition used for stabilization; the last worker takes the remainder
    int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
    int start = myArg->id * nStrandsPerWorker;
    int limit;
    if (wrld->numWorkers-1 == myArg->id)
        limit = wrld->numStrands;
    else
        limit = start + nStrandsPerWorker;

    while (true) {
      // barrier synchronization at start of super step
        pthread_mutex_lock (&wrld->lock);
        if (BarrierWait (wrld)) {
          // all other workers are idle, so we can proceed after some initialization
            wrld->numAvail = wrld->numStrands;  // includes inactive strands
            wrld->nextStrand = 0;
          // swap in and out
            void **tmp = wrld->inState;
            wrld->inState = wrld->outState;
            wrld->outState = tmp;
          // decide termination once, while every other worker is parked, so that
          // all workers agree.  numActive is decremented again later in the step,
          // possibly before a slow worker would otherwise get around to reading it.
            wrld->done = (wrld->numActive == 0);
        }
        bool done = wrld->done;
        pthread_mutex_unlock (&wrld->lock);

      // if there are no active strands left, then we're done
        if (done)
            return NULL;

      // iterate until there is no more work to do
        int blkStart, blkSize;
        int numDead = 0;
        do {
          // grab some work
            pthread_mutex_lock (&wrld->lock);
            blkStart = wrld->nextStrand;
            blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
            wrld->numAvail -= blkSize;
            wrld->nextStrand += blkSize;
            pthread_mutex_unlock (&wrld->lock);
          // update the strands
            for (int i = blkStart;  i < blkStart+blkSize;  i++) {
                if (! wrld->status[i]) {
                    StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
                    switch (sts) {
                      case DIDEROT_STABILIZE:
                        wrld->status[i] = DIDEROT_STABILIZE;
                        break;
                      case DIDEROT_DIE:
                        wrld->status[i] = DIDEROT_DIE;
                        numDead++;
                        break;
                      default:
                        break;
                    }
                }
                else {
                    assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
                }
            }
        } while (blkSize > 0);

      // barrier synchronization
        pthread_mutex_lock (&wrld->lock);
        wrld->numActive -= numDead;
        if (BarrierWait (wrld)) {
          // all other workers are idle, so we can proceed
            wrld->nSteps++;
        }
        pthread_mutex_unlock (&wrld->lock);

        /**** If there is a global computation phase, it goes here ****/

      // stabilize any threads that need stabilization.  Each worker is responsible for
      // a contiguous region of the strands
      // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking
        int numStabilized = 0;
        for (int i = start;  i < limit;  i++) {
            if (wrld->status[i] == DIDEROT_STABILIZE) {
              // stabilize the strand's state.  Note that the outState has been set by
              // the last call to update, so we make the inState be the target of the
              // stabilize method.
                Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]);
                memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
                wrld->status[i] = DIDEROT_STABLE;
                numStabilized++;
            }
        }
      // adjust the numActive count
#if defined(HAVE_BUILTIN_ATOMIC_OPS)
        __sync_fetch_and_sub(&wrld->numActive, numStabilized);
#else
        pthread_mutex_lock (&wrld->lock);
        wrld->numActive -= numStabilized;
        pthread_mutex_unlock (&wrld->lock);
#endif
    } // end while(true)
}

/* Diderot_AllocInitially:
 * Block allocation of an initial collection of strands.  Allocates the world,
 * copies the iteration bounds, and allocates one in/out state buffer of
 * strand->stateSzb bytes per strand.  Every strand starts out DIDEROT_ACTIVE.
 * Exits the process if the world itself cannot be allocated.
 */
Diderot_World_t *Diderot_AllocInitially (
    const char *name,           // the name of the program
    Strand_t *strand,           // the type of strands being allocated
    bool isArray,               // is the initialization an array or collection?
    uint32_t nDims,             // depth of iteration nesting
    int32_t *base,              // nDims array of base indices
    uint32_t *size)             // nDims array of iteration sizes
{
    Diderot_World_t *wrld = NEW(Diderot_World_t);
    if (wrld == NULL) {
        fprintf (stderr, "unable to allocate world\n");
        exit (1);
    }

    wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */
    wrld->isArray = isArray;
    wrld->nDims = nDims;
    wrld->base = NEWVEC(int32_t, nDims);
    wrld->size = NEWVEC(uint32_t, nDims);
    size_t numStrands = 1;
    for (uint32_t i = 0;  i < nDims;  i++) {
        numStrands *= size[i];
        wrld->base[i] = base[i];
        wrld->size[i] = size[i];
    }

  // size[0] only exists when there is at least one dimension
    if (VerboseFlg && (nDims > 0)) {
        fprintf(stderr, "AllocInitially: %u", (unsigned)size[0]);
        for (uint32_t i = 1;  i < nDims;  i++)
            fprintf(stderr, " x %u", (unsigned)size[i]);
        fprintf(stderr, "\n");
    }

  // allocate the strand state pointers
    wrld->numStrands = numStrands;
    wrld->inState = NEWVEC(void *, numStrands);
    wrld->outState = NEWVEC(void *, numStrands);
    wrld->status = NEWVEC(uint8_t, numStrands);
    wrld->numActive = wrld->numStrands;
    wrld->nSteps = 0;
    wrld->numWorkers = 0;
    wrld->numIdle = 0;
    wrld->done = false;
    wrld->barrierGen = 0;

  // initialize strand state pointers etc.
    for (size_t i = 0;  i < numStrands;  i++) {
        wrld->inState[i] = CheckedAlloc (strand->stateSzb);
        wrld->outState[i] = CheckedAlloc (strand->stateSzb);
        wrld->status[i] = DIDEROT_ACTIVE;
    }

    pthread_mutex_init (&wrld->lock, NULL);
    pthread_cond_init (&wrld->barrier, NULL);
    pthread_cond_init (&wrld->mainWait, NULL);

    return wrld;

}

// get strand state pointers
void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    return wrld->inState[i];
}

void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    return wrld->outState[i];
}
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |