Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] View of /branches/vis15/src/compiler/target-cpu/fragments/par-run-nobsp.in
ViewVC logotype

View of /branches/vis15/src/compiler/target-cpu/fragments/par-run-nobsp.in

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4489 - (download) (annotate)
Sat Sep 3 00:32:03 2016 UTC (3 years, 1 month ago) by jhr
File size: 4621 byte(s)
working on merge: changing initially to start
/* Worker thread for when we do not need super-step synchronization */
static void *worker (void *arg)
{
    worker_arg *myArg = reinterpret_cast<worker_arg *>(arg);
    world *wrld = myArg->_wrld;
#ifndef DIDEROT_NO_GLOBALS
    globals *glob = wrld->_globals;
#endif
    diderot::scheduler *sched = wrld->_sched;

  // iterate until there is no more work to do
    int blkStart, blkSize;
    uint32_t numDead = 0;
    uint32_t numStabilized = 0;
    uint32_t maxSteps = 0;
    uint32_t stepLimit = myArg->_max_nsteps;
    do {
      // grab a block of strands
        pthread_mutex_lock (&sched->_lock);
            blkStart = sched->_nextStrand;
            blkSize = (sched->_navail >= BLOCK_SIZE) ? BLOCK_SIZE : sched->_navail;
            sched->_navail -= blkSize;
            sched->_nextStrand += blkSize;
        pthread_mutex_unlock (&sched->_lock);
      // update the strands
        for (int i = blkStart;  i < blkStart+blkSize;  i++) {
          // run the strand to completion, or until the step limit is exceeded
            @STRANDTY@ *self = &wrld->_state[i];
            diderot::strand_status sts = static_cast<diderot::strand_status>(wrld->_status[i]);
            uint32_t nSteps = 0;
            while ((! sts) && (nSteps < stepLimit)) {
                nSteps++;
                sts = @STRAND@_update(@UPDATE_ARGS@self);
            }
          // finalize the strand state as necessary
            switch (sts) {
              case diderot::kStabilize:
              // stabilize the strand's state.
#ifdef DIDEROT_HAS_STABILIZE
                @STRAND@_stabilize(@STABILIZE_ARGS@self);
#endif
                wrld->_status[i] = diderot::kStable;
                numStabilized++;
                break;
              case diderot::kDie:
                wrld->_status[i] = diderot::kDead;
                numDead++;
                break;
              default:
                assert (sts == wrld->_status[i]);
                break;
            }
            if (maxSteps < nSteps) maxSteps = nSteps;
        }
    } while (blkSize > 0);

  // update scheduler state
    pthread_mutex_lock (&sched->_lock);
        sched->_nactive -= numStabilized + numDead;
        sched->_nstable += numStabilized;
        if (maxSteps > sched->_nsteps) {
            sched->_nsteps = maxSteps;
        }
        if (++sched->_nidle == sched->_numWorkers) {
            pthread_cond_signal (&sched->_runWait);
        }
    pthread_mutex_unlock (&sched->_lock);

    return 0;

}

//! Run the Diderot program (parallel version)
//! \param max_nsteps the limit on the number of super steps; 0 means unlimited
//! \return the number of steps taken, or -1 on error.
uint32_t world::run (uint32_t max_nsteps)
{
    if (this->_stage < diderot::POST_CREATE) {
        biffMsgAdd (this->_errors, "attempt to run uninitialized program\n");
        return -1;
    }
    else if (this->_stage == diderot::DONE) {
        return 0;
    }
    else if (this->_stage == diderot::POST_CREATE) {
#ifdef DIDEROT_HAS_GLOBAL_START
        this->global_start();
#endif
        this->_stage = diderot::RUNNING;
    }
    assert (this->_stage == diderot::RUNNING);

    diderot::scheduler *sched = this->_sched;

    if (max_nsteps == 0) {
        max_nsteps = 0xffffffff;  // essentially unlimited
    }

    double t0 = airTime();

  // Start worker threads
    int nWorkers = sched->_numWorkers;
    worker_arg *args = new worker_arg[nWorkers];
    if (this->_verbose) {
        std::cerr << "run with " << this->_nstrands << " strands / "
            << nWorkers << " workers ..." << std::endl;
    }
    sched->_numWorkers = nWorkers;
    sched->_navail = this->_nstrands;
    sched->_nidle = 0;
    sched->_nsteps = 0;
    sched->_nextStrand = 0;
    for (int i = 0; i < nWorkers; i++) {
        pthread_t pid;
        args[i]._id = i;
        args[i]._max_nsteps = max_nsteps;
        args[i]._wrld = this;
        int sts = pthread_create (&pid, NULL, worker, (void *)&(args[i]));
        if (sts != 0) {
            biffMsgAddf (this->_errors, "unable to create worker thread; err = %d\n", sts);
            return -1;
        }
        pthread_detach (pid);
    }

  // wait for the computation to finish
    pthread_mutex_lock (&sched->_lock);
        while (sched->_nidle < nWorkers) {
            pthread_cond_wait (&sched->_runWait, &sched->_lock);
        }
    pthread_mutex_unlock (&sched->_lock);

    t0 = airTime() - t0;
    if (this->_verbose) {
        std::cerr << "done in " << t0 << " seconds" << std::endl;
    }
    this->_run_time += t0;

    delete[] args;

    return sched->_nsteps;

} // world::run

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0