Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /branches/vis15/src/compiler/target-cpu/fragments/par-run.in
ViewVC logotype

View of /branches/vis15/src/compiler/target-cpu/fragments/par-run.in

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3937 - (download) (annotate)
Mon Jun 6 13:16:10 2016 UTC (2 years, 9 months ago) by jhr
File size: 7054 byte(s)
  Working on merge: FIXMEs
/* Worker-thread entry point: processes active strands in a loop of super steps.
 *
 * `arg` must point to a worker_arg; its _wrld, _id and _max_nsteps fields are read.
 * Each super step has three phases:
 *   1. a barrier, in which the last worker to arrive resets the shared work
 *      counters and swaps the strand state buffers;
 *   2. an update phase, in which workers repeatedly claim blocks of up to
 *      BLOCK_SIZE strands from the shared scheduler and update them;
 *   3. after a second barrier, a stabilize phase, in which each worker finishes
 *      the strands in its own contiguous slice [start, limit).
 * The thread terminates via pthread_exit() once there are no active strands or
 * the step limit has been reached.
 *
 * NOTE(review): neither barrier re-checks a predicate around pthread_cond_wait(),
 * so a spurious wakeup would let a worker through early.  Fixing that needs a
 * generation counter in the scheduler struct, which is declared elsewhere.
 */
static void *worker (void *arg)
{
    worker_arg *myArg = reinterpret_cast<worker_arg *>(arg);
    world *wrld = myArg->_wrld;
#ifndef DIDEROT_NO_GLOBALS
    globals *glob = wrld->_globals;
#endif
    Diderot_Sched_t *sched = wrld->_sched;

  // compute this worker's contiguous slice of the strand array; the last worker
  // also takes the remainder left over by the integer division
    int nStrandsPerWorker = wrld->_nstrands / sched->numWorkers;
    int start = myArg->_id * nStrandsPerWorker;
    int limit;
    if (sched->numWorkers-1 == myArg->_id)
        limit = wrld->_nstrands;
    else
        limit = start + nStrandsPerWorker;

    while (true) {
      // barrier synchronization at start of super step
        pthread_mutex_lock (&sched->lock);
            if (sched->numIdle+1 < sched->numWorkers) {
                sched->numIdle++;
                pthread_cond_wait (&sched->barrier, &sched->lock);
            }
            else {
              // all other workers are idle, so we can proceed after some initialization
                sched->numIdle = 0;
                sched->numAvail = wrld->_nstrands;  // includes inactive strands
                sched->nextStrand = 0;
              // this is a non-member function, so the world is reached through wrld
                wrld->swap_state();
                pthread_cond_broadcast (&sched->barrier);
            }
        pthread_mutex_unlock (&sched->lock);

      // if there are no active strands left or we've hit the maximum number of steps, then
      // we're done.  numActive and numSteps are read without the lock; that is safe because
      // no worker modifies them between this barrier and the end-of-step barrier below.
        if ((sched->numActive == 0) || (sched->numSteps >= myArg->_max_nsteps)) {
            pthread_cond_signal (&sched->runWait);
            pthread_exit (0);
        }

      // iterate until there is no more work to do
        int blkStart, blkSize;
        int numDead = 0;
        do {
          // grab some work
            pthread_mutex_lock (&sched->lock);
                blkStart = sched->nextStrand;
                blkSize = (sched->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : sched->numAvail;
                sched->numAvail -= blkSize;
                sched->nextStrand += blkSize;
            pthread_mutex_unlock (&sched->lock);
          // update the strands
            for (int i = blkStart;  i < blkStart+blkSize;  i++) {
              // a zero status marks a strand that still needs updating
                if (! wrld->_status[i]) {
#ifdef DIDEROT_DUAL_STATE
# ifdef DIDEROT_NO_GLOBALS
                    diderot::strand_status sts = @STRAND@_update(wrld->_inState[i], wrld->_outState[i]);
# else
                    diderot::strand_status sts = @STRAND@_update(glob, wrld->_inState[i], wrld->_outState[i]);
# endif
#else
# ifdef DIDEROT_NO_GLOBALS
                    diderot::strand_status sts = @STRAND@_update(&wrld->_state[i]);
# else
                    diderot::strand_status sts = @STRAND@_update(glob, &wrld->_state[i]);
# endif
#endif
                    switch (sts) {
                      case diderot::kStabilize:
                      // only mark the strand here; it is stabilized after the barrier
                        wrld->_status[i] = diderot::kStabilize;
                        break;
                      case diderot::kDie:
                        wrld->_status[i] = diderot::kDead;
                        numDead++;
                        break;
                      default:
                        break;
                    }
                }
                else {
                    assert ((wrld->_status[i] == diderot::kStable) || (wrld->_status[i] == diderot::kDead));
                }
            }
        } while (blkSize > 0);

      // barrier synchronization.  The dead-strand count is deliberately NOT subtracted
      // from numActive here: a worker that has not yet run the termination test above
      // could otherwise see numActive == 0 and exit, stranding the others at this barrier.
        pthread_mutex_lock (&sched->lock);
            if (sched->numIdle+1 < sched->numWorkers) {
                sched->numIdle++;
                pthread_cond_wait (&sched->barrier, &sched->lock);
            }
            else {
              // all other workers are idle, so we can proceed
                sched->numIdle = 0;
                pthread_cond_broadcast (&sched->barrier);
                sched->numSteps++;
            }
        pthread_mutex_unlock (&sched->lock);

/**** If there is a global computation phase, it goes here ****/

      // stabilize any threads that need stabilization.  Each worker is responsible for
      // a contiguous region of the strands
        int numStabilized = 0;
        for (int i = start;  i < limit;  i++) {
            if (wrld->_status[i] == diderot::kStabilize) {
#ifdef DIDEROT_DUAL_STATE
              // stabilize the strand's state.  Note that the outState has been set by
              // the last call to update, so we make the inState be the target of the
              // stabilize method.
              // NOTE(review): this branch calls @STRAND@_Stabilize while the single-state
              // branch calls @STRAND@_stabilize; confirm which spelling is generated.
# ifdef DIDEROT_NO_GLOBALS
                @STRAND@_Stabilize(wrld->_outState[i], wrld->_inState[i]);
# else
                @STRAND@_Stabilize(glob, wrld->_outState[i], wrld->_inState[i]);
# endif
                std::memcpy (wrld->_outState[i], wrld->_inState[i], sizeof(@STRAND@_strand));
#else
# ifdef DIDEROT_NO_GLOBALS
                @STRAND@_stabilize(&wrld->_state[i]);
# else
                @STRAND@_stabilize(glob, &wrld->_state[i]);
# endif
#endif
                wrld->_status[i] = diderot::kStable;
                numStabilized++;
            }
        }
      // adjust the numActive count for every strand this worker retired during the step
      // (both those that died and those that stabilized)
        int numRetired = numDead + numStabilized;
#if defined(HAVE_BUILTIN_ATOMIC_OPS)
        __sync_fetch_and_sub(&sched->numActive, numRetired);
#else
        pthread_mutex_lock (&sched->lock);
            sched->numActive -= numRetired;
        pthread_mutex_unlock (&sched->lock);
#endif
    } // end while

    return 0;

}

//! Run the Diderot program (parallel version)
//!
//! Starts one worker thread per scheduler worker, waits for all of them to
//! terminate, and adds the elapsed wall-clock time to the world's run time.
//!
//! \param max_nsteps the limit on the number of super steps; 0 means unlimited
//! \return the number of steps taken, or -1 on error.  Because the return type
//!         is unsigned, the error value reaches the caller as 0xffffffff.
uint32_t world::run (uint32_t max_nsteps)
{
    if (this->_stage < diderot::POST_INITIALLY) {
        biffMsgAdd (this->_errors, "attempt to run uninitialized program\n");
        return -1;
    }
    this->_stage = diderot::RUNNING;

    sched_info *sched = this->_sched;

    if (max_nsteps == 0) max_nsteps = 0xffffffff;  // essentially unlimited

  // Start worker threads
    int nWorkers = sched->numWorkers;
    worker_arg *args = new worker_arg[nWorkers];
    pthread_t *threads = new pthread_t[nWorkers];
    if (this->_verbose) {
        std::cerr << "run with " << this->_nstrands << " strands ..." << std::endl;
    }
    double t0 = airTime();
    sched->numIdle = 0;
    sched->numSteps = 0;
    for (int i = 0; i < nWorkers; i++) {
      // field names follow what worker() reads (_wrld, _max_nsteps, _id)
        args[i]._wrld = this;
        args[i]._max_nsteps = max_nsteps;
        args[i]._id = i;
        int sts = pthread_create (&threads[i], NULL, worker, (void *)&(args[i]));
        if (sts != 0) {
            biffMsgAddf (this->_errors, "unable to create worker thread; err = %d\n", sts);
          // the workers that did start are left blocked at their first barrier, so
          // release their handles; args is intentionally not freed because those
          // threads still hold pointers into it
            for (int j = 0; j < i; j++) {
                pthread_detach (threads[j]);
            }
            delete[] threads;
            return -1;
        }
    }

  // wait for the computation to finish.  Joining each worker is used instead of
  // waiting on the scheduler's numIdle count, which the workers reset to zero at
  // every barrier and therefore never reaches nWorkers.
    for (int i = 0; i < nWorkers; i++) {
        pthread_join (threads[i], NULL);
    }

    t0 = airTime() - t0;
    if (this->_verbose) {
        std::cerr << "done in " << t0 << " seconds" << std::endl;
    }
    this->_run_time += t0;

    delete[] threads;
    delete[] args;

    return sched->numSteps;

} // world::run


root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0