Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /branches/vis15/src/compiler/target-cpu/fragments/par-run.in
ViewVC logotype

View of /branches/vis15/src/compiler/target-cpu/fragments/par-run.in

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4489 - (download) (annotate)
Sat Sep 3 00:32:03 2016 UTC (3 years, 1 month ago) by jhr
File size: 7498 byte(s)
working on merge: changing initially to start
/* Function which processes active strands.  Each worker thread runs this loop:
 * workers rendezvous at a barrier at the top of each super step, grab blocks of
 * strands to update, rendezvous again, and then each stabilizes its own
 * contiguous region of the strand array.  The argument is a worker_arg*.
 */
static void *worker (void *arg)
{
    worker_arg *myArg = reinterpret_cast<worker_arg *>(arg);
    world *wrld = myArg->_wrld;
#ifndef DIDEROT_NO_GLOBALS
    globals *glob = wrld->_globals;
#endif
    diderot::scheduler *sched = wrld->_sched;

  // each worker owns a contiguous region [start..limit) of the strand array for
  // the stabilization pass; the last worker also takes the remainder when
  // _nstrands does not divide evenly by the number of workers.
    int nStrandsPerWorker = wrld->_nstrands / sched->_numWorkers;
    int start = myArg->_id * nStrandsPerWorker;
    int limit;
    if (sched->_numWorkers-1 == myArg->_id)
        limit = wrld->_nstrands;
    else
        limit = start + nStrandsPerWorker;

    while (true) {
      // barrier synchronization at start of super step; the last worker to
      // arrive does the per-step initialization and wakes the others
        pthread_mutex_lock (&sched->_lock);
            if (sched->_nidle+1 < sched->_numWorkers) {  // FIX: was "numWorkers" (missing '_')
                sched->_nidle++;
                pthread_cond_wait (&sched->_barrier, &sched->_lock);
            }
            else {
              // all other workers are idle, so we can proceed after some initialization
                sched->_nidle = 0;
                sched->_navail = wrld->_nstrands;  // includes inactive strands
                sched->_nextStrand = 0;            // FIX: was "nextStrand" (missing '_')
                wrld->swap_state();
                pthread_cond_broadcast (&sched->_barrier);
            }
        pthread_mutex_unlock (&sched->_lock);

      // if there are no active strands left or we've hit the maximum number of steps, then we're done
      // NOTE(review): _nactive/_nsteps are read here without holding _lock; all
      // workers just passed the barrier, so no one is mutating them — confirm.
        if ((sched->_nactive == 0) || (sched->_nsteps >= myArg->_max_nsteps)) {
            pthread_cond_signal (&sched->_runWait);  // wake world::run, which waits on _runWait
            pthread_exit (0);
        }

#ifdef DIDEROT_HAS_STRAND_COMMUNICATION
      // build spatial partition to support communication
        wrld->build_tree();
#endif

      // iterate until there is no more work to do; workers pull blocks of up to
      // BLOCK_SIZE strands from the shared counter under the scheduler lock
        int blkStart, blkSize;
        int numDead = 0;
        do {
          // grab some work
            pthread_mutex_lock (&sched->_lock);
                blkStart = sched->_nextStrand;
                blkSize = (sched->_navail >= BLOCK_SIZE) ? BLOCK_SIZE : sched->_navail;
                sched->_navail -= blkSize;
                sched->_nextStrand += blkSize;
            pthread_mutex_unlock (&sched->_lock);
          // update the strands; a zero status means the strand is still active
            for (int i = blkStart;  i < blkStart+blkSize;  i++) {
                if (! wrld->_status[i]) {
#ifdef DIDEROT_DUAL_STATE
                diderot::strand_status sts =
                    @STRAND@_update(@UPDATE_ARGS@wrld->_inState[i], wrld->_outState[i]);
#else
                diderot::strand_status sts =
                    @STRAND@_update(@UPDATE_ARGS@&wrld->_state[i]);
#endif
                    switch (sts) {
                      case diderot::kStabilize:
                      // mark for the stabilization pass below; don't stabilize here
                        wrld->_status[i] = diderot::kStabilize;
                        break;
                      case diderot::kDie:
                        wrld->_status[i] = diderot::kDead;
                        numDead++;
                        break;
                      default:
                        break;
                    }
                }
                else {
                    assert ((wrld->_status[i] == diderot::kStable) || (wrld->_status[i] == diderot::kDead));
                }
            }
        } while (blkSize > 0);

      // barrier synchronization: all updates must be finished before stabilization
        pthread_mutex_lock (&sched->_lock);
            sched->_nactive -= numDead;
            if (sched->_nidle+1 < sched->_numWorkers) {
                sched->_nidle++;
                pthread_cond_wait (&sched->_barrier, &sched->_lock);
            }
            else {
              // all other workers are idle, so we can proceed
                sched->_nidle = 0;
                pthread_cond_broadcast (&sched->_barrier);
                sched->_nsteps++;
            }
        pthread_mutex_unlock (&sched->_lock);

      // stabilize any strands that need stabilization.  Each worker is responsible for
      // a contiguous region of the strands, so no locking is needed here.
        int numStabilized = 0;
        for (int i = start;  i < limit;  i++) {
            if (wrld->_status[i] == diderot::kStabilize) {
#if defined(DIDEROT_HAS_STABILIZE)
	      // stabilize the strand's state.  Note that the _outState has been set by
	      // the last call to update, so we make the _inState be the target of the
	      // stabilize method.
#  ifdef DIDEROT_DUAL_STATE
	      // FIX: "this->" is meaningless in a static free function; use wrld->
		@STRAND@_stabilize(@STABILIZE_ARGS@&wrld->_outState[i], &wrld->_inState[i]);
#    if defined(DIDEROT_HAS_STRAND_COMMUNICATION) || defined(DIDEROT_HAS_MAPREDUCE)
	      // need to copy stable input state to output so that subsequent queries/reductions work
		std::memcpy (&wrld->_outState[i], &wrld->_inState[i], sizeof(@STRAND@_strand));
#    endif
#  else
		@STRAND@_stabilize(@STABILIZE_ARGS@&wrld->_state[i]);
#  endif
#elif defined(DIDEROT_DUAL_STATE)
	      // need to copy output state to input so that subsequent queries/reductions work
		std::memcpy (&wrld->_inState[i], &wrld->_outState[i], sizeof(@STRAND@_strand));
#endif
                wrld->_status[i] = diderot::kStable;
                numStabilized++;
            }
        }

/**** If there is a global computation phase, it goes here ****/

      // adjust the numActive count; this runs concurrently across workers, so it
      // must be atomic (builtin atomic when available, else under the lock)
#if defined(HAVE_BUILTIN_ATOMIC_OPS)
        __sync_fetch_and_sub(&sched->_nactive, numStabilized);
#else
        pthread_mutex_lock (&sched->_lock);
            sched->_nactive -= numStabilized;
        pthread_mutex_unlock (&sched->_lock);
#endif
    } // end while

    return 0;

}

//! Run the Diderot program (parallel version)
//! \param max_nsteps the limit on the number of super steps; 0 means unlimited
//! \return the number of steps taken, or -1 on error.
uint32_t world::run (uint32_t max_nsteps)
{
  // validate the program stage before doing any work
    if (this->_stage < diderot::POST_CREATE) {
        biffMsgAdd (this->_errors, "attempt to run uninitialized program\n");
        return -1;
    }
    else if (this->_stage == diderot::DONE) {
        return 0;
    }
    else if (this->_stage == diderot::POST_CREATE) {
#ifdef DIDEROT_HAS_GLOBAL_START
        this->global_start();
#endif
        this->_stage = diderot::RUNNING;
    }
    assert (this->_stage == diderot::RUNNING);

    diderot::scheduler *sched = this->_sched;

    if (max_nsteps == 0) max_nsteps = 0xffffffff;  // essentially unlimited
                                                   // FIX: was "maxNSteps" (undeclared identifier)

  // Start worker threads; each gets its id, the step limit, and the world
    int nWorkers = sched->_numWorkers;
    worker_arg *args = new worker_arg[nWorkers];
    if (this->_verbose) {
        std::cerr << "run with " << this->_nstrands << " strands / "
            << nWorkers << " workers ..." << std::endl;
    }
  // FIX: t0 was declared twice in this scope (redeclaration error); keep the
  // later declaration so the timed interval starts just before thread creation.
  // Also dropped "sched->_numWorkers = nWorkers;" — a no-op self-assignment.
    double t0 = airTime();
    sched->_nidle = 0;
    sched->_nsteps = 0;
    for (int i = 0; i < nWorkers; i++) {
        pthread_t pid;
        args[i]._id = i;
        args[i]._max_nsteps = max_nsteps;
        args[i]._wrld = this;
        int sts = pthread_create (&pid, NULL, worker, (void *)&(args[i]));
        if (sts != 0) {
            biffMsgAddf (this->_errors, "unable to create worker thread; err = %d\n", sts);
          // NOTE(review): args is deliberately NOT freed here — previously created
          // detached workers still hold pointers into it, so freeing would be a
          // use-after-free; the leak is the safe choice on this error path.
            return -1;
        }
        pthread_detach (pid);
    }

  // wait for the computation to finish: the last worker to exit signals _runWait
  // once all workers are counted idle
    pthread_mutex_lock (&sched->_lock);
        while (sched->_nidle < nWorkers) {
            pthread_cond_wait (&sched->_runWait, &sched->_lock);
        }
    pthread_mutex_unlock (&sched->_lock);

    t0 = airTime() - t0;   // t0 now holds the elapsed wall-clock time
    if (this->_verbose) {
        std::cerr << "done in " << t0 << " seconds" << std::endl;
    }
    this->_run_time += t0;

    delete[] args;

    return sched->_nsteps;

} // world::run


root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0