Home My Page Projects Code Snippets Project Openings diderot
Summary Activity Tracker Tasks SCM

SCM Repository

[diderot] Annotation of /branches/vis12/src/compiler/c-target/fragments/par-run.in
ViewVC logotype

Annotation of /branches/vis12/src/compiler/c-target/fragments/par-run.in

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1828 - (view) (download)

1 : jhr 1826 /* Function which processes active strands. */
2 :     static void *@PREFIX@Worker (void *arg)
3 :     {
4 :     WorkerArg_t *myArg = (WorkerArg_t *)arg;
5 :     @PREFIX@World_t *wrld = myArg->wrld;
6 :     @PREFIX@Globals_t *glob = wrld->globals;
7 :     Diderot_Sched_t *sched = wrld->sched;
8 :     Strand_t *strand = &(wrld->strandDesc[0]);
9 :    
10 : jhr 1828 int nStrandsPerWorker = wrld->numStrands / sched->numWorkers;
11 : jhr 1826 int start = myArg->id * nStrandsPerWorker;
12 :     int limit;
13 :     if (sched->numWorkers-1 == myArg->id)
14 :     limit = wrld->numStrands;
15 :     else
16 :     limit = start + nStrandsPerWorker;
17 :    
18 : jhr 1828 while ((sched->numActive) && (sched->numSteps <= myArg->maxNSteps)) {
19 : jhr 1826 // barrier synchronization at start of super step
20 :     pthread_mutex_lock (&sched->lock);
21 :     if (sched->numIdle+1 < sched->numWorkers) {
22 :     sched->numIdle++;
23 :     pthread_cond_wait (&sched->barrier, &sched->lock);
24 :     }
25 :     else {
26 :     // all other workers are idle, so we can proceed after some initialization
27 :     sched->numIdle = 0;
28 : jhr 1828 sched->numAvail = wrld->numStrands; // includes inactive strands
29 : jhr 1826 sched->nextStrand = 0;
30 :     // swap in and out
31 : jhr 1828 @STRANDTY@ **tmp = wrld->inState;
32 : jhr 1827 wrld->inState = wrld->outState;
33 :     wrld->outState = tmp;
34 : jhr 1826 pthread_cond_broadcast (&sched->barrier);
35 :     }
36 :     pthread_mutex_unlock (&sched->lock);
37 :    
38 :     // if there are no active strands left, then we're done
39 :     if (sched->numActive == 0) {
40 : jhr 1828 pthread_cond_signal (&sched->runWait);
41 : jhr 1826 pthread_exit (0);
42 :     }
43 :    
44 :     // iterate until there is no more work to do
45 :     int blkStart, blkSize;
46 :     int numDead = 0;
47 :     do {
48 :     // grab some work
49 :     pthread_mutex_lock (&sched->lock);
50 :     blkStart = sched->nextStrand;
51 :     blkSize = (sched->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : sched->numAvail;
52 :     sched->numAvail -= blkSize;
53 :     sched->nextStrand += blkSize;
54 :     pthread_mutex_unlock (&sched->lock);
55 :     // update the strands
56 :     for (int i = blkStart; i < blkStart+blkSize; i++) {
57 :     if (! wrld->status[i]) {
58 :     // StrandStatus_t sts = strand->update(glob, wrld->inState[i], wrld->outState[i]);
59 :     StrandStatus_t sts = @STRAND@_Update(glob, wrld->inState[i], wrld->outState[i]);
60 :     switch (sts) {
61 :     case DIDEROT_STABILIZE:
62 :     wrld->status[i] = DIDEROT_STABILIZE;
63 :     break;
64 :     case DIDEROT_DIE:
65 :     wrld->status[i] = DIDEROT_DIE;
66 :     numDead++;
67 :     break;
68 :     default:
69 :     break;
70 :     }
71 :     }
72 :     else {
73 :     assert ((wrld->status[i] == DIDEROT_STABLE) || (wrld->status[i] == DIDEROT_DIE));
74 :     }
75 :     }
76 :     } while (blkSize > 0);
77 :    
78 :     // barrier synchronization
79 :     pthread_mutex_lock (&sched->lock);
80 :     sched->numActive -= numDead;
81 :     if (sched->numIdle+1 < sched->numWorkers) {
82 :     sched->numIdle++;
83 :     pthread_cond_wait (&sched->barrier, &sched->lock);
84 :     }
85 :     else {
86 :     // all other workers are idle, so we can proceed
87 :     sched->numIdle = 0;
88 :     pthread_cond_broadcast (&sched->barrier);
89 : jhr 1828 sched->numSteps++;
90 : jhr 1826 }
91 :     pthread_mutex_unlock (&sched->lock);
92 :    
93 :     /**** If there is a global computation phase, it goes here ****/
94 :    
95 :     // stabilize any threads that need stabilization. Each worker is responsible for
96 :     // a contiguous region of the strands
97 :     // FIXME: once we switch to dynamic lists of strand blocks, then we can use finer-grain tracking
98 :     int numStabilized = 0;
99 :     for (int i = start; i < limit; i++) {
100 :     if (wrld->status[i] == DIDEROT_STABILIZE) {
101 :     // stabilize the strand's state. Note that the outState has been set by
102 :     // the last call to update, so we make the inState be the target of the
103 :     // stabilize method.
104 : jhr 1828 // strand->stabilize(glob, wrld->outState[i], wrld->inState[i]);
105 :     @STRAND@_Stabilize(glob, wrld->outState[i], wrld->inState[i]);
106 : jhr 1826 memcpy (wrld->outState[i], wrld->inState[i], strand->stateSzb);
107 :     wrld->status[i] = DIDEROT_STABLE;
108 :     numStabilized++;
109 :     }
110 :     }
111 :     // adjust the numActive count
112 :     #if defined(HAVE_BUILTIN_ATOMIC_OPS)
113 :     __sync_fetch_and_sub(&sched->numActive, numStabilized);
114 :     #else
115 :     pthread_mutex_lock (&sched->lock);
116 :     sched->numActive -= numStabilized;
117 :     pthread_mutex_unlock (&sched->lock);
118 :     #endif
119 : jhr 1828 } // end while
120 : jhr 1826
121 : jhr 1828 return 0;
122 :    
123 : jhr 1826 }
124 :    
125 :     //! Run the Diderot program (parallel version)
126 :     //! \param wrld the world-state of the Diderot program
127 :     //! \param maxNSteps the limit on the number of super steps; 0 means unlimited
128 :     //! \return the number of steps taken.
129 :     uint32_t @PREFIX@Run (@PREFIX@World_t *wrld, uint32_t maxNSteps)
130 :     {
131 :     Diderot_Sched_t *sched = wrld->sched;
132 :    
133 :     // Start worker threads
134 : jhr 1828 int nWorkers = sched->numWorkers;
135 : jhr 1826 WorkerArg_t *args = NEWVEC(WorkerArg_t, nWorkers);
136 :     if (wrld->verboseFlg) fprintf (stderr, "initializing %d workers ...\n", nWorkers);
137 :     double t0 = airTime();
138 :     sched->numWorkers = nWorkers;
139 :     sched->numIdle = 0;
140 : jhr 1828 sched->numSteps = 0;
141 : jhr 1826 for (int i = 0; i < nWorkers; i++) {
142 :     pthread_t pid;
143 :     args[i].wrld = wrld;
144 :     args[i].id = i;
145 : jhr 1828 if (pthread_create (&pid, NULL, @PREFIX@Worker, (void *)&(args[i])) != 0) {
146 : jhr 1826 fprintf (stderr, "unable to create worker thread\n");
147 :     exit (1);
148 :     }
149 :     pthread_detach (pid);
150 :     }
151 :    
152 :     // wait for the computation to finish
153 :     pthread_mutex_lock (&sched->lock);
154 : jhr 1828 pthread_cond_wait (&sched->runWait, &sched->lock);
155 : jhr 1826 pthread_mutex_unlock (&sched->lock);
156 :    
157 :     FREE(args);
158 :    
159 : jhr 1828 return sched->numSteps;
160 : jhr 1826 }
161 :    

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0