00001 #ifndef TERMINATE_C
00002 #define TERMINATE_C
00003
00004 #include <stdlib.h>
00005 #include <pvm3.h>
00006 #include "Termination.h"
00007 #include "FilterData.h"
00008 #include "../Messages.h"
00009
00010
00011
00012 GlobalTermination *initGlobalTermination(Layout *layout, int numFilterInstances) {
00013 int i, j;
00014 int currentFilterInstance = 0;
00015
00016 #ifdef DEBUG
00017 printf("Begin initGlobalTermination\n");
00018 #endif
00019
00020 GlobalTermination *gt = (GlobalTermination *) malloc(sizeof(GlobalTermination));
00021 if(!gt) {
00022 printf("initGlobalTermination: could not allocate memory\n");
00023 }
00024
00025 gt->currentRoundTag = 0;
00026
00027 gt->numFilterInstances = numFilterInstances;
00028 gt->tfid = (TerminationFilterInstancesData *) malloc(numFilterInstances*sizeof(TerminationFilterInstancesData));
00029
00030
00031 for(i = 0; i < numFilterInstances; i++) {
00032 gt->tfid[i].terminate = NOTTERMINATED;
00033 }
00034
00035
00036 for(i = 0; i < layout->numFilters; i++) {
00037 int breakLoop = 0;
00038 int tagBreakLoop = -1;
00039 FilterSpec *pFilter = layout->filters[i];
00040
00041 for(j = 0; j < pFilter->numInputs; j++) {
00042 if(pFilter->inputs[j]->breakLoop) {
00043 breakLoop = 1;
00044 tagBreakLoop = pFilter->inputs[j]->tag;
00045 break;
00046 }
00047 }
00048 FilterPlacement pfilterPlacement = layout->filters[i]->filterPlacement;
00049 for(j = 0; j < pfilterPlacement.numInstances; j++, currentFilterInstance++) {
00050 gt->tfid[currentFilterInstance].filterInstanceTid = pfilterPlacement.tids[j];
00051 gt->tfid[currentFilterInstance].breakLoop = breakLoop;
00052 gt->tfid[currentFilterInstance].tagBreakLoop = tagBreakLoop;
00053 }
00054 }
00055
00056 #ifdef DEBUG
00057 printf("End initGlobalTermination\n");
00058 #endif
00059
00060 return gt;
00061 }
00062
00063 int findFilterInstId(filterInstTid) {
00064 int i;
00065
00066 for(i = 0; i < gt->numFilterInstances; i++) {
00067 if(gt->tfid[i].filterInstanceTid == filterInstTid)
00068 return i;
00069 }
00070
00071 return -1;
00072 }
00073
00074 int verifyGlobalTermination(int filterInstTid, int filterInstTag) {
00075 int filterInstId, i;
00076 int globalTermination = 1;
00077
00078 #ifdef DEBUG
00079 printf("Begin verifyGlobalTermination\n");
00080 #endif
00081
00082 filterInstId = findFilterInstId(filterInstTid);
00083 if(filterInstId == -1) {
00084 printf("verifyGlobalTermination: error finding tid: %d", filterInstTid);
00085 return -1;
00086 }
00087
00088 if(filterInstTag == gt->currentRoundTag) {
00089 gt->tfid[filterInstId].terminate = TERMINATED;
00090
00091 for(i = 0; i < gt->numFilterInstances; i++) {
00092 if(gt->tfid[i].terminate != TERMINATED) {
00093 globalTermination = 0;
00094 break;
00095 }
00096 }
00097 if(globalTermination) {
00098
00099 #ifdef DEBUG
00100 printf("verifyGlobalTermination: all filterInstances terminated\n");
00101 #endif
00102 for(i = 0; i < gt->numFilterInstances; i++) {
00103 if(gt->tfid[i].breakLoop) {
00104 int msgType = MSGT_EOW;
00105 int taskId = -1;
00106 pvm_initsend(PvmDataRaw);
00107 pvm_pkint(&msgType, 1, 1);
00108 pvm_pkint(&taskId, 1, 1);
00109 pvm_send(gt->tfid[i].filterInstanceTid, gt->tfid[i].tagBreakLoop);
00110 }
00111 }
00112 }
00113 }else if(filterInstTag > gt->currentRoundTag) {
00114 gt->currentRoundTag = filterInstTag;
00115 for(i = 0; i < gt->numFilterInstances; i++) {
00116 gt->tfid[i].terminate = NOTTERMINATED;
00117 }
00118 gt->tfid[filterInstId].terminate = TERMINATED;
00119 #ifdef DEBUG
00120 printf("verifyGlobalTermination: filter instance %d terminated\n", filterInstId);
00121 #endif
00122 }
00123
00124 #ifdef DEBUG
00125 printf("End verifyGlobalTermination\n");
00126 #endif
00127
00128 return -1;
00129 }
00130
00131
00132
00133
00134
00135
00136 TerminationDetection *storeNeighborTids(TerminationDetection *tdd, int numNeighbors, int *neighborTids, int portTag) {
00137 int i, j;
00138
00139 for(i = 0; i < numNeighbors; i++) {
00140 for(j = 0; j < tdd->numNeighbors; j++) {
00141 if(tdd->nd[i].neighborTid == neighborTids[i])
00142 break;
00143 }
00144
00145 if(j == tdd->numNeighbors) {
00146 tdd->nd[tdd->numNeighbors].neighborTid = neighborTids[i];
00147 tdd->nd[tdd->numNeighbors].neighborPortTag = 3;
00148 tdd->nd[tdd->numNeighbors].neighborTerm = NOTTERMINATED;
00149 tdd->nd[tdd->numNeighbors].neighborStreamId = tdd->numNeighbors;
00150 tdd->numNeighbors++;
00151 }
00152 }
00153
00154 return tdd;
00155 }
00156
00157 TerminationDetection *initTerminationDetection() {
00158 int i;
00159 int numPorts = fd->numInputPorts + fd->numOutputPorts;
00160
00161 #ifdef DEBUG
00162 printf("Begin initTerminationDetection\n");
00163 #endif
00164
00165 TerminationDetection *tdd = (TerminationDetection *) malloc(sizeof(TerminationDetection));
00166 if(!tdd) {
00167 printf("initTerminationDetection: could not allocate memory\n");
00168 return NULL;
00169 }
00170
00171 tdd->localTag = 0;
00172 tdd->status = NOTPARTICIPATING;
00173 tdd->numNeighbors = 0;
00174
00175
00176 tdd->nd = (NeighborData *) malloc(numPorts*MAXINSTANCES*sizeof(NeighborData));
00177 for(i = 0; i < fd->numInputPorts; i++) {
00178
00179 tdd = storeNeighborTids(tdd, fd->inputPorts[i]->numSources, fd->inputPorts[i]->tidsSources, fd->inputPorts[i]->tag);
00180 }
00181 for(i = 0; i <fd->numOutputPorts; i++) {
00182
00183 tdd = storeNeighborTids(tdd, fd->outputPorts[i]->numDestinations, fd->outputPorts[i]->tidsDestinations, fd->outputPorts[i]->tag);
00184 }
00185
00186 tdd->numStreams = numPorts;
00187 tdd->stream = (int *) malloc((tdd->numStreams)*sizeof(int));
00188
00189 for(i = 0; i < tdd->numStreams; i++) {
00190 tdd->stream[i] = FULL;
00191 }
00192
00193 #ifdef DEBUG
00194 printf("End initTerminationDetection\n");
00195 #endif
00196
00197 return tdd;
00198 }
00199
00200 void sendTerminationToNeighbors() {
00201 int i;
00202 int type = MSGT_INITTERM;
00203
00204 for(i = 0; i < tdd->numNeighbors; i++) {
00205
00206 pvm_initsend(PvmDataRaw);
00207 pvm_pkint(&type, 1, 1);
00208 pvm_pkint(&tdd->localTag, 1, 1);
00209 pvm_send(tdd->nd[i].neighborTid, tdd->nd[i].neighborPortTag);
00210 }
00211
00212 }
00213
00214 void sendTerminationToLeader() {
00215
00216
00217 pvm_initsend(PvmDataRaw);
00218 pvm_pkint(&tdd->localTag, 1, 1);
00219 pvm_send(pvm_parent(), MSGT_LOCALTERM);
00220 }
00221
00222
00223
00224 int beginTerminationDetection() {
00225
00226 #ifdef DEBUG
00227 printf("Begin Termination Detection\n");
00228 #endif
00229
00230 tdd->localTag++;
00231
00232 sendTerminationToNeighbors();
00233
00234 tdd->status = PARTICIPATING;
00235
00236 return 1;
00237 }
00238
00239 int findNeighborId(int neighborTid) {
00240 int i;
00241
00242 for(i = 0; i < tdd->numNeighbors; i++) {
00243 if(tdd->nd[i].neighborTid == neighborTid) {
00244 return tdd->nd[i].neighborStreamId;
00245 }
00246 }
00247
00248 return -1;
00249 }
00250
00251
00252 int terminationDetectionRound(int neighborTid, int neighborTag) {
00253 int neighborId, i;
00254 int ITerminated = 1;
00255
00256 #ifdef DEBUG
00257 printf("Begin terminationDetectionRound\n");
00258 #endif
00259
00260
00261 neighborId = findNeighborId(neighborTid);
00262 if(neighborId == -1) {
00263 printf("terminationDetectionRound: error finding tid: %d", neighborTid);
00264 return -1;
00265 }
00266
00267 tdd->stream[neighborId] = EMPTY;
00268
00269 if(neighborTag > tdd->localTag) {
00270
00271 tdd->localTag = neighborTag;
00272 for(i = 0; i < tdd->numNeighbors; i++) {
00273 tdd->nd[i].neighborTerm = NOTTERMINATED;
00274 }
00275 tdd->nd[neighborId].neighborTerm = TERMINATED;
00276 tdd->status = PARTICIPATING;
00277 sendTerminationToNeighbors();
00278 #ifdef DEBUG
00279 printf("terminationDetectionRound: I was not participating\n");
00280 #endif
00281 }
00282
00283 if(neighborTag == tdd->localTag) {
00284
00285 tdd->nd[neighborId].neighborTerm = TERMINATED;
00286
00287 #ifdef DEBUG
00288 printf("terminationDetectionRound: I was participating\n");
00289 #endif
00290
00291 for(i = 0; i < tdd->numNeighbors; i++) {
00292 if(tdd->nd[i].neighborTerm == NOTTERMINATED || tdd->stream[i] == FULL) {
00293 ITerminated = 0;
00294 break;
00295 }
00296 }
00297 if(ITerminated) {
00298 #ifdef DEBUG
00299 printf("terminationDetectionRound: I terminated\n");
00300 #endif
00301 sendTerminationToLeader();
00302 }
00303 }
00304
00305 #ifdef DEBUG
00306 printf("End terminationDetectionRound\n");
00307 #endif
00308
00309 return 1;
00310 }
00311
00312 int updateTermStreamToEmpty(int neighborTid) {
00313 int neighborId;
00314
00315 #ifdef DEBUG
00316 printf("updateTermStreamToEmpty: I am not participating anymore\n");
00317 #endif
00318
00319
00320 neighborId = findNeighborId(neighborTid);
00321
00322 if((tdd->status == PARTICIPATING) ) {
00323 tdd->stream[neighborId] = FULL;
00324 tdd->status = NOTPARTICIPATING;
00325 }
00326
00327 return 1;
00328 }
00329
00330
00331
00332
00333 #endif