28 |
#endif |
#endif |
29 |
|
|
30 |
// The number of strands a worker will take for processing at one time.
// Workers claim strands in index ranges of at most this many entries while
// holding the world lock, so this is also the granularity of lock traffic.
#define BLOCK_SIZE      256
32 |
|
|
33 |
struct struct_world { |
struct struct_world { |
34 |
bool isArray; // is the initialization an array or collection? |
bool isArray; // is the initialization an array or collection? |
36 |
int32_t *base; // nDims array of base indices |
int32_t *base; // nDims array of base indices |
37 |
uint32_t *size; // nDims array of iteration sizes |
uint32_t *size; // nDims array of iteration sizes |
38 |
uint32_t numStrands; // number of strands in the world |
uint32_t numStrands; // number of strands in the world |
|
uint32_t numActive; // number of active strands in the world |
|
39 |
void **inState; |
void **inState; |
40 |
void **outState; |
void **outState; |
41 |
uint8_t *status; // array of strand status flags |
uint8_t *status; // array of strand status flags |
42 |
uint32_t numThreads; // number of worker threads |
uint32_t numWorkers; // number of worker threads |
43 |
|
uint32_t nSteps; // number of super steps |
44 |
|
// synchronization state |
45 |
|
uint32_t nextStrand __attribute__((aligned(64))); // index of next strand to update |
46 |
|
uint32_t numActive __attribute__((aligned(64))); // # active strands |
47 |
|
uint32_t numAvail __attribute__((aligned(64))); // # unevaluated strands |
48 |
|
uint32_t numIdle __attribute__((aligned(64))); // # idle workers |
49 |
pthread_mutex_t lock; // big lock to protect wrld from multiple accesses |
pthread_mutex_t lock; // big lock to protect wrld from multiple accesses |
50 |
pthread_cond_t workers; // workers wait on this when they have no work |
pthread_cond_t barrier; // workers wait on this when they have no work |
51 |
uint8_t nWorkers; // number of workers waiting for work to do |
pthread_cond_t mainWait; // used to signal main when the workers have finished |
|
pthread_cond_t main; // used to signal main when the workers have finished |
|
|
sem_t stabilizing;// number of strands stabilizing this step |
|
|
uint32_t nextStrand; // the next strand to be processed this iteration |
|
52 |
}; |
}; |
53 |
|
|
54 |
extern float getOutf (void *self); |
typedef struct { |
55 |
|
int id; |
56 |
|
Diderot_World_t *wrld; |
57 |
|
} WorkerArg_t; |
58 |
|
|
59 |
/* Function which processes active strands. */ |
/* Function which processes active strands. */ |
60 |
void *worker_func (void *arg) |
static void *Worker (void *arg) |
61 |
{ |
{ |
62 |
Diderot_World_t *wrld = (Diderot_World_t *) arg; |
WorkerArg_t *myArg = (WorkerArg_t *)arg; |
63 |
|
Diderot_World_t *wrld = myArg->wrld; |
64 |
pthread_detach(pthread_self()); |
bool existsStabilizing; |
65 |
|
|
66 |
while (true) { |
while (true) { |
67 |
|
// barrier synchronization at start of super step |
68 |
pthread_mutex_lock(&wrld->lock); |
pthread_mutex_lock(&wrld->lock); |
69 |
|
if (wrld->numIdle+1 < wrld->numWorkers) { |
70 |
// If there is no more work to do this iteration, we wait |
wrld->numIdle++; |
71 |
while(wrld->nextStrand == wrld->numStrands) { |
pthread_cond_wait (&wrld->barrier, &wrld->lock); |
72 |
wrld->nWorkers++; |
} |
73 |
if (wrld->nWorkers == wrld->numThreads) { |
else { |
74 |
pthread_cond_signal (&wrld->main); |
// all other workers are idle, so we can proceed after some initialization |
75 |
} |
wrld->numIdle = 0; |
76 |
pthread_cond_wait (&wrld->workers, &wrld->lock); |
wrld->numAvail = wrld->numActive; |
77 |
int nActive = 0; |
wrld->nextStrand = 0; |
78 |
sem_getvalue(&wrld->numActive, &nActive); |
// swap in and out |
79 |
if (nActive == 0) { |
void **tmp = wrld->inState; |
80 |
pthread_mutex_unlock (&wrld->lock); |
wrld->inState = wrld->outState; |
81 |
wrld->numThreads--; |
wrld->outState = tmp; |
82 |
if (wrld->nWorkers == wrld->numThreads) { |
pthread_cond_broadcast (&wrld->barrier); |
|
pthread_cond_signal (&wrld->main); |
|
83 |
} |
} |
84 |
|
pthread_mutex_unlock (&wrld->lock); |
85 |
|
|
86 |
|
// if there are no active strands left, then we're done |
87 |
|
if (wrld->numActive == 0) { |
88 |
|
pthread_cond_signal (&wrld->mainWait); |
89 |
pthread_exit(0); |
pthread_exit(0); |
90 |
} |
} |
|
} |
|
91 |
|
|
92 |
// Get BLOCK_SIZE active strands to process |
// iterate until there is no more work to do |
93 |
uint32_t to_process = wrld->nextStrand; |
int blkStart, blkSize; |
94 |
uint32_t end = to_process; |
existsStabilizing = false; |
95 |
uint32_t num = 0; |
do { |
96 |
while (end != wrld->numStrands && num < BLOCK_SIZE) { |
// grab some work |
97 |
if(! wrld->status[end]) num++; |
pthread_mutex_lock (&wrld->lock); |
98 |
end++; |
if (wrld->numAvail > 0) { |
99 |
|
blkStart = wrld->nextStrand; |
100 |
|
blkSize = (wrld->numAvail >= BLOCK_SIZE) ? BLOCK_SIZE : wrld->numAvail; |
101 |
|
wrld->numAvail -= blkSize; |
102 |
} |
} |
|
wrld->nextStrand = end; |
|
|
|
|
103 |
pthread_mutex_unlock(&wrld->lock); |
pthread_mutex_unlock(&wrld->lock); |
104 |
|
// update the strands |
105 |
// Update the strands in this block |
for (int i = blkStart; i < blkStart+blkSize; i++) { |
|
for (int i = to_process; i < end; i++) { |
|
106 |
if (! wrld->status[i]) { |
if (! wrld->status[i]) { |
107 |
StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); |
StrandStatus_t sts = Diderot_Strands[0]->update(wrld->inState[i], wrld->outState[i]); |
108 |
switch (sts) { |
switch (sts) { |
109 |
case DIDEROT_STABILIZE: |
case DIDEROT_STABILIZE: |
110 |
wrld->status[i] = DIDEROT_STABILIZE; |
wrld->status[i] = DIDEROT_STABILIZE; |
111 |
sem_post(&wrld->stabilizing); |
existsStabilizing = true; |
|
sem_wait(&wrld->numActive); |
|
112 |
break; |
break; |
113 |
case DIDEROT_DIE: |
case DIDEROT_DIE: |
114 |
wrld->status[i] = DIDEROT_DIE; |
wrld->status[i] = DIDEROT_DIE; |
115 |
sem_wait(&wrld->numActive); |
AtomicDec(&wrld->numActive); |
116 |
break; |
break; |
117 |
default: |
default: |
118 |
break; |
break; |
119 |
} |
} |
120 |
} |
} |
121 |
} |
} |
122 |
|
} while (blkSize > 0); |
123 |
|
|
124 |
|
// barrier synchronization |
125 |
|
pthread_mutex_lock (&wrld->lock); |
126 |
|
if (wrld->numIdle+1 < wrld->numWorkers) { |
127 |
|
wrld->numIdle++; |
128 |
|
pthread_cond_wait (&wrld->barrier, &wrld->lock); |
129 |
|
} |
130 |
|
else { |
131 |
|
// all other workers are idle, so we can proceed |
132 |
|
wrld->numIdle = 0; |
133 |
|
wrld->numAvail = wrld->numActive; |
134 |
|
pthread_cond_broadcast (&wrld->barrier); |
135 |
|
} |
136 |
|
pthread_mutex_unlock (&wrld->lock); |
137 |
|
|
138 |
|
/**** If there is a global computation phase, it goes here ****/ |
139 |
|
|
140 |
|
// stabilize any threads that need stabilization. Each worker is responsible for |
141 |
|
// a contiguous region of the strands |
142 |
|
if (existsStabilizing) { |
143 |
|
int nStrandsPerWorker = wrld->numStrands / wrld->numWorkers; |
144 |
|
int start = myArg->id * nStrandsPerWorker; |
145 |
|
int limit = start + nStrandsPerWorker; |
146 |
|
if (limit > wrld->numStrands) limit = wrld->numStrands; |
147 |
|
for (int i = start; i < limit; i++) { |
148 |
|
if (wrld->status[i] == DIDEROT_STABILIZE) { |
149 |
|
// copy out to in so that both copies are the stable state |
150 |
|
memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb); |
151 |
|
wrld->status[i] = DIDEROT_STABLE; |
152 |
|
AtomicDec (&wrld->numActive); |
153 |
|
} |
154 |
|
} |
155 |
} |
} |
156 |
} |
} |
157 |
|
|
158 |
|
} |
159 |
|
|
160 |
|
|
161 |
int main (int argc, const char **argv) |
int main (int argc, const char **argv) |
162 |
{ |
{ |
163 |
printf ("initializing globals ...\n"); |
fprintf (stderr, "initializing globals ...\n"); |
164 |
Diderot_InitGlobals (); |
Diderot_InitGlobals (argc, argv); |
165 |
|
|
166 |
printf ("initializing strands ...\n"); |
fprintf (stderr, "initializing strands ...\n"); |
167 |
Diderot_World_t *wrld = Diderot_Initially (); |
Diderot_World_t *wrld = Diderot_Initially (); |
|
|
|
168 |
for (int i = 0; i < wrld->numStrands; i++) { |
for (int i = 0; i < wrld->numStrands; i++) { |
169 |
// hack to make the invariant part of the state the same in both copies |
// hack to make the invariant part of the state the same in both copies |
170 |
memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); |
memcpy (wrld->outState[i], wrld->inState[i], Diderot_Strands[0]->stateSzb); |
175 |
fprintf(stderr, "unable to get number of processors\n"); |
fprintf(stderr, "unable to get number of processors\n"); |
176 |
exit (1); |
exit (1); |
177 |
} |
} |
178 |
|
|
179 |
// Start worker threads |
// Start worker threads |
180 |
int nWorkers = cpuInfo.numHWCores; |
int nWorkers = cpuInfo.numHWCores; |
181 |
pthread_t *workers = (pthread_t *) malloc (nWorkers * sizeof(pthread_t)); |
WorkerArg_t *args = (WorkerArg_t *) malloc (nWorkers * sizeof(WorkerArg_t)); |
|
|
|
182 |
printf ("initializing %d workers ...\n", nWorkers); |
printf ("initializing %d workers ...\n", nWorkers); |
183 |
wrld->numThreads = nWorkers; |
wrld->numWorkers = nWorkers; |
184 |
|
wrld->numIdle = 0; |
185 |
for (int i = 0; i < nWorkers; i++) { |
for (int i = 0; i < nWorkers; i++) { |
186 |
pthread_create (&workers[i], NULL, worker_func, wrld); |
pthread_t pid; |
187 |
} |
args[i].wrld = wrld; |
188 |
|
args[i].id = i; |
189 |
int nSteps = 0; |
if (pthread_create (&pid, NULL, Worker, (void *)&(args[i])) != 0) { |
190 |
int nActive = 0; |
fprintf (stderr, "unable to create worker thread\n"); |
191 |
sem_getvalue (&wrld->numActive, &nActive); |
exit (1); |
|
while (nActive > 0) { |
|
|
nSteps++; |
|
|
|
|
|
pthread_mutex_lock(&wrld->lock); |
|
|
|
|
|
// Wait until all workers have finished this iteration |
|
|
while (wrld->nWorkers != wrld->numThreads) { |
|
|
pthread_cond_wait(&wrld->main, &wrld->lock); |
|
|
} |
|
|
wrld->nWorkers = 0; |
|
|
|
|
|
// Handle stabilizing strands |
|
|
int numStabilizing = 0; |
|
|
sem_getvalue (&wrld->stabilizing, &numStabilizing); |
|
|
for (int i = 0; i < wrld->numStrands && numStabilizing > 0; i++) { |
|
|
if (wrld->status[i] == DIDEROT_STABILIZE) { |
|
|
sem_wait(&wrld->stabilizing); |
|
|
memcpy (wrld->inState[i], wrld->outState[i], Diderot_Strands[0]->stateSzb); |
|
|
wrld->status[i] = DIDEROT_STABLE; |
|
|
numStabilizing--; |
|
192 |
} |
} |
193 |
|
pthread_detach (pid); |
194 |
} |
} |
195 |
|
|
196 |
sem_getvalue(&wrld->numActive, &nActive); |
// wait for the computation to finish |
197 |
|
pthread_mutex_lock (&wrld->lock); |
198 |
// Do any global update stuff that needs doing |
pthread_cond_wait (&wrld->mainWait, &wrld->lock); |
|
void **tmp = wrld->inState; |
|
|
wrld->inState = wrld->outState; |
|
|
wrld->outState = tmp; |
|
|
|
|
|
wrld->nextStrand = 0; |
|
|
|
|
|
// Wake up all the workers to process the next step |
|
|
pthread_cond_broadcast(&wrld->workers); |
|
199 |
pthread_mutex_unlock(&wrld->lock); |
pthread_mutex_unlock(&wrld->lock); |
|
} |
|
200 |
|
|
201 |
printf("done: %d steps\n", nSteps); |
fprintf (stderr, "done: %d steps\n", wrld->nSteps); |
202 |
// here we have the final state of all of the strands in the "in" buffer |
// here we have the final state of all of the strands in the "in" buffer |
203 |
FILE *outS = fopen("mip.txt", "w"); |
FILE *outS = fopen("mip.txt", "w"); |
204 |
if (outS == NULL) { |
if (outS == NULL) { |
212 |
} |
} |
213 |
fclose (outS); |
fclose (outS); |
214 |
|
|
215 |
|
Diderot_Shutdown (wrld); |
216 |
|
|
217 |
return 0; |
return 0; |
218 |
|
|
219 |
} |
} |
250 |
wrld->size[i] = size[i]; |
wrld->size[i] = size[i]; |
251 |
} |
} |
252 |
|
|
253 |
printf("AllocInitially: %d", size[0]); |
fprintf(stderr, "AllocInitially: %d", size[0]); |
254 |
for (int i = 1; i < nDims; i++) printf(" x %d", size[i]); |
for (int i = 1; i < nDims; i++) |
255 |
printf("\n"); |
fprintf(stderr, " x %d", size[i]); |
256 |
|
fprintf(stderr, "\n"); |
257 |
|
|
258 |
// allocate the strand state pointers |
// allocate the strand state pointers |
259 |
wrld->numStrands = numStrands; |
wrld->numStrands = numStrands; |
|
sem_init (&wrld->numActive, 0, numStrands); |
|
260 |
wrld->inState = (void **) malloc (numStrands * sizeof(void *)); |
wrld->inState = (void **) malloc (numStrands * sizeof(void *)); |
261 |
wrld->outState = (void **) malloc (numStrands * sizeof(void *)); |
wrld->outState = (void **) malloc (numStrands * sizeof(void *)); |
262 |
wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t)); |
wrld->status = (uint8_t *) malloc (numStrands * sizeof(uint8_t)); |
264 |
fprintf (stderr, "unable to allocate strand states\n"); |
fprintf (stderr, "unable to allocate strand states\n"); |
265 |
exit (1); |
exit (1); |
266 |
} |
} |
267 |
|
wrld->numActive = wrld->numStrands; |
268 |
|
wrld->nSteps = 0; |
269 |
|
wrld->numWorkers = 0; |
270 |
|
|
271 |
// initialize strand state pointers etc. |
// initialize strand state pointers etc. |
272 |
for (size_t i = 0; i < numStrands; i++) { |
for (size_t i = 0; i < numStrands; i++) { |
275 |
wrld->status[i] = DIDEROT_ACTIVE; |
wrld->status[i] = DIDEROT_ACTIVE; |
276 |
} |
} |
277 |
|
|
|
wrld->numThreads = 0; |
|
278 |
pthread_mutex_init(&wrld->lock, NULL); |
pthread_mutex_init(&wrld->lock, NULL); |
279 |
pthread_cond_init(&wrld->workers, NULL); |
pthread_cond_init (&wrld->barrier, NULL); |
280 |
pthread_cond_init(&wrld->main, NULL); |
pthread_cond_init (&wrld->mainWait, NULL); |
|
wrld->nWorkers = 0; |
|
|
sem_init(&wrld->stabilizing, 0, 0); |
|
|
wrld->nextStrand = 0; |
|
281 |
|
|
282 |
return wrld; |
return wrld; |
283 |
|
|