Termination.c

Go to the documentation of this file.
00001 #ifndef TERMINATE_C
00002 #define TERMINATE_C
00003 
00004 #include <stdlib.h>
00005 #include <pvm3.h>
00006 #include "Termination.h"
00007 #include "FilterData.h"
00008 #include "../Messages.h"
00009 
00010 /**** Beging leader functions ****/
00011 
00012 GlobalTermination *initGlobalTermination(Layout *layout, int numFilterInstances) {
00013         int i, j;
00014         int currentFilterInstance = 0;
00015 
00016 #ifdef DEBUG
00017         printf("Begin initGlobalTermination\n");
00018 #endif
00019 
00020         GlobalTermination *gt = (GlobalTermination *) malloc(sizeof(GlobalTermination));
00021         if(!gt) {
00022                 printf("initGlobalTermination: could not allocate memory\n");
00023         }
00024 
00025         gt->currentRoundTag = 0;
00026         
00027         gt->numFilterInstances = numFilterInstances;
00028         gt->tfid = (TerminationFilterInstancesData *) malloc(numFilterInstances*sizeof(TerminationFilterInstancesData));
00029         
00030         // Init filter instances termination status
00031         for(i = 0; i < numFilterInstances; i++) {
00032                 gt->tfid[i].terminate = NOTTERMINATED;
00033         }
00034         
00035         // Init filter instances tids and 
00036         for(i = 0; i < layout->numFilters; i++) {
00037                 int breakLoop = 0;
00038                 int tagBreakLoop = -1;
00039                 FilterSpec *pFilter = layout->filters[i];
00040                 // Verify if the filter has a stream begining a loop
00041                 for(j = 0; j < pFilter->numInputs; j++) {
00042                         if(pFilter->inputs[j]->breakLoop) {
00043                                 breakLoop = 1;
00044                                 tagBreakLoop = pFilter->inputs[j]->tag;
00045                                 break;
00046                         }
00047                 }
00048                 FilterPlacement pfilterPlacement = layout->filters[i]->filterPlacement;
00049                 for(j = 0; j < pfilterPlacement.numInstances; j++, currentFilterInstance++) {
00050                         gt->tfid[currentFilterInstance].filterInstanceTid = pfilterPlacement.tids[j];
00051                         gt->tfid[currentFilterInstance].breakLoop = breakLoop;
00052                         gt->tfid[currentFilterInstance].tagBreakLoop = tagBreakLoop;
00053                 }
00054         }
00055 
00056 #ifdef DEBUG
00057         printf("End initGlobalTermination\n");
00058 #endif
00059         
00060         return gt;
00061 }
00062 
00063 int findFilterInstId(filterInstTid) {
00064         int i;
00065 
00066         for(i = 0; i < gt->numFilterInstances; i++) {
00067                 if(gt->tfid[i].filterInstanceTid == filterInstTid)
00068                         return i;
00069         }
00070 
00071         return -1;
00072 }
00073 
00074 int verifyGlobalTermination(int filterInstTid, int filterInstTag) {
00075         int filterInstId, i;
00076         int globalTermination = 1;
00077 
00078 #ifdef DEBUG
00079         printf("Begin verifyGlobalTermination\n");
00080 #endif
00081 
00082         filterInstId = findFilterInstId(filterInstTid);
00083         if(filterInstId == -1) {
00084                 printf("verifyGlobalTermination: error finding tid: %d", filterInstTid);
00085                 return -1;
00086         }
00087         
00088         if(filterInstTag == gt->currentRoundTag) {
00089                 gt->tfid[filterInstId].terminate = TERMINATED;
00090                 // Verify if all filterInstances terminated
00091                 for(i = 0; i < gt->numFilterInstances; i++) {
00092                         if(gt->tfid[i].terminate != TERMINATED) {
00093                                 globalTermination = 0;
00094                                 break;
00095                         }
00096                 }
00097                 if(globalTermination) {
00098                         // Send EOW to specified filter instances
00099 #ifdef DEBUG
00100                         printf("verifyGlobalTermination: all filterInstances terminated\n");
00101 #endif
00102                         for(i = 0; i < gt->numFilterInstances; i++) {
00103                                 if(gt->tfid[i].breakLoop) {
00104                                         int msgType = MSGT_EOW;
00105                                         int taskId = -1;
00106                                         pvm_initsend(PvmDataRaw);
00107                                         pvm_pkint(&msgType, 1, 1);
00108                                         pvm_pkint(&taskId, 1, 1);
00109                                         pvm_send(gt->tfid[i].filterInstanceTid, gt->tfid[i].tagBreakLoop);
00110                                 }
00111                         }
00112                 }
00113         }else if(filterInstTag > gt->currentRoundTag) {
00114                 gt->currentRoundTag = filterInstTag;
00115                 for(i = 0; i < gt->numFilterInstances; i++) {
00116                         gt->tfid[i].terminate = NOTTERMINATED;
00117                 }
00118                 gt->tfid[filterInstId].terminate = TERMINATED;
00119 #ifdef DEBUG
00120                 printf("verifyGlobalTermination: filter instance %d terminated\n", filterInstId);
00121 #endif
00122         }
00123 
00124 #ifdef DEBUG
00125         printf("End verifyGlobalTermination\n");
00126 #endif
00127         
00128         return -1;
00129 }
00130 
00131 /**** End leader functions ****/
00132 
00133 
00134 /**** Begin filter functions ****/
00135 
00136 TerminationDetection *storeNeighborTids(TerminationDetection *tdd, int numNeighbors, int *neighborTids, int portTag) {
00137         int i, j;
00138         
00139         for(i = 0; i < numNeighbors; i++) {
00140                 for(j = 0; j < tdd->numNeighbors; j++) {
00141                         if(tdd->nd[i].neighborTid == neighborTids[i])
00142                                 break;
00143                 }
00144                 // Diferent tid, store it.
00145                 if(j == tdd->numNeighbors) {
00146                         tdd->nd[tdd->numNeighbors].neighborTid = neighborTids[i];
00147                         tdd->nd[tdd->numNeighbors].neighborPortTag = 3;
00148                         tdd->nd[tdd->numNeighbors].neighborTerm = NOTTERMINATED;
00149                         tdd->nd[tdd->numNeighbors].neighborStreamId = tdd->numNeighbors;
00150                         tdd->numNeighbors++;
00151                 }
00152         }
00153 
00154         return tdd;
00155 }
00156 
00157 TerminationDetection *initTerminationDetection() {
00158         int i;
00159         int numPorts = fd->numInputPorts + fd->numOutputPorts;
00160 
00161 #ifdef DEBUG
00162         printf("Begin initTerminationDetection\n");
00163 #endif
00164         
00165         TerminationDetection *tdd = (TerminationDetection *) malloc(sizeof(TerminationDetection));
00166         if(!tdd) {
00167                 printf("initTerminationDetection: could not allocate memory\n");
00168                 return NULL;
00169         }
00170         
00171         tdd->localTag = 0;
00172         tdd->status = NOTPARTICIPATING; 
00173         tdd->numNeighbors = 0;
00174 
00175         // Find out the number of neighbors
00176         tdd->nd = (NeighborData *) malloc(numPorts*MAXINSTANCES*sizeof(NeighborData));
00177         for(i = 0; i < fd->numInputPorts; i++) {
00178                 // Input neighbors
00179                 tdd = storeNeighborTids(tdd, fd->inputPorts[i]->numSources, fd->inputPorts[i]->tidsSources, fd->inputPorts[i]->tag);
00180         }
00181         for(i = 0; i <fd->numOutputPorts; i++) {
00182                 // Output neighbors
00183                 tdd = storeNeighborTids(tdd, fd->outputPorts[i]->numDestinations, fd->outputPorts[i]->tidsDestinations, fd->outputPorts[i]->tag);
00184         }
00185 
00186         tdd->numStreams = numPorts;
00187         tdd->stream = (int *) malloc((tdd->numStreams)*sizeof(int));
00188         // Init all streams as FULL
00189         for(i = 0; i < tdd->numStreams; i++) {
00190                 tdd->stream[i] = FULL;
00191         }
00192 
00193 #ifdef DEBUG
00194         printf("End initTerminationDetection\n");
00195 #endif
00196         
00197         return tdd;
00198 }
00199 
00200 void sendTerminationToNeighbors() {
00201         int i;
00202         int type = MSGT_INITTERM;
00203 
00204         for(i = 0; i < tdd->numNeighbors; i++) {
00205                 // send a new detection round to all neighbors
00206                 pvm_initsend(PvmDataRaw);
00207                 pvm_pkint(&type, 1, 1); // message type
00208                 pvm_pkint(&tdd->localTag, 1, 1); // local tag
00209                 pvm_send(tdd->nd[i].neighborTid, tdd->nd[i].neighborPortTag); //we send with the right tag
00210         }
00211 
00212 }
00213 
00214 void sendTerminationToLeader() {
00215 
00216         // send local termination to leader
00217         pvm_initsend(PvmDataRaw);
00218         pvm_pkint(&tdd->localTag, 1, 1); // local tag
00219         pvm_send(pvm_parent(), MSGT_LOCALTERM); //we send with the right tag
00220 }
00221 
00222 
00223 /// Begin a new termination detection round
00224 int beginTerminationDetection() {
00225 
00226 #ifdef DEBUG
00227         printf("Begin Termination Detection\n");
00228 #endif
00229         
00230         tdd->localTag++;
00231 
00232         sendTerminationToNeighbors();
00233 
00234         tdd->status = PARTICIPATING;
00235 
00236         return 1;
00237 }
00238 
00239 int findNeighborId(int neighborTid) {
00240         int i;
00241 
00242         for(i = 0; i < tdd->numNeighbors; i++) {
00243                 if(tdd->nd[i].neighborTid == neighborTid) {
00244                         return tdd->nd[i].neighborStreamId;
00245                 }
00246         }
00247 
00248         return -1;
00249 }
00250 
00251 // There is a termination detection round created.
00252 int terminationDetectionRound(int neighborTid, int neighborTag) {
00253         int     neighborId, i;
00254         int ITerminated = 1;
00255 
00256 #ifdef DEBUG
00257         printf("Begin terminationDetectionRound\n");
00258 #endif
00259         
00260         // Find wicth neighbor sent a termination detection round
00261         neighborId = findNeighborId(neighborTid);
00262         if(neighborId == -1) {
00263                 printf("terminationDetectionRound: error finding tid: %d", neighborTid);
00264                 return -1;
00265         }
00266         // Mark stream as empty
00267         tdd->stream[neighborId] = EMPTY;
00268 
00269         if(neighborTag > tdd->localTag) {
00270                 // I am not participating of the current termination detection round
00271                 tdd->localTag = neighborTag;
00272                 for(i = 0; i < tdd->numNeighbors; i++) {
00273                         tdd->nd[i].neighborTerm = NOTTERMINATED;
00274                 }
00275                 tdd->nd[neighborId].neighborTerm = TERMINATED;
00276                 tdd->status = PARTICIPATING;
00277                 sendTerminationToNeighbors();
00278 #ifdef DEBUG
00279                 printf("terminationDetectionRound: I was not participating\n");
00280 #endif
00281         } 
00282         
00283         if(neighborTag == tdd->localTag) {
00284                 // I was participating. Am I terminated?
00285                 tdd->nd[neighborId].neighborTerm = TERMINATED;
00286 
00287 #ifdef DEBUG
00288                 printf("terminationDetectionRound: I was participating\n");     
00289 #endif
00290                 
00291                 for(i = 0; i < tdd->numNeighbors; i++) {
00292                         if(tdd->nd[i].neighborTerm == NOTTERMINATED || tdd->stream[i] == FULL) {
00293                                 ITerminated = 0;
00294                                 break;
00295                         }
00296                 }
00297                 if(ITerminated) {
00298 #ifdef DEBUG
00299                         printf("terminationDetectionRound: I terminated\n");
00300 #endif
00301                         sendTerminationToLeader();
00302                 }
00303         }
00304 
00305 #ifdef DEBUG
00306         printf("End terminationDetectionRound\n");
00307 #endif
00308 
00309         return 1;
00310 }
00311 
00312 int updateTermStreamToEmpty(int neighborTid) {
00313         int     neighborId;
00314 
00315 #ifdef DEBUG
00316         printf("updateTermStreamToEmpty: I am not participating anymore\n");
00317 #endif
00318         
00319         // Find witch stream is empty
00320         neighborId = findNeighborId(neighborTid);
00321 
00322         if((tdd->status == PARTICIPATING) /*&& (tdd->nd[neighborId].neighborTerm == NOTTERMINATED)*/) {
00323                 tdd->stream[neighborId] = FULL;
00324                 tdd->status = NOTPARTICIPATING;
00325         }
00326 
00327         return 1;
00328 }
00329 
00330 
00331 /**** End filter functions ****/
00332 
00333 #endif

Generated on Tue Jan 17 19:18:39 2006 for Void by  doxygen 1.4.6