Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /trunk/src/lib/parallel-target/main.c
ViewVC logotype

View of /trunk/src/lib/parallel-target/main.c

Parent Directory | Revision Log


Revision 1825 - (download) (as text) (annotate)
Thu Apr 12 13:57:45 2012 UTC (7 years, 4 months ago) by jhr
File size: 10650 byte(s)
  Use NEWVEC macro to allocate worker argument array
/*! \file main.c
 *
 * \author John Reppy
 */

/*
 * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
 * All rights reserved.
 */

#include <Diderot/diderot.h>
#include <pthread.h>
#include <stdint.h>
#include <teem/nrrd.h>

// #ifdef HAVE_BUILTIN_ATOMIC_OPS
// STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
// {
//     return __sync_add_and_fetch(x, 1);
// }
// STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
// {
//     return __sync_sub_and_fetch(x, 1);
// }
// #else
// #  error atomic operations not supported
// #endif

// The number of strands a worker will take for processing at one time.  Workers
// claim strands in blocks of (at most) this many indices while holding the world
// lock; the final block of a super step may be smaller.
#define BLOCK_SIZE 4096

// cache-line alignment (Xeon has 64-byte lines).  Applied to the shared counters in
// the world and to the worker-argument type; uses the GCC-style attribute syntax.
#define CACHE_ALIGN     __attribute__((aligned(64)))

/* The world for the parallel target: the strand states plus the state used to
 * coordinate the worker threads.  The fields supplied by STRUCT_WORLD_PREFIX are
 * defined elsewhere; this file accesses name, isArray, nDims, base, size,
 * numStrands, and status through the world, so they presumably come from that
 * prefix (NOTE(review): confirm against the header that defines the macro).
 */
struct struct_world {
    STRUCT_WORLD_PREFIX
  // per-strand state pointers (one entry per strand).  A worker passes inState[i] and
  // outState[i] as the first and second arguments of the strand's update method, and
  // the two arrays are swapped at the start of every super step.
    void                **inState;
    void                **outState;
    uint32_t            numWorkers;     // number of worker threads
    uint32_t            nSteps;         // number of super steps
  // synchronization state; each counter carries CACHE_ALIGN (64-byte alignment)
    uint32_t            nextStrand CACHE_ALIGN; // index of next strand to update
    uint32_t            numActive CACHE_ALIGN;  // # active strands
    uint32_t            numAvail CACHE_ALIGN;   // # unevaluated strands
    uint32_t            numIdle CACHE_ALIGN;    // # idle workers
    pthread_mutex_t     lock;           // big lock to protect wrld from multiple accesses
    pthread_cond_t      barrier;        // workers wait on this when they have no work
    pthread_cond_t      mainWait;       // used to signal main when the workers have finished
};

/* The argument handed to each worker thread.  main allocates one of these per
 * worker and passes its address to pthread_create.
 */
typedef struct {
    int                 id;             // worker index; main assigns 0 .. nWorkers-1
    Diderot_World_t     *wrld;          // the world shared by all of the workers
} WorkerArg_t CACHE_ALIGN;

// thread entry point for the workers; arg points to the worker's WorkerArg_t
static void *Worker (void *arg);

int main (int argc, const char **argv)
{
    CPUInfo_t cpuInfo;
    if (! GetNumCPUs (&cpuInfo)) {
        fprintf(stderr, "unable to get number of processors\n");
        exit (1);
    }

    Diderot_int_t np = cpuInfo.numHWCores;

    Diderot_Options_t *opts = Diderot_OptNew ();

    Diderot_OptAddInt (opts, "np", "specify number of worker threads", &np, true);
    Diderot_RegisterGlobalOpts (opts);
    Diderot_OptProcess (opts, argc, argv);
    Diderot_OptFree (opts);

    if (VerboseFlg) fprintf (stderr, "initializing globals ...\n");
    Diderot_InitGlobals ();

    if (VerboseFlg) fprintf (stderr, "initializing strands ...\n");
    Diderot_World_t *wrld = Diderot_Initially ();

    for (int i = 0;  i < wrld->numStrands;  i++) {
      // hack to make the invariant part of the state the same in both copies
        memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
    }

  // Start worker threads
    int nWorkers = np;
    WorkerArg_t *args = NEWVEC(WorkerArg_t, nWorkers);
    if (VerboseFlg) printf ("initializing %d workers ...\n", nWorkers);
    double t0 = airTime();
    wrld->numWorkers = nWorkers;
    wrld->numIdle = 0;
    for (int i = 0; i < nWorkers; i++) {
        pthread_t pid;
        args[i].wrld = wrld;
        args[i].id = i;
        if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
            fprintf (stderr, "unable to create worker thread\n");
            exit (1);
        }
        pthread_detach (pid);
    }

  // wait for the computation to finish
    pthread_mutex_lock (&wrld->lock);
        pthread_cond_wait (&wrld->mainWait, &wrld->lock);
    pthread_mutex_unlock (&wrld->lock);

    double totalTime = airTime() - t0;

    if (VerboseFlg)
        fprintf (stderr, "done: %d steps, in %f seconds\n", wrld->nSteps, totalTime);
    else if (TimingFlg)
        printf ("np=%d usr=%f\n", nWorkers, totalTime);

  // output the final strand states
    if (NrrdOutputFlg)
        Diderot_Output (wrld, Diderot_Strands[0]->outputSzb);
    else
        Diderot_Print (wrld);

    Diderot_Shutdown (wrld);

    return 0;

}

/*! \brief Body of a worker thread; processes active strands in super steps.
 *
 * Every super step has the following phases:
 *   1. a barrier, at which the last worker to arrive resets the work counters and
 *      swaps the in and out state arrays;
 *   2. a termination check (all workers leave once no strands are active);
 *   3. the update phase, in which workers repeatedly claim blocks of up to
 *      BLOCK_SIZE strands and run the update method on the active ones;
 *   4. a second barrier, which also counts the super step;
 *   5. the stabilize phase, in which each worker handles a fixed contiguous
 *      region of the strands, followed by the adjustment of the active count.
 *
 * The active count is only modified in phase 5 and only tested in phase 2.  The
 * two barriers separate these phases, so every worker sees the same value in its
 * termination check.
 *
 * \param arg pointer to this worker's WorkerArg_t
 * \return does not return normally; the thread leaves via pthread_exit.
 */
static void *Worker (void *arg)
{
  // Generation count for the barriers.  It is shared by all of the workers and is
  // only read or written while wrld->lock is held.  The last worker to arrive at a
  // barrier advances it, and the waiting workers use the change as their wake-up
  // predicate, which makes the barriers robust against spurious wake-ups from
  // pthread_cond_wait.
    static uint32_t     barrierGen = 0;

    WorkerArg_t         *myArg = (WorkerArg_t *)arg;
    Diderot_World_t     *wrld = myArg->wrld;

  // the contiguous region of strands that this worker stabilizes; the last worker
  // also takes the remainder when the strands do not divide evenly.
    int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers;
    int start = myArg->id * nStrandsPerWorker;
    int limit;
    if (wrld->numWorkers-1 == myArg->id)
        limit = wrld->numStrands;
    else
        limit = start + nStrandsPerWorker;

    while (true) {
      // barrier synchronization at start of super step
        pthread_mutex_lock (&wrld->lock);
            if (wrld->numIdle+1 < wrld->numWorkers) {
                uint32_t gen = barrierGen;
                wrld->numIdle++;
                while (barrierGen == gen) {
                    pthread_cond_wait (&wrld->barrier, &wrld->lock);
                }
            }
            else {
              // all other workers are idle, so we can proceed after some initialization
                wrld->numIdle = 0;
                wrld->numAvail = wrld->numStrands;  // includes inactive strands
                wrld->nextStrand = 0;
              // swap in and out
                void **tmp = wrld->inState;
                wrld->inState = wrld->outState;
                wrld->outState = tmp;
                barrierGen++;
                pthread_cond_broadcast (&wrld->barrier);
            }
        pthread_mutex_unlock (&wrld->lock);

      // if there are no active strands left, then we're done.  No worker modifies
      // numActive between the barrier above and the barrier below, so all of the
      // workers make the same decision here.
        if (wrld->numActive == 0) {
            pthread_cond_signal (&wrld->mainWait);
            pthread_exit (0);
        }

      // iterate until there is no more work to do
        int blkStart, blkSize;
        int numDead = 0;
        do {
          // grab some work
            pthread_mutex_lock (&wrld->lock);
                blkStart = wrld->nextStrand;
                blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
                wrld->numAvail -= blkSize;
                wrld->nextStrand += blkSize;
            pthread_mutex_unlock (&wrld->lock);
          // update the strands
            for (int i = blkStart;  i < blkStart+blkSize;  i++) {
                if (! wrld->status[i]) {
                    StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
                    switch (sts) {
                      case DIDEROT_STABILIZE:
                        wrld->status[i] = DIDEROT_STABILIZE;
                        break;
                      case DIDEROT_DIE:
                        wrld->status[i] = DIDEROT_DIE;
                        numDead++;
                        break;
                      default:
                        break;
                    }
                }
                else {
                    assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
                }
            }
        } while (blkSize > 0);

      // barrier synchronization.  Note that the dead strands are *not* subtracted
      // from numActive here: a worker that is slow to leave the first barrier might
      // not have made its termination check yet, and it must not see a count that
      // another worker has already reduced for the current super step.
        pthread_mutex_lock (&wrld->lock);
            if (wrld->numIdle+1 < wrld->numWorkers) {
                uint32_t gen = barrierGen;
                wrld->numIdle++;
                while (barrierGen == gen) {
                    pthread_cond_wait (&wrld->barrier, &wrld->lock);
                }
            }
            else {
              // all other workers are idle, so we can proceed
                wrld->numIdle = 0;
                wrld->nSteps++;
                barrierGen++;
                pthread_cond_broadcast (&wrld->barrier);
            }
        pthread_mutex_unlock (&wrld->lock);

/**** If there is a global computation phase, it goes here ****/

      // stabilize any threads that need stabilization.  Each worker is responsible for
      // a contiguous region of the strands
// FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking
        int numStabilized = 0;
        for (int i = start;  i < limit;  i++) {
            if (wrld->status[i] == DIDEROT_STABILIZE) {
              // stabilize the strand's state.  Note that the outState has been set by
              // the last call to update, so we make the inState be the target of the
              // stabilize method.
                Diderot_Strands[0]->stabilize(wrld->outState[i], wrld->inState[i]);
                memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
                wrld->status[i] = DIDEROT_STABLE;
                numStabilized++;
            }
        }
      // adjust the numActive count by the strands that this worker saw die during
      // the update phase plus the strands that it stabilized.  Every worker has
      // passed its termination check for this super step by now (they all arrived
      // at the barrier above), and this worker finishes the adjustment before it
      // arrives at the barrier that starts the next super step.
        int numRetired = numDead + numStabilized;
#if defined(HAVE_BUILTIN_ATOMIC_OPS)
        __sync_fetch_and_sub(&wrld->numActive, numRetired);
#else
        pthread_mutex_lock (&wrld->lock);
            wrld->numActive -= numRetired;
        pthread_mutex_unlock (&wrld->lock);
#endif
    } // end while(true)

}

/*! \brief Block allocation of an initial collection of strands.
 *
 * Allocates the world, copies the iteration structure into it, allocates the in
 * and out state of every strand (marking each strand active), and initializes
 * the synchronization state used by the worker threads.  The number of strands
 * is the product of the iteration sizes (1 when nDims is 0).
 *
 * \return the new world; failures terminate the process via exit(1).
 */
Diderot_World_t *Diderot_AllocInitially (
    const char *name,           // the name of the program
    Strand_t *strand,           // the type of strands being allocated
    bool isArray,               // is the initialization an array or collection?
    uint32_t nDims,             // depth of iteration nesting
    int32_t *base,              // nDims array of base indices
    uint32_t *size)             // nDims array of iteration sizes
{
    Diderot_World_t *wrld = NEW(Diderot_World_t);
    if (wrld == NULL) {
        fprintf (stderr, "unable to allocate world\n");
        exit (1);
    }

    wrld->name = name; /* NOTE: we are assuming that name is statically allocated! */
    wrld->isArray = isArray;
    wrld->nDims = nDims;
    wrld->base = NEWVEC(int32_t, nDims);
    wrld->size = NEWVEC(uint32_t, nDims);
    size_t numStrands = 1;
    for (uint32_t i = 0;  i < nDims;  i++) {
      // the strand counters in the world are 32 bits wide, so refuse a collection
      // whose size would not fit instead of silently truncating the count.
        if ((size[i] != 0) && (numStrands > UINT32_MAX / size[i])) {
            fprintf (stderr, "too many strands in initial collection\n");
            exit (1);
        }
        numStrands *= size[i];
        wrld->base[i] = base[i];
        wrld->size[i] = size[i];
    }

    if (VerboseFlg) {
      // prints "AllocInitially: s0 x s1 x ..."; the loop form avoids reading size[0]
      // when there are no dimensions.
        fprintf(stderr, "AllocInitially:");
        for (uint32_t i = 0;  i < nDims;  i++) {
            fprintf(stderr, "%s%u", (i == 0) ? " " : " x ", (unsigned)size[i]);
        }
        fprintf(stderr, "\n");
    }

  // allocate the strand state pointers
    wrld->numStrands = numStrands;
    wrld->inState = NEWVEC(void *, numStrands);
    wrld->outState = NEWVEC(void *, numStrands);
    wrld->status = NEWVEC(uint8_t, numStrands);
    wrld->numActive = wrld->numStrands;
    wrld->nSteps = 0;
    wrld->numWorkers = 0;
  // the work-distribution counters are reset by the workers at the start of each
  // super step, but give them defined values until then.
    wrld->numIdle = 0;
    wrld->numAvail = 0;
    wrld->nextStrand = 0;

  // initialize strand state pointers etc.
    for (size_t i = 0;  i < numStrands;  i++) {
        wrld->inState[i] = CheckedAlloc (strand->stateSzb);
        wrld->outState[i] = CheckedAlloc (strand->stateSzb);
        wrld->status[i] = DIDEROT_ACTIVE;
    }

    if ((pthread_mutex_init (&wrld->lock, NULL) != 0)
    ||  (pthread_cond_init (&wrld->barrier, NULL) != 0)
    ||  (pthread_cond_init (&wrld->mainWait, NULL) != 0)) {
        fprintf (stderr, "unable to initialize synchronization state\n");
        exit (1);
    }

    return wrld;

}

/*! \brief Get the pointer to the input state of a strand.
 *
 * \param wrld the world
 * \param i the index of the strand; asserted to be less than wrld->numStrands
 * \return the i'th entry of the world's inState array
 */
void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);

    void **states = wrld->inState;
    return states[i];
}

/*! \brief Get the pointer to the output state of a strand.
 *
 * \param wrld the world
 * \param i the index of the strand; asserted to be less than wrld->numStrands
 * \return the i'th entry of the world's outState array
 */
void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);

    void **states = wrld->outState;
    return states[i];
}

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0