Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /branches/pure-cfg/src/lib/parallel-target/main.c
ViewVC logotype

View of /branches/pure-cfg/src/lib/parallel-target/main.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1198 - (download) (as text) (annotate)
Thu May 12 04:18:38 2011 UTC (10 years, 6 months ago) by jhr
File size: 7910 byte(s)
  Add parallel runtime support
/*! \file main.c
 *
 * \author John Reppy
 */

/*
 * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
 * All rights reserved.
 */

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <Diderot/diderot.h>
#include <pthread.h>
#include <semaphore.h>

// The maximum number of *active* strands a worker claims each time it takes
// the world lock.  Inactive strands that are skipped while scanning do not
// count toward this limit, so the index range a worker claims may be longer.
#define BLOCK_SIZE 32

/* The shared state of a running program: the strand arrays plus the
 * scheduling state used by the main thread and the worker threads.
 *
 * The fields nWorkers, numThreads and nextStrand are read and written by the
 * workers and by main while holding `lock`.  The two semaphores are used as
 * shared counters (sem_post/sem_wait to increment/decrement, sem_getvalue to
 * read) rather than for blocking.
 */
struct struct_world {
    bool	    isArray;	// is the initialization an array or collection?
    uint32_t	    nDims;	// depth of iteration nesting
    int32_t	    *base;	// nDims array of base indices
    uint32_t	    *size;	// nDims array of iteration sizes
    uint32_t	    numStrands; // number of strands in the world
    sem_t	    numActive;	// number of active strands in the world
    void	    **inState;	// per-strand state read by update; swapped with outState each step
    void	    **outState;	// per-strand state written by update
    uint8_t	    *status;	// array of strand status flags
    uint32_t	    numThreads; // number of worker threads
    pthread_mutex_t lock;	// big lock to protect wrld from multiple accesses
    pthread_cond_t  workers;	// workers wait on this when they have no work
    uint32_t	    nWorkers;	// number of workers waiting for work to do; this is compared
				// for equality with numThreads, so it must be at least as wide
				// (a narrower counter wraps and the step never completes)
    pthread_cond_t  main;	// used to signal main when the workers have finished
    sem_t	    stabilizing;// number of strands stabilizing this step
    uint32_t	    nextStrand; // the next strand to be processed this iteration
};

// NOTE(review): getOutf is declared here but not referenced by any code
// visible in this file; presumably it is defined by the generated strand
// code -- confirm whether this declaration is still needed.
extern float getOutf (void *self);

/* Worker thread body: repeatedly claims a block of strands and updates them.
 *
 * \param arg  the shared world (Diderot_World_t *), passed through pthread_create
 * \return     never returns normally; the thread leaves via pthread_exit once
 *	       it is woken and finds that no active strands remain.
 *
 * Protocol, per step:
 *   - under wrld->lock, claim index range [nextStrand, end) containing up to
 *     BLOCK_SIZE active strands and advance nextStrand past it;
 *   - outside the lock, run the update method on each active strand in the
 *     claimed range and record strands that stabilize or die;
 *   - when nextStrand has reached numStrands, count this worker as idle,
 *     signal main if every worker is idle, and wait for the next step.
 *
 * NOTE(review): a spurious wakeup from pthread_cond_wait re-enters the wait
 * loop and counts this worker in nWorkers a second time.  Distinguishing that
 * from a legitimate wakeup needs a per-step counter in the world struct, so it
 * is not addressed here.
 */
void *worker_func (void *arg)
{
    Diderot_World_t *wrld = (Diderot_World_t *) arg;

    // nobody joins the workers, so release thread resources on exit
    pthread_detach(pthread_self());

    while (true) {
	pthread_mutex_lock(&wrld->lock);

	// If there is no more work to do this iteration, we wait
	while (wrld->nextStrand == wrld->numStrands) {
	    wrld->nWorkers++;
	    if (wrld->nWorkers == wrld->numThreads) {
		pthread_cond_signal(&wrld->main);
	    }
	    pthread_cond_wait(&wrld->workers, &wrld->lock);
	    int nActive = 0;
	    sem_getvalue(&wrld->numActive, &nActive);
	    if (nActive == 0) {
		// No active strands remain, so this worker retires.  The
		// shared counters are updated *before* the lock is released;
		// otherwise concurrently exiting workers race on numThreads.
		wrld->numThreads--;
		if (wrld->nWorkers == wrld->numThreads) {
		    pthread_cond_signal(&wrld->main);
		}
		pthread_mutex_unlock(&wrld->lock);
		pthread_exit(NULL);
	    }
	}

	// Get BLOCK_SIZE active strands to process
	uint32_t to_process = wrld->nextStrand;
	uint32_t end = to_process;
	uint32_t num = 0;
	while (end != wrld->numStrands && num < BLOCK_SIZE) {
	    if (! wrld->status[end]) num++;
	    end++;
	}
	wrld->nextStrand = end;

	pthread_mutex_unlock(&wrld->lock);

	// Update the strands in this block; the range [to_process, end) is
	// owned by this worker, so no lock is needed for status/state access.
	for (uint32_t i = to_process; i < end; i++) {
	    if (! wrld->status[i]) {
		StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
		switch (sts) {
		  case DIDEROT_STABILIZE:
		    wrld->status[i] = DIDEROT_STABILIZE;
		    sem_post(&wrld->stabilizing);   // one more strand for main to finalize
		    sem_wait(&wrld->numActive);	    // one fewer active strand
		    break;
		  case DIDEROT_DIE:
		    wrld->status[i] = DIDEROT_DIE;
		    sem_wait(&wrld->numActive);	    // one fewer active strand
		    break;
		  default:
		    break;
		}
	    }
	}
    }
}


/* Program entry point for the parallel runtime.
 *
 * Initializes the globals and the initial strands, starts one detached worker
 * thread per hardware core, and then drives the step loop: wait until every
 * worker is idle, finalize the strands that stabilized during the step, swap
 * the in/out state buffers, and wake the workers for the next step.  The loop
 * ends when no active strands remain; the state of every stable strand is
 * then printed to "mip.txt".
 *
 * \return 0 on success; exits with status 1 on setup failures and 8 if the
 *	   output file cannot be opened or written.
 */
int main (int argc, const char **argv)
{
    printf ("initializing globals ...\n");
    Diderot_InitGlobals ();

    printf ("initializing strands ...\n");
    Diderot_World_t *wrld = Diderot_Initially ();

    for (uint32_t i = 0;  i < wrld->numStrands;  i++) {
      // hack to make the invariant part of the state the same in both copies
	memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
    }

    CPUInfo_t cpuInfo;
    if (! GetNumCPUs (&cpuInfo)) {
	fprintf(stderr, "unable to get number of processors\n");
	exit (1);
    }

    // Start worker threads; always ask for at least one, since with no
    // workers the step loop below would never make progress.
    int nWorkers = cpuInfo.numHWCores;
    if (nWorkers < 1) {
	nWorkers = 1;
    }
    pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t));
    if (workers == NULL) {
	fprintf(stderr, "unable to allocate worker threads\n");
	exit (1);
    }

    // The lock is held while spawning: new workers block on it immediately,
    // so numThreads holds the number of threads that really started before
    // any worker compares it against nWorkers.
    pthread_mutex_lock(&wrld->lock);
    uint32_t nStarted = 0;
    for (int i = 0; i < nWorkers; i++) {
	if (pthread_create (&workers[nStarted], NULL, worker_func, wrld) == 0) {
	    nStarted++;
	}
    }
    wrld->numThreads = nStarted;
    pthread_mutex_unlock(&wrld->lock);

    // the workers detach themselves, so the thread handles are not needed
    free (workers);

    if (nStarted == 0) {
	fprintf(stderr, "unable to create worker threads\n");
	exit (1);
    }

    int nSteps = 0;
    int nActive = 0;
    sem_getvalue(&wrld->numActive, &nActive);
    while (nActive > 0) {
	nSteps++;

	pthread_mutex_lock(&wrld->lock);

	// Wait until all workers have finished this iteration
	while (wrld->nWorkers != wrld->numThreads) {
	    pthread_cond_wait(&wrld->main, &wrld->lock);
	}
	wrld->nWorkers = 0;

	// Handle stabilizing strands: copy the final state into the "in"
	// buffer (which becomes the "out" buffer after the swap below) so
	// both copies agree, then mark the strand stable.
	int numStabilizing = 0;
	sem_getvalue(&wrld->stabilizing, &numStabilizing);
	for (uint32_t i = 0;  i < wrld->numStrands && numStabilizing > 0;  i++) {
	    if (wrld->status[i] == DIDEROT_STABILIZE) {
		sem_wait(&wrld->stabilizing);
		memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
		wrld->status[i] = DIDEROT_STABLE;
		numStabilizing--;
	    }
	}

	sem_getvalue(&wrld->numActive, &nActive);

	// Do any global update stuff that needs doing
	void **tmp = wrld->inState;
	wrld->inState = wrld->outState;
	wrld->outState = tmp;

	wrld->nextStrand = 0;

	// Wake up all the workers to process the next step (or, when no
	// active strands remain, to let them exit)
	pthread_cond_broadcast(&wrld->workers);
	pthread_mutex_unlock(&wrld->lock);
    }

    printf("done: %d steps\n", nSteps);

  // here we have the final state of all of the strands in the "in" buffer
    FILE *outS = fopen("mip.txt", "w");
    if (outS == NULL) {
	fprintf (stderr, "Cannot open output file\n");
	exit (8);
    }

    for (uint32_t i = 0;  i < wrld->numStrands;  i++) {
	if (wrld->status[i] == DIDEROT_STABLE)
	    Diderot_Strands[0]->print (outS, wrld->inState[i]);
    }
    // fclose flushes buffered output, so a failure here means lost data
    if (fclose (outS) != 0) {
	fprintf (stderr, "Cannot write output file\n");
	exit (8);
    }

    return 0;

}


// Allocate uninitialized storage for the state of one strand of the given
// kind (strand->stateSzb bytes).  Returns whatever malloc returns, so the
// result is NULL when the allocation fails.
// (this should be the part of the scheduler)
void *Diderot_AllocStrand (Strand_t *strand)
{
    void *state = malloc(strand->stateSzb);
    return state;
}

// block allocation of an initial collection of strands
//
// Allocates the world, copies the iteration bounds into it, allocates the
// in/out state for every strand (the product of the size[] entries), marks
// every strand active, and initializes the scheduling state (lock, condition
// variables, semaphores, counters).
//
// Returns the new world.  Any allocation or initialization failure, and a
// strand count that does not fit the world's 32-bit counter, prints a message
// to stderr and exits with status 1.
Diderot_World_t *Diderot_AllocInitially (
    Strand_t *strand,	    // the type of strands being allocated
    bool isArray,	// is the initialization an array or collection?
    uint32_t nDims,	// depth of iteration nesting
    int32_t *base,	// nDims array of base indices
    uint32_t *size)	// nDims array of iteration sizes
{
    Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
    if (wrld == NULL) {
	fprintf (stderr, "unable to allocate world\n");
	exit (1);
    }

    wrld->isArray = isArray;
    wrld->nDims = nDims;
    wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
    wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t));
    // malloc(0) may legitimately return NULL, so only treat NULL as a
    // failure when there is something to store
    if ((nDims > 0) && ((wrld->base == NULL) || (wrld->size == NULL))) {
	fprintf (stderr, "unable to allocate world dimensions\n");
	exit (1);
    }

    size_t numStrands = 1;
    for (uint32_t i = 0;  i < nDims;  i++) {
      // wrld->numStrands is a uint32_t; refuse a product that would not fit
      // instead of silently truncating it
	if ((size[i] != 0) && (numStrands > UINT32_MAX / size[i])) {
	    fprintf (stderr, "too many strands\n");
	    exit (1);
	}
	numStrands *= size[i];
	wrld->base[i] = base[i];
	wrld->size[i] = size[i];
    }

    // report the iteration space; size[0] is only read when it exists
    printf("AllocInitially: ");
    if (nDims > 0) printf("%lu", (unsigned long) size[0]);
    for (uint32_t i = 1;  i < nDims;  i++) printf(" x %lu", (unsigned long) size[i]);
    printf("\n");

  // allocate the strand state pointers
    wrld->numStrands = (uint32_t) numStrands;
    if (sem_init (&wrld->numActive, 0, (unsigned int) numStrands) != 0) {
	fprintf (stderr, "unable to initialize active-strand counter\n");
	exit (1);
    }
    wrld->inState = (void **) malloc (numStrands * sizeof(void *));
    wrld->outState = (void **) malloc (numStrands * sizeof(void *));
    wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
    if ((numStrands > 0)
    && ((wrld->inState == NULL) || (wrld->outState == NULL) || (wrld->status == NULL))) {
	fprintf (stderr, "unable to allocate strand states\n");
	exit (1);
    }

  // initialize strand state pointers etc.
    for (size_t i = 0;	i < numStrands;	 i++) {
	wrld->inState[i] = Diderot_AllocStrand (strand);
	wrld->outState[i] = Diderot_AllocStrand (strand);
	if ((strand->stateSzb != 0)
	&& ((wrld->inState[i] == NULL) || (wrld->outState[i] == NULL))) {
	    fprintf (stderr, "unable to allocate strand states\n");
	    exit (1);
	}
	wrld->status[i] = DIDEROT_ACTIVE;
    }

  // scheduling state: no worker threads yet, nobody idle, start at strand 0
    wrld->numThreads = 0;
    pthread_mutex_init(&wrld->lock, NULL);
    pthread_cond_init(&wrld->workers, NULL);
    pthread_cond_init(&wrld->main, NULL);
    wrld->nWorkers = 0;
    if (sem_init(&wrld->stabilizing, 0, 0) != 0) {
	fprintf (stderr, "unable to initialize stabilizing-strand counter\n");
	exit (1);
    }
    wrld->nextStrand = 0;

    return wrld;

}

// get strand state pointers

// Return the "in" state pointer of strand i; i must be less than the number
// of strands in the world (checked with assert).
void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    void **states = wrld->inState;
    return states[i];
}

// Return the "out" state pointer of strand i; i must be less than the number
// of strands in the world (checked with assert).
void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    void **states = wrld->outState;
    return states[i];
}

// Report whether strand i is active, i.e. whether its status flag is zero;
// i must be less than the number of strands in the world (checked with assert).
bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
{
    assert (i < wrld->numStrands);
    const uint8_t flag = wrld->status[i];
    return (flag == 0);
}

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0