FilterData.c

Go to the documentation of this file.
00001 #ifndef FILTERDATA_C
00002 #define FILTERDATA_C
00003 
00004 #ifdef VOID_INST
00005 #include <sys/stat.h>
00006 #include <sys/types.h>
00007 #include <errno.h>
00008 #endif
00009 #include <stdio.h>
00010 #include <stdlib.h>
00011 #include <unistd.h>
00012 #include <string.h>
00013 #include <dlfcn.h>
00014 #include <pvm3.h>
00015 
00016 //abstract data types
00017 #include <Task/Task.h>
00018 #include <Cache/Cache.h>
00019 
00020 #include "FilterData.h"
00021 #include "Ports.h"
00022 #include "../constants.h"
00023 #include "../Messages.h"
00024 #ifdef VOID_INST
00025 #include "Instrumentation.h"
00026 #endif
00027 #ifdef VOID_TERM
00028 #include "Termination.h"
00029 #endif
00030 #ifdef VOID_TRACER
00031 #include <Tracer/tracer.h>
00032 #include "tracingdefs.h"
00033 int msgId;
00034 int incomingMsgId;
00035 #endif
00036 
00037 
00038 //these functions just call the init process and finalize, theyre here to make easier to debug
00039 static int callFDInit(FilterData *fd, void *work, int worksize){
00040         return fd->libdata.init(work, worksize);
00041 }
00042 
00043 static int callFDProcess(FilterData *fd, void* work, int worksize){
00044         return fd->libdata.process(work, worksize);
00045 }
00046 
00047 static int callFDFinalize(FilterData *fd){
00048         return fd->libdata.finalize();
00049 }
00050 
00051 FilterData *createFilterData(){
00052         FilterData *fd = (FilterData *) malloc(sizeof(FilterData));
00053         if(!fd){
00054                 printf("createFilterData: could not allocate memory\n");
00055                 return NULL;
00056         }
00057         // init num ports
00058         fd->numInputPorts = 0;
00059         fd->numOutputPorts = 0;
00060 
00061         // Used to now where a new port can be added
00062         fd->numInportsAdded = 0;
00063         fd->numOutportsAdded = 0; 
00064 
00065 #ifdef VOID_INST
00066         //we beign at void state
00067         fd->instData = instCreate();
00068 #endif
00069 #ifdef VOID_TRACER
00070         char tracefilename[ MAX_HNAME_LENGTH + 8 + 100 ];
00071         char * hostname = malloc( sizeof( char )*MAX_HNAME_LENGTH );
00072         gethostname( hostname, MAX_HNAME_LENGTH );
00073         sprintf( tracefilename, "trace.%s.%d", hostname, pvm_mytid() );
00074         free( hostname );
00075         msgId = 0;
00076         fd->trcData = trcCreateData( tracefilename );
00077 #endif
00078         return fd;
00079 }
00080 
00081 /// \todo Only free FilterData. Needs to free input e output port.
00082 void destroyFilterData(FilterData *fd){
00083 
00084         //free the library
00085         dlclose(fd->libdata.libHandler);
00086         //free ports data
00087         int i;
00088         for (i=0;i<fd->numInputPorts;i++){
00089                 destroyInputPort(fd->inputPorts[i]);
00090         }
00091         for (i=0;i<fd->numOutputPorts;i++){
00092                 destroyOutputPort(fd->outputPorts[i]);
00093         }
00094 
00095 #ifdef VOID_TRACER
00096         trcDestroyData( &(fd->trcData) );
00097 #endif
00098 
00099 #ifdef VOID_INST
00100         instSwitchState(fd->instData, &fd->instData->voidStates[TIMER_END]);
00101         //we save the data to a file named filtername.instance
00102         char filename[MAX_FNAME_LENGTH + 100];
00103         sprintf(filename, "%s.%d", fd->name, fd->myRank);
00104         //for now, we use filename for label also
00105         instSaveTimings(fd->instData, filename, filename);
00106         instDestroy(&fd->instData);
00107         //we dont leave state, as data is destroyed...
00108 #endif
00109 
00110         
00111         if(!fd){
00112                 printf("destroyFilterData: FilterData is NULL.. I cant free it\n");
00113         }else{
00114                 free(fd);
00115         }
00116 }
00117 
00118 int setFDNumInputs(FilterData *fd, int numInp){
00119         if (numInp >= MAXINPSTREAMS)
00120                 return 0;
00121 
00122         fd->numInputPorts = numInp;
00123         return 1;               
00124 }
00125 
00126 int setFDNumOutputs(FilterData *fd, int numOut){
00127         fd->numOutputPorts = numOut;
00128         return 0;
00129 }
00130 
00131 // get a copy of filter name
00132 char *getFDName(FilterData *fd){
00133         return(strdup(fd->name));
00134 }
00135 
00136 // set filter name and free name
00137 int setFDName(FilterData *fd, char *name){
00138         int nameSize = strlen(name);
00139 
00140         // case the name be bigger than our name limit
00141         if((nameSize) > MAX_FNAME_LENGTH){
00142                 printf("Warning: Filter name out of bounds\n");
00143                 memcpy(fd->name, name, MAX_FNAME_LENGTH);
00144                 fd->name[MAX_FNAME_LENGTH] = '\0';
00145                 return 0;
00146         }else{
00147                 memcpy(fd->name, name, nameSize +1); // +1 to copy \0
00148                 return 1;
00149         }
00150 }
00151 
00152 // set the host where this filter is and free name
00153 int setFDHostName(FilterData *fd, char *hostName){
00154         int hostNameSize = strlen(hostName);
00155 
00156         // case the host name be bigger than our host name limit
00157         if((hostNameSize) >  MAX_HNAME_LENGTH){
00158                 printf("Warning: Host name out of bounds\n");
00159                 memcpy(fd->hostName, hostName,  MAX_HNAME_LENGTH);
00160                 fd->hostName[MAX_HNAME_LENGTH] = '\0';
00161                 return 0;
00162         }else{
00163                 memcpy(fd->hostName, hostName, hostNameSize +1);
00164                 return 1;
00165         }
00166 }
00167 
00168 int getFDNumInstances(FilterData *fd) {
00169         return  fd->numInstances;
00170 }
00171 
00172 int setFDNumInstances(FilterData *fd, int numInstances){
00173         if (numInstances >= MAXINSTANCES)
00174                         return -1;
00175         
00176         fd->numInstances = numInstances;
00177         return 0;
00178 }
00179 
00180 void setFDRank(FilterData *fd, int rank){
00181         fd->myRank = rank;
00182 }
00183 
00184 // Set the filter id ... Id is unique for all set of filters
00185 void setFDIdFilter(FilterData *fd, int id){
00186         fd->id = id;
00187 }
00188 
00189 //sets the tids
00190 void setFDTids(FilterData *fd, int *tids){
00191         memcpy(fd->tids, tids, sizeof(int)*fd->numInstances);
00192 }
00193 
00194 #ifdef VOID_INST
00195 //sets FilterData instdir
00196 void setFDInstDir(FilterData *fd, char *dir){
00197         instSetDir(fd->instData, dir);
00198 }
00199 #endif
00200 
00201 // only add a InputPort in FilterData
00202 int addFDInputPort(FilterData *fd, InputPort *p){
00203         if((fd->numInportsAdded +1) < MAXINPSTREAMS){
00204                 fd->inputPorts[fd->numInportsAdded] = p;
00205                 fd->numInportsAdded++;
00206                 return 1;
00207         }else{
00208                 printf("Warning: inputPorts number out of bounds\n");
00209                 return 0;
00210         }
00211 }
00212 
00213 // only add a OutputPort in FilterData
00214 int addFDOutputPort(FilterData *fd, OutputPort *p){
00215         if((fd->numOutportsAdded + 1) < MAXOUTSTREAMS){
00216                 fd->outputPorts[fd->numOutportsAdded] = p;
00217                 fd->numOutportsAdded++;
00218                 return 1;
00219         }else{
00220                 printf("Warning: outputPorts number out of bounds\n");
00221                 return 0;
00222         }
00223 }
00224 
00225 int setFDLibName(FilterData *fd, char *libName){
00226         int libNameSize = strlen(libName);
00227 
00228         // case the host name be bigger than our host name limit
00229         if((libNameSize) >  MAX_LNAME_LENGTH){
00230                 printf("FilterData.c: warning, lib name(%s) out of bounds, truncating\n", libName);
00231                 memcpy(fd->libdata.name, libName, MAX_LNAME_LENGTH);
00232                 fd->libdata.name[MAX_LNAME_LENGTH] = '\0';
00233                 return 0;
00234         }else{
00235                 memcpy(fd->libdata.name, libName, libNameSize +1);
00236                 return 1;
00237         }
00238 }
00239 
00240 //this function loads the filter libraries(init process finalize)
00241 int loadFDLibFunctions(FilterData *fd){
00242   
00243         char *error = NULL;
00244         char *libnameLocal;
00245 
00246         // try load lib in local directory, so we need 2 strigns, 
00247         // the first with ./, em the second whithout
00248         libnameLocal = (char*)malloc(strlen(fd->libdata.name) + 3);
00249         sprintf(libnameLocal, "./%s", fd->libdata.name);
00250         
00251         // get the library handler
00252         if      (((fd->libdata.libHandler = dlopen(libnameLocal, RTLD_NOW)) == NULL) && 
00253                 ((fd->libdata.libHandler = dlopen(fd->libdata.name, RTLD_NOW)) == NULL )) {
00254                         fprintf(stderr, "FilterData.c: could not load filter %s library, %s\n", fd->libdata.name, dlerror());
00255                         return -1;
00256         }
00257         free(libnameLocal);
00258 
00259         // load filter from dynamic library : initFilter - processFilter - finalizeFilter
00260         // warnnig: The function simbols can be in C or C++
00261         fd->libdata.init = (initialize_t *)dlsym(fd->libdata.libHandler, "initFilter");
00262         if ((error = dlerror()) != NULL)  {
00263                 fd->libdata.init = (initialize_t *)dlsym(fd->libdata.libHandler, "initFilterCpp"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++ 
00264                 //fd->libdata.init = (initialize_t *)dlsym(fd->libdata.libHandler, "_ZN7filtroA7initCppEv"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++ 
00265                 if ((error = dlerror()) != NULL)  {
00266                         fprintf (stderr, "Erro no dlsym: %s\n", error);
00267                         return -1;
00268                 }
00269         }
00270 
00271         fd->libdata.process = (process_t *)dlsym(fd->libdata.libHandler, "processFilter");
00272         if ((error = dlerror()) != NULL)  {
00273                 fd->libdata.process  = (process_t *)dlsym(fd->libdata.libHandler, "processFilterCpp"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++
00274                 //fd->libdata.process  = (process_t *)dlsym(fd->libdata.libHandler, "_ZN7filtroA10processCppEv"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++
00275 
00276                 if ((error = dlerror()) != NULL)  {
00277                         fprintf (stderr, "Erro no dlsym: %s\n", error);
00278                         return -1;
00279                 }
00280         }
00281 
00282         fd->libdata.finalize   = (finalize_t *)dlsym(fd->libdata.libHandler, "finalizeFilter");
00283         if ((error = dlerror()) != NULL)  {
00284                 fd->libdata.finalize = (finalize_t *)dlsym(fd->libdata.libHandler, "finalizeFilterCpp"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++
00285                 //fd->libdata.finalize = (finalize_t *)dlsym(fd->libdata.libHandler, "_ZN7filtroA7finiCppEv"); // codifica??o (mangling, g++ 3.2+) do s?mbolo em c++
00286 
00287                 if ((error = dlerror()) != NULL)  {
00288                         fprintf (stderr, "Erro no dlsym: %s\n", error);
00289                         return -1;
00290                 }
00291         }
00292         return 1;
00293 }
00294 
00295 int setFDMachineMem(FilterData *fd, int mem){
00296         //if mem = -1, we autodetect
00297         if (mem == -1){
00298                 /// \todo Read memory. Now its hardcoded to 512MB
00299                 printf("FilterData.c: Warning, no mem declared using 512MB as memory value\n");
00300                 fd->memory = 512;
00301         }
00302         else {
00303                 fd->memory = mem;
00304         }
00305         return 1;
00306 }
00307 
00308 int getFDMachineMem(FilterData *fd){
00309         return fd->memory;
00310 }
00311 
00312 int setFDNumLocalInstances(FilterData *fd, int numLocalInstances){
00313         fd->numLocalInstances = numLocalInstances;
00314         return 1;
00315 }
00316 
00317 int getFDLocalInstances(FilterData *fd){
00318         return fd->numLocalInstances;
00319 }
00320 
00321 int recvFilterData(FilterData *fData) {
00322         int bufId;
00323         int i, l;
00324         char *cwd = (char *)malloc(MAX_CWD_LENGTH+1);
00325         int num = 0;
00326         int *tids = (int *)malloc(sizeof(int)*MAXINSTANCES);
00327         char *filterName         = (char *)malloc(MAX_FNAME_LENGTH);
00328         char *libName            = (char *)malloc(MAX_LNAME_LENGTH);
00329         char *hostname           = (char *)malloc(MAX_HNAME_LENGTH);
00330         char *labelStreamLibname = (char *)malloc(MAX_LNAME_LENGTH);
00331         
00332         int parentTid = pvm_parent();
00333 
00334         //We receive one message with all data in it
00335         bufId = pvm_recv(parentTid, 0);
00336 
00337         //get the current working directory
00338         pvm_upkint(&l, 1, 1);
00339         pvm_upkbyte(cwd, l, 1);
00340         cwd[l] = '\0';
00341 
00342         // filter id, useful for debugging
00343         pvm_upkint(&num, 1, 1);
00344         setFDIdFilter(fData, num);
00345         // get my rank
00346         pvm_upkint(&num, 1, 1);
00347         setFDRank(fData, num);
00348         // total number of instances of this filter
00349         pvm_upkint(&num, 1, 1);
00350         setFDNumInstances(fData, num);
00351         // get my brothers tids
00352         pvm_upkint(tids, num, 1);
00353         setFDTids(fData, tids);
00354 
00355         // filtername
00356         pvm_upkint(&l, 1, 1);
00357         pvm_upkbyte(filterName, l, 1);
00358         filterName[l] = '\0';
00359         setFDName(fData, filterName);
00360 
00361         //machine declared memory: -1 autodetect, declared on XML
00362         pvm_upkint(&num, 1, 1);
00363         setFDMachineMem(fd, num);
00364 
00365         //number of brothers(+ me) I have on this machine, useful for memory management
00366         pvm_upkint(&num, 1, 1);
00367         setFDNumLocalInstances(fd, num);
00368 
00369 #ifdef VOID_INST
00370         char instDir[MAX_IDIR_LENGTH];
00371         pvm_upkint(&l, 1, 1);
00372         pvm_upkbyte(instDir, l, 1);
00373         instDir[l] = '\0';
00374         setFDInstDir(fd, instDir);
00375 #endif
00376         
00377         // receives shared lib name
00378         pvm_upkint(&l, 1, 1);
00379         pvm_upkbyte(libName, l, 1);
00380         libName[l] = '\0';
00381         setFDLibName(fData, libName);
00382         if (loadFDLibFunctions(fData) == -1){
00383                 char msg[1000];
00384                 sprintf(msg, "could not load shared library %s", libName);
00385                 pvm_initsend(PvmDataRaw);
00386 
00387                 pvm_pkbyte(msg, strlen(msg), 1);
00388                 pvm_send(pvm_parent(), MSGT_FERROR);
00389                 return -1;
00390         }
00391 
00392         // set hostname
00393         gethostname(hostname, MAX_HNAME_LENGTH);
00394         setFDHostName(fData, hostname);
00395 
00396         // data received till now
00397         fprintf(stderr,"filter %s (rank: %d): pvm_tid:%d hostname:%s\n",
00398                 fData->name, fData->myRank, pvm_mytid(), fData->hostName);
00399 
00400         //port data
00401         //Receive numOutputs
00402         pvm_upkint(&num, 1, 1);
00403         setFDNumOutputs(fData, num);
00404         //receive numInputs
00405         pvm_upkint(&num, 1, 1);
00406         setFDNumInputs(fData, num);
00407 
00408         // for each OutputPort
00409         for(i = 0; i < fData->numOutputPorts; i++) {
00410                 int nOutHosts = 0, tag = 0;
00411                 int *outTids  = NULL;
00412                 int firstInstanceToWrite;
00413                 char *portName = (char *)malloc(MAX_PTNAME_LENGTH + 1);
00414                 char *writePolicyName = (char *)malloc(100);
00415                 writePolicy_t wp;
00416 
00417                 OutputPort *outputPort;
00418 
00419                 //now we can create the port
00420                 outputPort = createOutputPort();
00421 
00422                 //port data
00423                 pvm_upkint(&l, 1, 1);
00424                 pvm_upkbyte(portName, l, 1);  //portname
00425                 portName[l] = '\0';
00426                 setOPName(outputPort, portName);
00427 
00428                 pvm_upkint(&nOutHosts, 1, 1); //number of tids it is connected
00429 
00430                 setOPNumDestinations(outputPort, nOutHosts);
00431                 outTids = (int *) malloc(sizeof(int)*nOutHosts);
00432                 pvm_upkint(outTids, nOutHosts, 1); //get tids
00433                 setOPTidsDestinations(outputPort, outTids);
00434                 pvm_upkint(&tag, 1, 1); //get tag
00435                 setOPTag(outputPort, tag);
00436 
00437                 pvm_upkint(&l, 1, 1);
00438                 pvm_upkbyte(writePolicyName, l, 1); // get write policy
00439                 writePolicyName[l] = '\0';
00440                 wp = getWritePolicyByName(writePolicyName);
00441                 setOPWritePolicy(outputPort, wp);
00442 
00443                 // get LS sharedlib if policy is LS
00444                 if (wp  == LABELED_STREAM ){
00445                         pvm_upkint(&l, 1, 1);
00446                         pvm_upkbyte(labelStreamLibname, l, 1);
00447                         labelStreamLibname[l] = '\0';
00448                         //set output port library name
00449                         setOPLibName(outputPort, labelStreamLibname);
00450                         //load output port library for ls
00451                         if (loadOPLSData(outputPort) == -1 ){
00452                                         char msg[1000];
00453                                         sprintf(msg, "could not load LS shared library %s", labelStreamLibname);
00454                                         pvm_initsend(PvmDataRaw);
00455 
00456                                         pvm_pkbyte(msg, strlen(msg), 1);
00457                                         pvm_send(pvm_parent(), MSGT_FERROR);
00458                                         return -1;      
00459                         }
00460                 }
00461                 else if (wp == MULTICAST_LABELED_STREAM) {
00462                         pvm_upkint(&l, 1, 1);
00463                         pvm_upkbyte(labelStreamLibname, l, 1);
00464                         labelStreamLibname[l] = '\0';
00465                         //set output port library name
00466                         setOPLibName(outputPort, labelStreamLibname);
00467                         //load output port library for ls
00468                         if (loadOPMLSData(outputPort) == -1){
00469                                         char msg[1000];
00470                                         sprintf(msg,"could not load MLS shared library %s", labelStreamLibname);
00471                                         pvm_initsend(PvmDataRaw);
00472 
00473                                         pvm_pkbyte(msg, strlen(msg), 1);
00474                                         pvm_send(pvm_parent(), MSGT_FERROR);
00475                                         return -1;
00476                         }
00477                 }
00478                 else {
00479                         //if not LS, we needa know who will be the first instance to receive msgs
00480                         pvm_upkint(&firstInstanceToWrite, 1, 1); //the first instance to write
00481                         setOPNextToSend(outputPort, firstInstanceToWrite);
00482                 }
00483 
00484                 // and we finally add the port to our filterData structure
00485                 addFDOutputPort(fData, outputPort);
00486 
00487                 //free pointers
00488                 free(outTids);
00489                 free(portName);
00490                 free(writePolicyName);
00491         }
00492 
00493         // foreach InputPort
00494         for(i = 0; i < fData->numInputPorts; i++) {
00495                 int nInHosts = 0;
00496                 int *inTids  = NULL;
00497                 int inTag;
00498                 char portName[MAX_PTNAME_LENGTH + 1];
00499                 int l;
00500 
00501                 InputPort *inputPort =  createInputPort();
00502 
00503                 //get the portName
00504                 pvm_upkint(&l, 1, 1);
00505                 pvm_upkbyte(portName, l, 1);
00506                 portName[l] = '\0';
00507                 setIPName(inputPort, portName);
00508 
00509                 // receive the number of tids of this port
00510                 pvm_upkint(&nInHosts, 1, 1); // number of instances connected to this port
00511                 setIPNumSources(inputPort, nInHosts);
00512 
00513                 // get the tids
00514                 inTids = (int *) malloc(sizeof(int)*nInHosts);
00515                 pvm_upkint(inTids, nInHosts, 1); //get tids
00516                 setIPTidsSources(inputPort, inTids);
00517                 free(inTids);
00518 
00519                 pvm_upkint(&inTag, 1, 1); //the port tag
00520                 setIPTag(inputPort, inTag);
00521 
00522                 //finally add the port to our filterData
00523                 addFDInputPort(fData, inputPort);
00524         }
00525 
00526         
00527         free(tids);
00528         free(hostname);
00529         free(filterName);
00530         free(libName);
00531         free(labelStreamLibname);
00532 
00533 #ifdef VOID_TRACER
00534 
00535 #define sub_times( a, b, result ) if ( a.tv_usec < b.tv_usec ){ result.tv_usec = 1000000 + a.tv_usec - b.tv_usec; result.tv_sec = (a.tv_sec - 1) - b.tv_sec; } else { result.tv_usec = a.tv_usec - b.tv_usec; result.tv_sec = a.tv_sec - b.tv_sec; };
00536 
00537 #define add_times( a, b, result ) result.tv_usec = a.tv_usec + b.tv_usec; if ( result.tv_usec > 1000000 ){ result.tv_usec -= 1000000; result.tv_sec = a.tv_sec + b.tv_sec + 1; } else { result.tv_sec = a.tv_sec + b.tv_sec; };
00538 
00539 
00540         // Clock synchronization
00541         
00542         struct timeval t1, t3, t4;
00543         pvm_recv(parentTid, 666);
00544         pvm_initsend(PvmDataRaw);
00545         gettimeofday( &t1, NULL );
00546         pvm_pkint( (int*)(&(t1.tv_sec)) , 1, 1 );
00547         pvm_send(parentTid, 666); 
00548         pvm_recv(parentTid, 666);
00549         pvm_upkint( (int*)(&(t3.tv_sec)), 1, 1 );
00550         pvm_upkint( (int*)(&(t3.tv_usec)), 1, 1 );
00551         gettimeofday( &t4, NULL );
00552         sub_times( t4, t3, fData->trcData->initTime );
00553         sub_times( t4, t1, t4 );
00554         t4.tv_sec = t4.tv_sec / 2;
00555         t4.tv_usec = t4.tv_usec / 2;
00556         add_times( fData->trcData->initTime, t4, fData->trcData->initTime );
00557 #endif
00558 
00559         return 1;
00560 }
00561 
00562 /// Run the filter
00563 void runFilter() {
00564         int i;
00565         char eowMsg[] = "EOW";
00566         int currentWork = 0;
00567 
00568         //manager tid
00569         int parentTid = pvm_parent();
00570         
00571         fd = createFilterData();
00572         if (recvFilterData(fd) == -1){
00573                 /// \todo An error ocourred! We should notify manager of this!!
00574                 return;
00575         }
00576 
00577 #ifdef VOID_TERM
00578         tdd = initTerminationDetection();
00579         if(tdd == NULL) {
00580                 /// \todo An error ocourred! We should notify manager of this!!
00581                 return;
00582         }
00583 #endif
00584 
00585         //initCache(fd->id, fd->myRank);
00586 
00587         // now, we start working!
00588         // loop getting work, till we get EOF.
00589         // for each appendWork received, calls init, process, finalize
00590         while (1) {
00591                 // before init, we must reset all ports, so user get valid data
00592                 // Coutinho: this must be done before the UOW recieve because after that,
00593                 // the other filters are already running AND sending vaid data.
00594 
00595                 /// \bug Resetting the ports even before receiving the UOW ISN'T SAFE. 
00596                 /// Before we even recieve the UOW, the other filters can already
00597                 /// started to run and sending valid data to this filter. The only safe 
00598                 /// way is: send EOW at the end of finalizeFilter() and only drain 
00599                 /// data from the channels that doesn't recieved a EOW until recieve it
00600 
00601                 // reset all input ports. They may be used in the next work
00602                 for(i = 0; i < fd->numInputPorts; i++) {
00603                         resetInputPort(fd->inputPorts[i]);
00604                 }
00605                 // reset all output ports. They may be used in the next work
00606                 for(i = 0; i < fd->numOutputPorts; i++) {
00607                         resetOutputPort(fd->outputPorts[i]);
00608                 }
00609 
00610 
00611                 //get message from manager, hopefully a work
00612                 int bufId = pvm_recv(parentTid, 0);
00613                 int msgSize, msgType;
00614                 pvm_bufinfo(bufId, &msgSize, NULL, NULL);
00615                 pvm_upkint(&msgType, 1, 1);
00616 
00617                 //msg type can be either WORK, EOF or FT
00618                 //leave if we get EOF
00619                 if(msgType == MSGT_EOF) {
00620                         break;
00621                 }
00622                 
00623                 //get and set work
00624                 int workSize = msgSize - sizeof(int);
00625                 void *work = malloc(workSize);
00626                 pvm_upkbyte((char *)work, workSize, 1);
00627 
00628                 
00629                 // For now, restart the cache for each work, but to do fault tolerance 
00630                 // with multiple appendWorks(), we will need to make the cache become 
00631                 // capable of saving/recovering multiple works. When the cache became
00632                 // capable of that we will init it only once and use cacheSetCurrentWork().
00633 
00634                 // Restarting the cache should be done after the main lopp break, 
00635                 // otherwise we will finalize this process without cleaning the cache.
00636                 initCache(fd->id, fd->myRank);
00637                 cacheSetCurrentWork(currentWork); 
00638                 
00639                 /// \todo put a signal handler to clean the cache on SIGTERM recievement
00640 
00641                 // lets run
00642 #ifdef VOID_INST
00643                 InstData *inst = fd->instData;
00644                 //enter init state
00645                 instSwitchState(inst, &inst->voidStates[TIMER_INIT]);
00646 #endif
00647 #ifdef VOID_TRACER
00648                 // Enter init state
00649                 trcEnterState( fd->trcData, VT_PROC_INIT );
00650 #endif
00651 
00652 
00653                 callFDInit(fd, work, workSize);         
00654                 int weUseTasks = cacheGetUseTasks();
00655 #ifdef VOID_FT
00656                 if (weUseTasks) {
00657                         // one fault has occurred: filters must begin in a consistent global state
00658                         
00659                         // Tell manager that we use tasks.
00660                         pvm_initsend(PvmDataDefault);
00661                         pvm_pkint(&weUseTasks, 1, 1);
00662                         // Piggback :-) in this message the terminated tasks list
00663                         TaskIdList *finishedTaskIdList = getFinishedTasks();
00664                         packTaskIdList(finishedTaskIdList);
00665                         taskIdListDestroy(finishedTaskIdList);
00666                         pvm_send(parentTid, 0);
00667 
00668                         // get if we need to forward task creation and finalize messages
00669                         pvm_recv(parentTid, 0);
00670                         int needToForwardTaskMsgs = -1;
00671                         pvm_upkint(&needToForwardTaskMsgs, 1, 1);
00672                         cacheSetForwardTaskMsgs(needToForwardTaskMsgs);
00673                         // Receive global state (global terminated tasks list)
00674                         TaskIdList *globalTaskIdList = (TaskIdList *)unpackTaskIdList();
00675 
00676                         // Recover tasks checkpoints and restart interrupted tasks (recovery callbacks must be registered)
00677                         cacheRecoverTasks(globalTaskIdList);
00678                         taskIdListDestroy(globalTaskIdList);
00679                 } else {
00680 #endif                  
00681                         // Tell manager that we don't use tasks
00682                         pvm_initsend(PvmDataDefault);
00683                         pvm_pkint(&weUseTasks, 1, 1);
00684                         pvm_send(parentTid, 0);
00685                         
00686                         // It will respond if we need to forward task creation messages
00687                         pvm_recv(parentTid, 0);
00688                         int needToForwardTaskMsgs = -1;
00689                         pvm_upkint(&needToForwardTaskMsgs, 1, 1);
00690                         cacheSetForwardTaskMsgs(needToForwardTaskMsgs);
00691 #ifdef VOID_FT
00692                 }
00693 #endif
00694                 
00695                 
00696 #ifdef VOID_INST
00697                 //leave init and enter process
00698                 instSwitchState(inst, &inst->voidStates[TIMER_PROC]);
00699 #endif
00700 #ifdef VOID_TRACER
00701                 // Leave init, enter process
00702                 trcLeaveState( fd->trcData );
00703                 trcEnterState( fd->trcData, VT_PROC_PROC );
00704 #endif
00705 
00706                 callFDProcess(fd, work, workSize);
00707 #ifdef VOID_INST
00708                 //leave process and enter VOID
00709                 instSwitchState(inst, &inst->voidStates[TIMER_VOID]);   
00710 #endif
00711 
00712                 // here we have run init and process. We gotta close all doors before finalizing
00713                 // send eow to all outputStream
00714                 for(i = 0; i < fd->numOutputPorts; i++) {
00715                         if (fd->outputPorts[i]->state == OP_STATE_OPEN){
00716                                 closeOutputPort(fd->outputPorts[i]);
00717                         }
00718                 }
00719 
00720                 // and for the manager too, but here we use the type as a tag
00721                 pvm_initsend(PvmDataRaw);
00722                 pvm_pkbyte(eowMsg, strlen(eowMsg)+1, 1);
00723                 pvm_send(pvm_parent(), MSGT_EOW);
00724 
00725                 // finalize
00726 #ifdef VOID_INST
00727                 instSwitchState(fd->instData, &inst->voidStates[TIMER_FINALIZE]);
00728 #endif
00729 #ifdef VOID_TRACER
00730                 // Leave process, enter finalize
00731                 trcLeaveState( fd->trcData );
00732                 trcEnterState( fd->trcData, VT_PROC_FINALIZE );
00733 #endif
00734 
00735                 callFDFinalize(fd);
00736 
00737 #ifdef VOID_TRACER
00738                 trcLeaveState( fd->trcData );
00739 #endif
00740 #ifdef VOID_INST
00741                 instSwitchState(fd->instData, &inst->voidStates[TIMER_VOID]);
00742 #endif
00743         
00744                 // With multiple works we will modify this
00745                 currentWork++;
00746                 //cacheSetCurrentWork(currentWork);
00747                 destroyCache();
00748         }
00749 
00750         //destroyCache();
00751         
00752         destroyFilterData(fd);
00753         pvm_exit();
00754 }
00755 
00756 #endif

Generated on Tue Jan 17 19:18:38 2006 for Void by  doxygen 1.4.6