Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /trunk/src/lib/parallel-target/main.c
ViewVC logotype

View of /trunk/src/lib/parallel-target/main.c

Parent Directory | Revision Log


Revision 1232 - (download) (as text) (annotate)
Mon May 16 23:37:52 2011 UTC (8 years, 5 months ago) by jhr
File size: 9565 byte(s)
  Porting many changes from the pure-cfg branch, including value numbering
  and support for parallel execution on SMP systems.
/*! \file main.c
 *
 * \author John Reppy
 */

/*
 * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
 * All rights reserved.
 */

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <assert.h>
#include <Diderot/diderot.h>
#include <pthread.h>

// #ifdef HAVE_BUILTIN_ATOMIC_OPS
// STATIC_INLINE uint32_t AtomicInc (uint32_t *x)
// {
//     return __sync_add_and_fetch(x, 1);
// }
// STATIC_INLINE uint32_t AtomicDec (uint32_t *x)
// {
//     return __sync_sub_and_fetch(x, 1);
// }
// #else
// #  error atomic operations not supported
// #endif

// Maximum number of strands a worker claims from the shared work counter
// (nextStrand/numAvail) each time it takes the world lock during an update phase.
enum { BLOCK_SIZE = 256 };

/* World state shared by main and the worker threads.  The Worker/main code in this
 * file accesses it through Diderot_World_t, so this is presumably the definition
 * behind that typedef (declared in Diderot/diderot.h -- confirm).
 *
 * The first group of fields describes the strands; the "synchronization state"
 * group is used by the workers' super-step barriers and work distribution.
 */
struct struct_world {
    bool		isArray;	// is the initialization an array or collection?
    uint32_t		nDims;		// depth of iteration nesting
    int32_t		*base;		// nDims array of base indices
    uint32_t		*size;		// nDims array of iteration sizes
    uint32_t		numStrands;	// number of strands in the world
    void		**inState;	// per-strand state read by the update method
    void		**outState;	// per-strand state written by the update method; swapped with inState each super step
    uint8_t		*status;	// array of strand status flags
    uint32_t		numWorkers;	// number of worker threads
    uint32_t		nSteps;		// number of super steps
  // synchronization state
  // NOTE(review): the aligned(64) attributes presumably keep each counter on its own
  // cache line to limit false sharing -- confirm against the target cache-line size
    uint32_t		nextStrand __attribute__((aligned(64))); // index of next strand to update
    uint32_t		numActive __attribute__((aligned(64))); // # active strands
    uint32_t		numAvail __attribute__((aligned(64))); // # unevaluated strands
    uint32_t		numIdle __attribute__((aligned(64)));	// # idle workers
    pthread_mutex_t	lock;		// big lock to protect wrld from multiple accesses
    pthread_cond_t	barrier;	// workers wait on this when they have no work
    pthread_cond_t	mainWait;	// used to signal main when the workers have finished
};

/* Argument handed to each Worker thread by main (one per thread). */
typedef struct {
    int			id;		// worker index, 0 .. numWorkers-1; selects the worker's slice of strands when stabilizing
    Diderot_World_t	*wrld;		// the world shared by all workers
} WorkerArg_t;

/* Function which processes active strands.
 *
 * Each worker thread runs super steps until no active strands remain.  A super
 * step has three phases, separated by barriers on wrld->lock / wrld->barrier:
 *
 *   1. start barrier: the last worker to arrive resets the shared work counter
 *      (nextStrand/numAvail) and swaps the inState/outState buffers;
 *   2. update: workers repeatedly claim blocks of up to BLOCK_SIZE strands and run
 *      the strand update method on the active ones, recording strands that
 *      stabilize or die;
 *   3. after the mid-step barrier (whose last arrival increments nSteps): every
 *      worker finalizes the DIDEROT_STABILIZE strands in its own contiguous slice
 *      of the strand array, then subtracts the strands that died or stabilized
 *      during the step from numActive.
 *
 * numActive is only modified in phase 3, and no worker can reach phase 3 until
 * every worker has passed the start barrier, so all workers agree on the
 * termination test made there.
 *
 * \param arg pointer to this worker's WorkerArg_t
 * \return does not return normally; the thread ends with pthread_exit(0) after
 *         signaling wrld->mainWait
 */
static void *Worker (void *arg)
{
    WorkerArg_t		*myArg = (WorkerArg_t *)arg;
    Diderot_World_t	*wrld = myArg->wrld;

    while (true) {
      // barrier synchronization at start of super step
	pthread_mutex_lock (&wrld->lock);
	    if (wrld->numIdle+1 < wrld->numWorkers) {
		wrld->numIdle++;
	      // The last worker to arrive swaps inState and outState, so a change of
	      // wrld->inState means this barrier has been released; re-testing it
	      // guards against spurious wakeups.  Once no strands are active there is
	      // nothing left to wait for (this also covers an empty world, where the
	      // two buffers need not be distinct).
		void **gen = wrld->inState;
		while ((wrld->inState == gen) && (wrld->numActive != 0)) {
		    pthread_cond_wait (&wrld->barrier, &wrld->lock);
		}
	    }
	    else {
	      // all other workers are idle, so we can proceed after some initialization
		wrld->numIdle = 0;
		wrld->numAvail = wrld->numStrands;  // includes inactive strands
		wrld->nextStrand = 0;
	      // swap in and out
		void **tmp = wrld->inState;
		wrld->inState = wrld->outState;
		wrld->outState = tmp;
		pthread_cond_broadcast (&wrld->barrier);
	    }
	  // numActive cannot change again until every worker has passed the mid-step
	  // barrier below, so every worker reaches the same decision here
	    bool done = (wrld->numActive == 0);
	    if (done) {
		pthread_cond_signal (&wrld->mainWait);
	    }
	pthread_mutex_unlock (&wrld->lock);

      // if there are no active strands left, then we're done
	if (done) {
	    pthread_exit (0);
	}

      // iterate until there is no more work to do
	uint32_t blkStart, blkSize;
	uint32_t numDead = 0;
	do {
	  // grab some work
	    pthread_mutex_lock (&wrld->lock);
		blkStart = wrld->nextStrand;
		blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail;
		wrld->numAvail -= blkSize;
		wrld->nextStrand += blkSize;
	    pthread_mutex_unlock (&wrld->lock);
	  // update the strands
	    for (uint32_t i = blkStart;  i < blkStart+blkSize;  i++) {
		if (! wrld->status[i]) {
		    StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
		    switch (sts) {
		      case DIDEROT_STABILIZE:
			wrld->status[i] = DIDEROT_STABILIZE;
			break;
		      case DIDEROT_DIE:
			wrld->status[i] = DIDEROT_DIE;
			numDead++;
			break;
		      default:
			break;
		    }
		}
		else {
		    assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
		}
	    }
	} while (blkSize > 0);

      // barrier synchronization; the last worker to arrive counts the super step
	pthread_mutex_lock (&wrld->lock);
	    if (wrld->numIdle+1 < wrld->numWorkers) {
		wrld->numIdle++;
	      // nSteps changes only when this barrier is released, so re-test it to
	      // guard against spurious wakeups
		uint32_t step = wrld->nSteps;
		while (wrld->nSteps == step) {
		    pthread_cond_wait (&wrld->barrier, &wrld->lock);
		}
	    }
	    else {
	      // all other workers are idle, so we can proceed
		wrld->numIdle = 0;
		wrld->nSteps++;
		pthread_cond_broadcast (&wrld->barrier);
	    }
	pthread_mutex_unlock (&wrld->lock);

/**** If there is a global computation phase, it goes here ****/

      // Stabilize any strands that need stabilization.  Each worker is responsible
      // for a contiguous slice of ceil(numStrands/numWorkers) strands, so the slices
      // cover every strand.  Every worker scans its slice, because the strands in it
      // may have been updated (and marked DIDEROT_STABILIZE) by other workers.
	uint32_t nStrandsPerWorker = wrld->numStrands / wrld->numWorkers
	    + ((wrld->numStrands % wrld->numWorkers) != 0);
	uint32_t start = (uint32_t)myArg->id * nStrandsPerWorker;
	uint32_t limit = start + nStrandsPerWorker;
	if (limit > wrld->numStrands) limit = wrld->numStrands;
	uint32_t numStabilized = 0;
	for (uint32_t i = start;  i < limit;  i++) {
	    if (wrld->status[i] == DIDEROT_STABILIZE) {
	      // copy out to in so that both copies are the stable state
		memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
		wrld->status[i] = DIDEROT_STABLE;
		numStabilized++;
	    }
	}

      // Remove the strands that died or stabilized during this step from the active
      // count.  This is done under the lock (not with an unlocked atomic) because
      // workers waiting at the start barrier read numActive while holding the lock.
	uint32_t numRetired = numDead + numStabilized;
	if (numRetired > 0) {
	    pthread_mutex_lock (&wrld->lock);
		wrld->numActive -= numRetired;
	    pthread_mutex_unlock (&wrld->lock);
	}
    } // end while(true)

}


int main (int argc, const char **argv)
{
    fprintf (stderr, "initializing globals ...\n");
    Diderot_InitGlobals (argc, argv);

    fprintf (stderr, "initializing strands ...\n");
    Diderot_World_t *wrld = Diderot_Initially ();
    for (int i = 0;  i < wrld->numStrands;  i++) {
      // hack to make the invariant part of the state the same in both copies
	memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
    }

    CPUInfo_t cpuInfo;
    if (! GetNumCPUs (&cpuInfo)) {
	fprintf(stderr, "unable to get number of processors\n");
	exit (1);
    }

  // Start worker threads
    int nWorkers = cpuInfo.numHWCores;
    WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t));
    printf ("initializing %d workers ...\n", nWorkers);
    wrld->numWorkers = nWorkers;
    wrld->numIdle = 0;
    for (int i = 0; i < nWorkers; i++) {
	pthread_t pid;
	args[i].wrld = wrld;
	args[i].id = i;
	if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) {
	    fprintf (stderr, "unable to create worker thread\n");
	    exit (1);
	}
	pthread_detach (pid);
    }

  // wait for the computation to finish
    pthread_mutex_lock (&wrld->lock);
	pthread_cond_wait (&wrld->mainWait, &wrld->lock);
    pthread_mutex_unlock (&wrld->lock);

fprintf (stderr, "done: %d steps\n", wrld->nSteps);
  // here we have the final state of all of the strands in the "in" buffer
    FILE *outS = fopen("mip.txt", "w");
    if (outS == NULL) {
	fprintf (stderr, "Cannot open output file\n");
	exit (8);
    }

    for (int i = 0;  i < wrld->numStrands;  i++) {
	if (wrld->status[i] == DIDEROT_STABLE)
	    Diderot_Strands[0]->print (outS, wrld->inState[i]);
    }
    fclose (outS);

    Diderot_Shutdown (wrld);

    return 0;

}


// this should be part of the scheduler
/* Allocate uninitialized storage for one strand's state (strand->stateSzb bytes).
 *
 * Callers copy into the returned buffer immediately, so an allocation failure is
 * reported and terminates the process with status 1 instead of returning NULL.
 */
void *Diderot_AllocStrand (Strand_t *strand)
{
    void *state = malloc(strand->stateSzb);
    if ((state == NULL) && (strand->stateSzb != 0)) {
	fprintf (stderr, "unable to allocate strand state\n");
	exit (1);
    }
    return state;
}

// block allocation of an initial collection of strands
/* Allocate and initialize the world for an initial collection of strands.
 *
 * The number of strands is the product of size[0..nDims-1].  An "in" and an "out"
 * state buffer (contents uninitialized) is allocated for every strand, and every
 * strand starts out DIDEROT_ACTIVE.  Any allocation failure, or a strand count that
 * does not fit in the world's uint32_t strand counter, is reported on stderr and
 * terminates the process with status 1.
 *
 * \param strand  the type of strands being allocated (supplies stateSzb)
 * \param isArray is the initialization an array or collection?
 * \param nDims   depth of iteration nesting (may be 0)
 * \param base    nDims array of base indices (copied)
 * \param size    nDims array of iteration sizes (copied)
 * \return the newly allocated world
 */
Diderot_World_t *Diderot_AllocInitially (
    Strand_t *strand,	// the type of strands being allocated
    bool isArray,	// is the initialization an array or collection?
    uint32_t nDims,	// depth of iteration nesting
    int32_t *base,	// nDims array of base indices
    uint32_t *size)	// nDims array of iteration sizes
{
    Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
    if (wrld == 0) {
	fprintf (stderr, "unable to allocate world\n");
	exit (1);
    }

    wrld->isArray = isArray;
    wrld->nDims = nDims;
  // allocate at least one element so that malloc(0) returning NULL for a
  // zero-dimensional world is not mistaken for an allocation failure
    size_t nDimsAlloc = (nDims > 0) ? nDims : 1;
    wrld->base = (int32_t *) malloc (nDimsAlloc * sizeof(int32_t));
    wrld->size = (uint32_t *) malloc (nDimsAlloc * sizeof(uint32_t));
    if ((wrld->base == 0) || (wrld->size == 0)) {
	fprintf (stderr, "unable to allocate world\n");
	exit (1);
    }
    size_t numStrands = 1;
    for (uint32_t i = 0;  i < nDims;  i++) {
      // the world counts strands in a uint32_t, so reject grids that do not fit
	if ((size[i] != 0) && (numStrands > UINT32_MAX / size[i])) {
	    fprintf (stderr, "too many strands\n");
	    exit (1);
	}
	numStrands *= size[i];
	wrld->base[i] = base[i];
	wrld->size[i] = size[i];
    }

  // debugging output: the iteration sizes, e.g. "AllocInitially: 10 x 20"
  // (only indexes size[] when nDims > 0)
    fprintf(stderr, "AllocInitially:");
    for (uint32_t i = 0;  i < nDims;  i++)
	fprintf(stderr, "%s%u", (i == 0) ? " " : " x ", size[i]);
    fprintf(stderr, "\n");

  // allocate the strand state pointers (at least one slot each, for the same
  // malloc(0) reason as above)
    size_t numAlloc = (numStrands > 0) ? numStrands : 1;
    wrld->numStrands = (uint32_t) numStrands;
    wrld->inState = (void **) malloc (numAlloc * sizeof(void *));
    wrld->outState = (void **) malloc (numAlloc * sizeof(void *));
    wrld->status = (uint8_t *) malloc (numAlloc * sizeof(uint8_t));
    if ((wrld->inState == 0) || (wrld->outState == 0) || (wrld->status == 0)) {
	fprintf (stderr, "unable to allocate strand states\n");
	exit (1);
    }
    wrld->numActive = wrld->numStrands;
    wrld->nSteps = 0;
    wrld->numWorkers = 0;
  // the remaining counters are also set by main / the workers before use; zero them
  // here so the world never holds indeterminate values
    wrld->numIdle = 0;
    wrld->numAvail = 0;
    wrld->nextStrand = 0;

  // initialize strand state pointers etc.
    for (size_t i = 0;	i < numStrands;	 i++) {
	wrld->inState[i] = Diderot_AllocStrand (strand);
	wrld->outState[i] = Diderot_AllocStrand (strand);
	if ((wrld->inState[i] == 0) || (wrld->outState[i] == 0)) {
	    fprintf (stderr, "unable to allocate strand states\n");
	    exit (1);
	}
	wrld->status[i] = DIDEROT_ACTIVE;
    }

    if ((pthread_mutex_init (&wrld->lock, NULL) != 0)
    ||  (pthread_cond_init (&wrld->barrier, NULL) != 0)
    ||  (pthread_cond_init (&wrld->mainWait, NULL) != 0)) {
	fprintf (stderr, "unable to initialize world synchronization state\n");
	exit (1);
    }

    return wrld;

}

// get strand state pointers
/* Return the "in" state buffer of strand i (asserts i < wrld->numStrands). */
void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    void **const buffers = wrld->inState;
    return *(buffers + i);
}

/* Return the "out" state buffer of strand i (asserts i < wrld->numStrands). */
void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    void **const buffers = wrld->outState;
    return *(buffers + i);
}

/* Report whether strand i is still active, i.e. its status flag is zero
 * (asserts i < wrld->numStrands). */
bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    uint8_t flag = wrld->status[i];
    return (flag == 0);
}

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0