Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /trunk/src/lib/main-parallel.c
ViewVC logotype

View of /trunk/src/lib/main-parallel.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1187 - (download) (as text) (annotate)
Wed May 11 15:06:15 2011 UTC (9 years, 9 months ago) by nseltzer
File size: 7511 byte(s)
First go at a parallel runtime
/*! \file main-parallel.c
 *
 * \author John Reppy
 */

/*
 * COPYRIGHT (c) 2011 The Diderot Project (http://diderot-language.cs.uchicago.edu)
 * All rights reserved.
 */

#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <Diderot/diderot.h>
#include <pthread.h>
#include <semaphore.h>

// The number of worker threads that main() creates for the thread pool
#define NTHREADS 3

// The maximum number of active strands a worker claims for processing at one time
// (inactive strands inside the scanned range are skipped and do not count toward this limit)
#define BLOCK_SIZE 32

/* Shared state of the running program: the strand arrays plus the
 * synchronization objects used by main() and the worker threads.
 * The scheduling fields (nWorkers, nextStrand, numThreads) are meant to be
 * accessed while holding `lock`.
 */
struct struct_world {
	bool			isArray;	// is the initialization an array or collection?
	uint32_t		nDims;		// depth of iteration nesting
	int32_t			*base;		// nDims array of base indices
	uint32_t		*size;		// nDims array of iteration sizes
	uint32_t		numStrands;	// number of strands in the world
	sem_t			numActive;	// number of active strands in the world
	void			**inState;	// numStrands array of strand states passed as the first argument of update
	void			**outState;	// numStrands array of strand states passed as the second argument of update;
						// main() swaps inState and outState after every step
	uint8_t			*status;	// array of strand status flags
	uint32_t		numThreads;	// number of worker threads
	pthread_mutex_t	lock;		// big lock to protect wrld from multiple accesses
	pthread_cond_t 	workers;	// workers wait on this when they have no work
	uint8_t			nWorkers;	// number of workers waiting for work to do
	pthread_cond_t 	main;		// used to signal main when the workers have finished
	sem_t			stabilizing;// number of strands stabilizing this step
	uint32_t		nextStrand;	// the next strand to be processed this iteration
};

// Declared here but not called anywhere in the visible part of this file.
// NOTE(review): presumably an accessor for a strand's output value -- confirm it is still needed.
extern float getOutf (void *self);

/* Function which processes active strands. */
void * worker_func(void *arg)
{
	Diderot_World_t *wrld = (Diderot_World_t *) arg;

	pthread_detach(pthread_self());

	while(true)
	{
		pthread_mutex_lock(&wrld->lock);

		// If there is no more work to do this iteration, we wait
		while(wrld->nextStrand == wrld->numStrands)
		{
			wrld->nWorkers++;
			if(wrld->nWorkers == wrld->numThreads)
			{
				pthread_cond_signal(&wrld->main);
			}
			pthread_cond_wait(&wrld->workers, &wrld->lock);
			int nActive = 0;
			sem_getvalue(&wrld->numActive, &nActive);
			if(nActive == 0)
			{
				pthread_mutex_unlock(&wrld->lock);
				wrld->numThreads--;
				if(wrld->nWorkers == wrld->numThreads)
				{
					pthread_cond_signal(&wrld->main);
				}
				pthread_exit(0);
			}
		}

		// Get BLOCK_SIZE active strands to process
		uint32_t to_process = wrld->nextStrand;
		uint32_t end = to_process;
		uint32_t num = 0;
		while(end != wrld->numStrands && num < BLOCK_SIZE)
		{
			if(! wrld->status[end]) num++;
			end++;
		}
		wrld->nextStrand = end;

		pthread_mutex_unlock(&wrld->lock);

		// Update the strands in this block
		for(int i = to_process; i < end; i++)
		{
			if (! wrld->status[i])
			{
				StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]);
				switch (sts)
				{
					case DIDEROT_STABILIZE:
						wrld->status[i] = DIDEROT_STABILIZE;
						sem_post(&wrld->stabilizing);
						sem_wait(&wrld->numActive);
						break;
					case DIDEROT_DIE:
						wrld->status[i] = DIDEROT_DIE;
						sem_wait(&wrld->numActive);
						break;
				}
			}
		}
	}
}


/* Program entry point for the parallel runtime.
 *
 * Initializes globals and the initial strands, starts NTHREADS workers, then
 * drives the super-step loop: wait until every worker is idle, finalize the
 * strands that stabilized during the step, swap the in/out state arrays and
 * wake the workers for the next step.  The loop ends once the numActive
 * semaphore reads zero; the stable strands are then printed to "mip.txt".
 *
 * \param argc unused
 * \param argv unused
 * \return 0 on success; the process exits with 1 if a worker thread cannot be
 *         created and with 8 if the output file cannot be opened or written.
 */
int main (int argc, const char **argv)
{
	printf("initializing globals ...\n");
	Diderot_InitGlobals ();

	printf("initializing strands ...\n");
	Diderot_World_t *wrld = Diderot_Initially ();

	for (uint32_t i = 0;  i < wrld->numStrands;  i++)
	{
		// hack to make the invariant part of the state the same in both copies
		memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb);
	}

	// Start worker threads
	pthread_t workers[NTHREADS];

	wrld->numThreads = NTHREADS;
	for (int i = 0; i < NTHREADS; i++)
	{
		// A missing worker would never count itself in nWorkers, so the wait
		// below would block forever; fail loudly instead.
		int err = pthread_create(&workers[i], NULL, worker_func, wrld);
		if (err != 0)
		{
			fprintf (stderr, "unable to create worker thread %d: %s\n", i, strerror(err));
			exit (1);
		}
	}

	int nSteps = 0;
	int nActive = 0;
	sem_getvalue(&wrld->numActive, &nActive);
	while(nActive > 0)
	{
		nSteps++;

		pthread_mutex_lock(&wrld->lock);

		// Wait until all workers have finished this iteration
		while(wrld->nWorkers != wrld->numThreads)
		{
			pthread_cond_wait(&wrld->main, &wrld->lock);
		}
		wrld->nWorkers = 0;

		// Handle stabilizing strands: copy their final state into the "in" buffer
		// too, so that both buffers agree before the swap below
		int numStabilizing = 0;
		sem_getvalue(&wrld->stabilizing, &numStabilizing);
		for (uint32_t i = 0;  i < wrld->numStrands && numStabilizing > 0;  i++)
		{
			if (wrld->status[i] == DIDEROT_STABILIZE)
			{
				sem_wait(&wrld->stabilizing);
				memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb);
				wrld->status[i] = DIDEROT_STABLE;
				numStabilizing--;
			}
		}

		sem_getvalue(&wrld->numActive, &nActive);

		// Do any global update stuff that needs doing
		void **tmp = wrld->inState;
		wrld->inState = wrld->outState;
		wrld->outState = tmp;

		wrld->nextStrand = 0;

		// Wake up all the workers to process the next step
		pthread_cond_broadcast(&wrld->workers);
		pthread_mutex_unlock(&wrld->lock);
	}

	printf("done: %d steps\n", nSteps);
	// here we have the final state of all of the strands in the "in" buffer
	FILE *outS = fopen("mip.txt", "w");
	if (outS == NULL) {
		fprintf (stderr, "Cannot open output file\n");
		exit (8);
	}

	for (uint32_t i = 0;  i < wrld->numStrands;  i++) {
		if (wrld->status[i] == DIDEROT_STABLE)
			Diderot_Strands[0]->print (outS, wrld->inState[i]);
	}
	// fclose flushes buffered output, so a failure here means lost results
	if (fclose (outS) != 0) {
		fprintf (stderr, "Cannot write output file\n");
		exit (8);
	}

	return 0;

}


// this should be part of the scheduler
/* Allocate uninitialized storage for the state of one strand.
 *
 * \param strand descriptor whose stateSzb field gives the size in bytes
 * \return the pointer returned by malloc; it is not checked here, so it is
 *         NULL when the allocation fails
 */
void *Diderot_AllocStrand (Strand_t *strand)
{
	void *state = malloc(strand->stateSzb);

	return state;
}

// block allocation of an initial collection of strands
/* Block allocation of the world and of an initial collection of strands.
 *
 * The number of strands is the product of the nDims entries of `size`.  For
 * every strand an "in" and an "out" state are allocated and its status is set
 * to DIDEROT_ACTIVE; the numActive semaphore starts at the strand count, the
 * stabilizing semaphore at 0, and the lock and condition variables are
 * initialized with default attributes.
 *
 * \param strand  the type of strands being allocated
 * \param isArray is the initialization an array or collection?
 * \param nDims   depth of iteration nesting
 * \param base    nDims array of base indices (copied)
 * \param size    nDims array of iteration sizes (copied)
 * \return the newly allocated world; the process exits with status 1 if any
 *         allocation or semaphore initialization fails, or if the strand
 *         count does not fit in the world's 32-bit numStrands field.
 */
Diderot_World_t *Diderot_AllocInitially (
	Strand_t *strand,		// the type of strands being allocated
	bool isArray,		// is the initialization an array or collection?
	uint32_t nDims,		// depth of iteration nesting
	int32_t *base,		// nDims array of base indices
	uint32_t *size)		// nDims array of iteration sizes
{
	Diderot_World_t *wrld = (Diderot_World_t *) malloc (sizeof(Diderot_World_t));
	if (wrld == NULL) {
		fprintf (stderr, "unable to allocate world\n");
		exit (1);
	}

	wrld->isArray = isArray;
	wrld->nDims = nDims;
	wrld->base = (int32_t *) malloc (nDims * sizeof(int32_t));
	wrld->size = (uint32_t *) malloc (nDims * sizeof(uint32_t));
	// malloc(0) may legitimately return NULL, so only treat NULL as an error
	// when there is something to store
	if ((nDims > 0) && ((wrld->base == NULL) || (wrld->size == NULL))) {
		fprintf (stderr, "unable to allocate world\n");
		exit (1);
	}
	size_t numStrands = 1;
	for (uint32_t i = 0;  i < nDims;  i++) {
		// numStrands is stored in a uint32_t field; refuse products that would not fit
		if ((size[i] != 0) && (numStrands > UINT32_MAX / size[i])) {
			fprintf (stderr, "too many strands\n");
			exit (1);
		}
		numStrands *= size[i];
		wrld->base[i] = base[i];
		wrld->size[i] = size[i];
	}

	// report the iteration sizes as "AllocInitially: a x b x ..."
	printf("AllocInitially:");
	for (uint32_t i = 0;  i < nDims;  i++) {
		if (i == 0)
			printf(" %lu", (unsigned long) size[i]);
		else
			printf(" x %lu", (unsigned long) size[i]);
	}
	printf("\n");

	// allocate the strand state pointers
	wrld->numStrands = (uint32_t) numStrands;
	if (sem_init(&wrld->numActive, 0, (unsigned int) numStrands) != 0) {
		fprintf (stderr, "unable to initialize semaphore\n");
		exit (1);
	}
	wrld->inState = (void **) malloc (numStrands * sizeof(void *));
	wrld->outState = (void **) malloc (numStrands * sizeof(void *));
	wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t));
	if ((wrld->inState == NULL) || (wrld->outState == NULL) || (wrld->status == NULL)) {
		fprintf (stderr, "unable to allocate strand states\n");
		exit (1);
	}

	// initialize strand state pointers etc.
	for (size_t i = 0;  i < numStrands;  i++) {
		wrld->inState[i] = Diderot_AllocStrand (strand);
		wrld->outState[i] = Diderot_AllocStrand (strand);
		if ((wrld->inState[i] == NULL) || (wrld->outState[i] == NULL)) {
			fprintf (stderr, "unable to allocate strand states\n");
			exit (1);
		}
		wrld->status[i] = DIDEROT_ACTIVE;
	}

	// scheduling state: no threads yet, nobody waiting, first strand is next
	wrld->numThreads = 0;
	pthread_mutex_init(&wrld->lock, NULL);
	pthread_cond_init(&wrld->workers, NULL);
	pthread_cond_init(&wrld->main, NULL);
	wrld->nWorkers = 0;
	if (sem_init(&wrld->stabilizing, 0, 0) != 0) {
		fprintf (stderr, "unable to initialize semaphore\n");
		exit (1);
	}
	wrld->nextStrand = 0;

	return wrld;

}

// get strand state pointers
/* Look up the "in" state of strand i.
 *
 * \param wrld the world holding the strand arrays
 * \param i    strand index; asserted to be below wrld->numStrands
 * \return the i-th entry of wrld->inState
 */
void *Diderot_InState (Diderot_World_t *wrld, uint32_t i)
{
	void **states;

	assert (i < wrld->numStrands);
	states = wrld->inState;
	return states[i];
}

/* Look up the "out" state of strand i.
 *
 * \param wrld the world holding the strand arrays
 * \param i    strand index; asserted to be below wrld->numStrands
 * \return the i-th entry of wrld->outState
 */
void *Diderot_OutState (Diderot_World_t *wrld, uint32_t i)
{
	void **states;

	assert (i < wrld->numStrands);
	states = wrld->outState;
	return states[i];
}

/* Test whether strand i is still active.
 *
 * \param wrld the world holding the strand status array
 * \param i    strand index; asserted to be below wrld->numStrands
 * \return true exactly when the strand's status flag is zero
 */
bool Diderot_IsActive (Diderot_World_t *wrld, uint32_t i)
{
	uint8_t flag;

	assert (i < wrld->numStrands);
	flag = wrld->status[i];
	return flag == 0;
}

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0