FilterDev.c

Go to the documentation of this file.
00001 #include <pvm3.h>
00002 #include <stdlib.h>
00003 #include <assert.h>
00004 #include <signal.h>
00005 #include <sys/time.h>
00006 #include <string.h>
00007 #include <unistd.h>
00008 #include "FilterDev.h"
00009 
00010 //abstract data types
00011 #include <Task/Task.h>
00012 #include <DataSpace/DataSpace.h>
00013 #include <Cache.h>
00014 
00015 #include "../FilterData/FilterData.h"
00016 #include "../FilterData/Termination.h"
00017 #include "../constants.h"
00018 #include "../Messages.h"
00019 
00020 #define KEY_INT
00021 #define VAL_VOID
00022 #include "hash.h"
00023 #undef VAL_VOID
00024 #undef KEY_INT
00025 
00026 
00027 #ifdef VOID_INST
00028 #include "../FilterData/Instrumentation.h"
00029 #endif
00030 #ifdef VOID_TRACER
00031 #include <Tracer/tracer.h>
00032 #include "tracingdefs.h"
00033 extern int msgId;
00034 #endif
00035 
00036 
00037 /** \file FilterDev.c This file implements all functions called by filters.
00038  *  
00039  *  \todo Clean this file, It should be like the linux VFS, inly redirecting
00040  *  calls to the apropriate Anthill module.
00041  *
00042  *  \todo Change functions prefix to Anthill prefix: ahXxx().
00043  *
00044  *  \todo Purge message packing and create a wrapper to it.
00045  */
00046 
00047 
00048 /// This is the send buffer.
00049 typedef struct {
00050         char *buffer;
00051         int size;       ///< number of bytes unpacked
00052         int maxSize;    ///< number of bytes received
00053 } BufSt;
00054 
00055 static BufSt bufSend = { NULL, 0, 0 };
00056 static BufSt bufRecv = { NULL, 0, 0 };
00057 
00058 InputPortHandler dsGetInputPortByName(const char *name) {
00059         int i=0;
00060 
00061 #ifdef VOID_TRACER
00062         trcEnterState( fd->trcData, VT_OH_GETIPORT );
00063 #endif
00064 
00065         for (i=0; i<fd->numInputPorts; i++) {
00066                 if (0 == strcasecmp(name, fd->inputPorts[i]->name)) {
00067 #ifdef VOID_TRACER
00068                         trcLeaveState( fd->trcData );
00069 #endif
00070                         return i;
00071                 }
00072         }
00073 
00074         fprintf(stderr, "FilterDev.c: ERROR, could not find Inputport with name %s\n", name);
00075 
00076 #ifdef VOID_TRACER
00077         trcLeaveState( fd->trcData );
00078 #endif
00079 
00080         return -1;
00081 }
00082 
00083 OutputPortHandler dsGetOutputPortByName(const char *name) {
00084         int i=0;
00085 
00086 #ifdef VOID_TRACER
00087         trcEnterState( fd->trcData, VT_OH_GETOPORT );
00088 #endif
00089 
00090         for (i=0; i< fd->numOutputPorts; i++) {
00091                 if (0 == strcasecmp(name, fd->outputPorts[i]->name)) {
00092 
00093 #ifdef VOID_TRACER
00094                         trcLeaveState( fd->trcData );
00095 #endif
00096                         return i;
00097                 }
00098         }
00099 
00100         fprintf(stderr, "FilterDev.c: ERROR, could not find Outputport with name %s\n", name);
00101 
00102 #ifdef VOID_TRACER
00103         trcLeaveState( fd->trcData );
00104 #endif
00105 
00106         return -1;
00107 }
00108 
00109 int dsCloseOutputPort(OutputPortHandler oph){
00110 
00111 #ifdef VOID_TRACER
00112         trcEnterState( fd->trcData, VT_OH_CLOSEOPORT );
00113 #endif
00114 
00115         closeOutputPort(fd->outputPorts[oph]);
00116 
00117 #ifdef VOID_TRACER
00118         trcLeaveState( fd->trcData );
00119 #endif
00120 
00121         return 1;
00122 }
00123 
00124 int dsGetNumWriters(InputPortHandler iph){
00125 
00126 #ifdef VOID_TRACER
00127         trcEnterState( fd->trcData, VT_OH_GETNW );
00128 #endif
00129 
00130         if ((iph < 0) || (iph >= fd->numInputPorts)){
00131 #ifdef VOID_TRACER
00132                 trcLeaveState( fd->trcData );
00133 #endif
00134                 return -1;
00135         }
00136         else {
00137 #ifdef VOID_TRACER
00138                 trcLeaveState( fd->trcData );
00139 #endif
00140                 return fd->inputPorts[iph]->numSources;
00141         }
00142 }
00143 
00144 int dsGetNumReaders(OutputPortHandler oph){
00145 #ifdef VOID_TRACER
00146         trcEnterState( fd->trcData, VT_OH_GETNR );
00147 #endif
00148 
00149         if ((oph < 0) || (oph >= fd->numOutputPorts)){
00150 #ifdef VOID_TRACER
00151                 trcLeaveState( fd->trcData );
00152 #endif
00153                 return -1;
00154         }
00155         else {
00156 #ifdef VOID_TRACER
00157                 trcLeaveState( fd->trcData );
00158 #endif
00159                 return fd->outputPorts[oph]->numDestinations;
00160         }
00161 }
00162 
00163 void dsMCast(OutputPort *op, void *buf, int bufSz) {
00164         int i, type = MSGT_F2F;
00165         int taskId = cacheGetCurrentTask();
00166         
00167         pvm_initsend(PvmDataInPlace);
00168         pvm_pkint(&type, 1, 1); //the type of the message
00169         pvm_pkint(&taskId, 1, 1); //task information
00170 #ifdef VOID_TRACER
00171         pvm_pkint(&msgId, 1, 1); //msgId
00172 #endif
00173         pvm_pkbyte((char *) buf, bufSz, 1);
00174 #ifdef VOID_INST
00175         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00176 #endif
00177         for ( i = 0; i < op->numDestinations; i++ )
00178         {
00179                 pvm_send( op->tidsDestinations[i], op->tag );
00180         }
00181 //      pvm_mcast(op->tidsDestinations, op->numDestinations, op->tag);
00182 
00183 #ifdef VOID_INST
00184         instLeaveState(fd->instData);
00185 #endif
00186 
00187 }
00188 
00189 void dsSend(int tid, int msgtag, void *buf, int bufSz) {
00190 
00191         int taskId = cacheGetCurrentTask();
00192 
00193         int type = MSGT_F2F;    
00194         pvm_initsend(PvmDataInPlace);
00195         pvm_pkint(&type, 1, 1); //the type of the message
00196         pvm_pkint(&taskId, 1, 1); //task id
00197 #ifdef VOID_TRACER
00198         pvm_pkint(&msgId, 1, 1); //msgId
00199 #endif
00200 
00201         pvm_pkbyte((char *) buf, bufSz, 1);
00202 
00203 #ifdef VOID_INST
00204         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00205 #endif  
00206         pvm_send(tid, msgtag);
00207         
00208 #ifdef VOID_INST
00209         instLeaveState(fd->instData);
00210 #endif
00211 
00212 }
00213 
00214 /*int recvData(InputPort *ip) {
00215         int bufId = 0;
00216         struct timeval tmout;
00217         tmout.tv_sec = 10;
00218         tmout.tv_usec = 1;
00219         
00220         // wait for data
00221         while(!bufId){
00222                 bufId = pvm_trecv(-1, ip->tag, &tmout);
00223 #ifdef VOID_TERM
00224                 if(!bufId) {
00225                         if((bufId = pvm_probe(-1, 3))) {
00226                                 // There is filter to filter termination msg to recv
00227                                 int nBytes, instTid;
00228                                 pvm_bufinfo(bufId, &nBytes, NULL, &instTid);
00229                                 pvm_recv(instTid, 3);
00230                         } else
00231                                 if(tdd->status == NOTPARTICIPATING)
00232                                         beginTerminationDetection();
00233                 }
00234 #endif
00235         }
00236         //bufId = pvm_recv(-1, ip->tag);
00237 
00238         return bufId;
00239 }
00240 */
00241 
00242 
00243 // -------------------------------------------------------------------------------//
00244 // send related functions --------------------------------------------------------//
00245 int dsWriteBuffer(OutputPortHandler oph, void *buf, unsigned int bufSz) {
00246 #ifdef VOID_TRACER
00247         trcEnterState( fd->trcData, VT_OH_WBUFFER );
00248 #endif
00249 #ifdef VOID_INST
00250         //enter write state
00251         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE]);
00252 #endif
00253 
00254         int instance = -1;
00255 
00256         OutputPort* op = fd->outputPorts[oph];  
00257         
00258         // BROADCAST policy
00259         switch (op->writePolicy){
00260                 /// 2 things can happen here. Either the message destination is a single instance
00261                 /// or message has multiple destinations. we treat the multicast first, then the 
00262                 /// others
00263 
00264                 /* Multiple destinations --------------------------------*/
00265                 case BROADCAST:
00266                 {
00267 #ifdef VOID_TRACER
00268                         trcEnterState(fd->trcData, VT_COMM_WRITE);
00269 #endif
00270                         //send to all instances
00271                         dsMCast(op, buf, bufSz);
00272 #ifdef VOID_TRACER
00273                         trcLeaveState(fd->trcData);
00274                         trcLeaveState(fd->trcData);
00275 #endif
00276 #ifdef VOID_INST
00277                         instLeaveState(fd->instData);
00278 #endif
00279                         return bufSz;
00280                 }
00281                 case MULTICAST_LABELED_STREAM:
00282                 {
00283                         //controled by the user, multiple destinations
00284 
00285                         // this is the label
00286                         static char label[MAX_LBL_LENGTH];
00287                         static int destArray[MAXINSTANCES], tidsArray[MAXINSTANCES];
00288                         static int clearArray[MAXINSTANCES], clearInit =0;
00289                         if (!clearInit){
00290                                 int i;
00291                                 for (i=0;i<MAXINSTANCES;i++){
00292                                         clearArray[i] = 0;
00293                                 }
00294                                 clearInit = 1;
00295                         }
00296         
00297 
00298                         // gets the label
00299                         // this function extracts the label from a message, user must provide it
00300                         // buf is the message buffer
00301                         // bufSz is the size
00302                         // label is a char[]
00303                         op->lsData.getLabel(buf, bufSz, label);
00304 
00305                         //zero the destArray
00306                         memcpy(destArray, clearArray, sizeof(int)*MAXINSTANCES);
00307 
00308                         // the mlshash function gets a label and fills
00309                         // the destArray with 1 in positions which should receive the message
00310                         op->lsData.mlshash(label, op->numDestinations, destArray);
00311 
00312                         int i, tidsArraySize = 0, lastPosition=0;
00313                         for (i=0;i<op->numDestinations;i++){
00314                                 if (destArray[i] == 1){
00315                                         tidsArraySize++;
00316                                         tidsArray[lastPosition++] = op->tidsDestinations[i];
00317                                 }
00318                         }
00319 
00320                         //we multicast buffer to the tids in tidsArray
00321                         pvm_initsend(PvmDataInPlace);
00322 
00323                         int type = MSGT_F2F;
00324                         pvm_pkint(&type, 1, 1); //message type
00325                         int taskId = cacheGetCurrentTask();
00326                         pvm_pkint(&taskId, 1, 1); //task
00327 #ifdef VOID_TRACER
00328                         pvm_pkint(&msgId, 1, 1); //msgId
00329 #endif
00330                         pvm_pkbyte((char*)buf, bufSz, 1);
00331 
00332 #ifdef VOID_INST
00333                         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00334 #endif
00335 #ifdef VOID_TRACER
00336                         trcEnterState( fd->trcData, VT_COMM_WRITE );
00337 #endif
00338                         int info;
00339                         for ( i = 0; i < tidsArraySize; i++ )
00340                         {
00341                                 if ( ( info = pvm_send( tidsArray[i], op->tag )) < 0 )
00342                                 {
00343                                         printf("FilterDev.c: ERROR, in multicast labeled stream\n");
00344 #ifdef VOID_TRACER
00345                                         trcLeaveState(fd->trcData);
00346 #endif
00347                                         return -1;
00348                                 }
00349                         }
00350 //                      int info = pvm_mcast(tidsArray, tidsArraySize, op->tag);
00351 #ifdef VOID_TRACER
00352                         trcLeaveState( fd->trcData );
00353 #endif
00354                         
00355 #ifdef VOID_INST
00356                         //leave blocked
00357                         instLeaveState(fd->instData);
00358 #endif
00359 #ifdef VOID_INST
00360                         //leave write
00361                         instLeaveState(fd->instData);
00362 #endif
00363                 
00364 /*                      if (info < 0){
00365                                 printf("FilterDev.c: ERROR, in multicast labeled stream\n");
00366 
00367 #ifdef VOID_TRACER
00368                         trcLeaveState(fd->trcData);
00369 #endif
00370                                 return -1;
00371                         }
00372 */
00373                         
00374 #ifdef VOID_TRACER
00375                         trcLeaveState(fd->trcData);
00376 #endif
00377                         return bufSz;
00378                 }
00379                 /* end mutiple */
00380 
00381                 /* single destinations -----------------------------------------------*/
00382                 case RANDOM:
00383                 {
00384                         //chose an instance randomly
00385                         instance = lrand48() % op->numDestinations;
00386                         break;
00387                 }
00388                 case LABELED_STREAM:
00389                 {
00390                         //controled by the user, single destination
00391 
00392                         // this is the label
00393                         static char label[MAX_LBL_LENGTH];
00394 
00395 #ifdef VOID_INST
00396                         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_LS]);      
00397 #endif
00398                         // gets the label
00399                         // this function extracts the label from a message, user must provide it
00400                         // buf is the message buffer
00401                         // bufSz is the size
00402                         // label is a char[]
00403                         op->lsData.getLabel(buf, bufSz, label);
00404                         // the hash function gets which instance will receive the message
00405                         // it receives the label, and the number of destinations
00406                         instance = op->lsData.hash(label, op->numDestinations);
00407 
00408 #ifdef VOID_INST
00409                         //leave labeled stream
00410                         instLeaveState(fd->instData);
00411 #endif
00412 
00413                         // we dont trust user return, and check if the instance is valid
00414                         while (instance < 0)
00415                                 instance += op->numDestinations;
00416                         if (instance >= op->numDestinations)
00417                                 instance %= op->numDestinations;
00418 
00419                         break;
00420                 }
00421                 default: {
00422                         //round robin, default policy
00423                         instance = op->nextToSend;
00424                         op->nextToSend = (op->nextToSend+1) % op->numDestinations;
00425                         break;
00426                 }
00427                 /* end single */
00428         }
00429 
00430         // sends the buffer
00431         assert(instance >= 0);
00432 
00433         /* Put this tracing entrance here, instead of in dsSend body, 
00434          * because os the state attributes ( writePolicy );
00435          */
00436 #ifdef VOID_TRACER
00437         trcEnterState(fd->trcData, VT_COMM_WRITE);
00438 #endif
00439         dsSend(op->tidsDestinations[instance], op->tag, buf, bufSz);
00440 #ifdef VOID_TRACER
00441         trcLeaveState(fd->trcData);
00442 #endif
00443 
00444 #ifdef VOID_INST
00445         //leave write
00446         instLeaveState(fd->instData);
00447 #endif
00448 #ifdef VOID_TRACER
00449         trcLeaveState(fd->trcData);
00450 #endif
00451         return bufSz;
00452 }
00453 
00454 /// init packing, size in bytes
00455 /// \warning Multithread apps will crash if they use pack and unpack.
00456 /// \deprecated Packing and unpacking is deprecated, you should use only 
00457 /// dsWriteBuffer() and dsReadBuffer().
00458 int dsInitPack(int initSize){
00459 #ifdef VOID_TRACER
00460         trcEnterState( fd->trcData, VT_OH_INITPACK );
00461 #endif
00462 
00463         //we free the buffer if its not null
00464         if (bufSend.buffer != NULL){
00465                 free(bufSend.buffer);
00466         }
00467         
00468         int trueSize;
00469 
00470         if (initSize < MINSIZE)
00471                 trueSize = MINSIZE;
00472         else
00473                 trueSize = initSize;
00474         
00475         if ((bufSend.buffer = (char *)malloc(sizeof(char)*trueSize))==NULL){
00476                 printf("FilterDev/FilterDev.c: error, could not allocate memory for buffer\n");
00477 #ifdef VOID_TRACER
00478                 trcLeaveState(fd->trcData);
00479 #endif
00480                 return -1;
00481         }
00482 
00483 
00484         bufSend.size = 0;
00485         bufSend.maxSize = trueSize;
00486         // Coutinho: dsWritePackedBuffer() will put the message type and the taskId
00487 /*      //the message type
00488         int type = MSGT_F2F;
00489         dsPackData(&type, sizeof(int));
00490         //the message task
00491         int taskId = cacheGetCurrentTask();
00492         dsPackData(&taskId, sizeof(int));*/
00493 
00494 #ifdef VOID_TRACER
00495         dsPackData(&msgId, sizeof(int));
00496         trcLeaveState(fd->trcData);
00497 #endif
00498 
00499         return 1;
00500         
00501 }
00502 
00503 /// function to pack data to the buffer
00504 /// \warning Multithread apps will crash if they use pack and unpack.
00505 /// \deprecated Packing and unpacking is deprecated, you should use only 
00506 /// dsWriteBuffer() and dsReadBuffer().
00507 int dsPackData(void *data, int size){
00508 #ifdef VOID_TRACER
00509         trcEnterState( fd->trcData, VT_OH_PACK );
00510 #endif
00511 
00512         //we only pack void*
00513         if (bufSend.size + size > bufSend.maxSize){
00514                 //DANGER!!!!!!!!!!!!!!!!!!!!!!!!!!!
00515                 //careful with realloc
00516                 bufSend.maxSize = bufSend.size + size;
00517                 bufSend.buffer = (char*)realloc(bufSend.buffer, bufSend.maxSize*sizeof(char));
00518         }
00519 
00520         memcpy(&bufSend.buffer[bufSend.size], data, size);
00521         bufSend.size+= size;
00522 #ifdef VOID_TRACER
00523         trcLeaveState( fd->trcData );
00524 #endif
00525         return 1;
00526 }
00527 
00528 
00529 /// function to send the packed buffer
00530 /// \warning Multithread apps will crash if they use pack and unpack.
00531 /// \deprecated Packing and unpacking is deprecated, you should use only 
00532 /// dsWriteBuffer() and dsReadBuffer().
00533 int dsWritePackedBuffer(OutputPortHandler oph){
00534         /* Yes: WEIRD !
00535          * Just for log
00536          * */
00537 #ifdef VOID_TRACER
00538         trcEnterState( fd->trcData, VT_OH_WPBUFFER );
00539         trcLeaveState( fd->trcData );
00540 #endif
00541         return dsWriteBuffer(oph, bufSend.buffer, bufSend.size);
00542 
00543         //why the hell we were doing all this again(commented)?? 
00544 
00545 /*      int instance = -1;
00546 
00547         OutputPort* op = fd->outputPorts[oph];  
00548         
00549         // BROADCAST policy, we treat it differently
00550         if (op->writePolicy == BROADCAST) {
00551                 dsMCast(op, bufSend.buffer, bufSend.size);
00552                 return bufSend.size;
00553         }
00554 
00555         //now the usual policies, we need to get the destination instance first
00556         if (op->writePolicy == RANDOM) {
00557                 instance = lrand48() % op->numDestinations;
00558         }
00559         else if (op->writePolicy == LABELED_STREAM){
00560                 // this is the label
00561                 static char label[MAX_LBL_LENGTH];
00562 
00563                 // gets the label
00564                 // this function extracts the label from a message, user must provide it
00565                 // buf is the message buffer
00566                 // bufSz is the size
00567                 // label is a char[]
00568                 op->lsData.getLabel(bufSend.buffer, bufSend.size, label);
00569                 // the hash function gets which instance will receive the message
00570                 // it receives the label, and the number of destinations
00571                 instance = op->lsData.hash(label, op->numDestinations);
00572                 // we dont trust user return, and check if the instance is valid
00573                 while (instance < 0)
00574                         instance += op->numDestinations;
00575                 if (instance >= op->numDestinations)
00576                         instance %= op->numDestinations;
00577         } 
00578         else {
00579                 //round robin
00580                 instance = op->nextToSend;
00581                 op->nextToSend = (op->nextToSend+1) % op->numDestinations;
00582         }
00583 
00584         // sends the buffer
00585         assert(instance >= 0);
00586         dsSend(op->tidsDestinations[instance], op->tag, bufSend.buffer, bufSend.size);
00587         //pvm_psend(op->tidsDestinations[instance], op->tag, bufSend.buffer, bufSend.size, PVM_BYTE);
00588         return bufSend.size;*/
00589 }
00590 
00591 void taskSend(int taskId, int *deps, int depSize, char *metadata, int metaSize, int creatorTid){
00592         int i, j;
00593         int type = MSGT_CREATETASK;
00594         OutputPort* op;
00595 
00596         for(i = 0; i < fd->numOutputPorts; i++){
00597                 op = fd->outputPorts[i];
00598 
00599                 int willSendToCreator = 0;
00600                 for ( j = 0; j < op->numDestinations; j++ ) {
00601                         if ( creatorTid == op->tidsDestinations[j] ) {
00602                                 willSendToCreator = 1;
00603                                 break;
00604                         }
00605                 }
00606                 
00607                 // Do not resend task creation message to creator filter instances
00608                 if (!willSendToCreator) {       
00609                         pvm_initsend(PvmDataInPlace);
00610                         pvm_pkint(&type, 1, 1); //message type
00611                         pvm_pkint(&taskId, 1, 1);
00612 #ifdef VOID_TRACER
00613                         pvm_pkint(&msgId, 1, 1);
00614 #endif
00615                         // Hack from Coutos to avoid resending 
00616                         // task creation message to creator filter instances
00617                         pvm_pkint( &creatorTid, 1, 1);
00618         
00619                         pvm_pkint(&depSize, 1, 1);
00620                         if (depSize > 0) pvm_pkint(deps, depSize, 1);
00621                         
00622                         pvm_pkint(&metaSize, 1, 1);
00623                         if (metaSize > 0) pvm_pkbyte((char *) metadata, metaSize, 1);
00624 
00625                         for ( j = 0; j < op->numDestinations; j++ ) {
00626                                         pvm_send( op->tidsDestinations[j], op->tag );
00627                         }
00628                         //pvm_mcast(op->tidsDestinations, op->numDestinations, op->tag);                        
00629                         
00630 #ifdef DEBUG
00631                         int k;  
00632                         fprintf(stderr, "tid %d, tag %d: Multcast de 8+%d bytes p/ instancias de tids:", pvm_mytid(), op->tag, 2*sizeof(int)+metaSize+(depSize*sizeof(int)));
00633                         for (k=0; k<op->numDestinations; k++) {
00634                                 fprintf(stderr, " %d", op->tidsDestinations[k]);
00635                         }
00636                         fprintf(stderr, "\n");
00637 #endif                  
00638                 }
00639 
00640         }
00641 }
00642 
00643 int endTaskSend(int taskId, int enderTid){
00644         int i, j, type = MSGT_ENDTASK;
00645         OutputPort* op = NULL;
00646 
00647         for(i = 0; i < fd->numOutputPorts; i++){
00648                 op = fd->outputPorts[i];
00649         
00650                 int willSendToEnder = 0;
00651                 for ( j = 0; j < op->numDestinations; j++ ) {
00652                         if ( enderTid == op->tidsDestinations[j] ) {
00653                                 willSendToEnder = 1;
00654                         }
00655                 }       
00656         
00657                 if (!willSendToEnder) {
00658                         pvm_initsend(PvmDataInPlace);
00659                         pvm_pkint(&type, 1, 1); //message type
00660                         pvm_pkint(&taskId, 1, 1);
00661 #ifdef VOID_TRACER
00662                         pvm_pkint(&msgId, 1, 1);
00663 #endif
00664                         pvm_pkint(&enderTid, 1, 1);
00665         
00666                         for ( j = 0; j < op->numDestinations; j++ ) {
00667                                         pvm_send( op->tidsDestinations[j], op->tag );
00668                         }
00669                         //pvm_mcast(op->tidsDestinations, op->numDestinations, op->tag);                
00670                 
00671 #ifdef DEBUG
00672                         int k;  
00673                         fprintf(stderr, "tid %d, tag %d: Multcast mesgType %d ", pvm_mytid(), op->tag, type);
00674                         for (k=0; k<op->numDestinations; k++) {
00675                                 fprintf(stderr, " %d", op->tidsDestinations[k]);
00676                         }
00677                         fprintf(stderr, "\n");
00678 #endif
00679                 }
00680 
00681         }
00682         return 0;
00683 }
00684 
00685 //----------------------------------------------------------------------------------------//
00686 // recv related functions ----------------------------------------------------------------//
00687 
00688 // this function reads a buffer, of size szBuf
00689 int dsReadBuffer(InputPortHandler iph, void *buf, int szBuf) {
00690 
00691 #ifdef VOID_TRACER
00692         trcEnterState( fd->trcData, VT_OH_RBUFFER );
00693 #endif
00694 
00695 #ifdef VOID_INST
00696         //enter read state
00697         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_READ]);
00698 #endif
00699 
00700         int ret = ERROR;
00701         void *recvBuf = NULL;
00702         int taskId = -1;
00703         int instTid = -1;
00704 
00705         InputPort *ip = fd->inputPorts[iph];
00706 
00707         if ((iph > fd->numInportsAdded) || (iph > fd->numInputPorts)) {
00708                 fprintf(stderr, "%s (%d): iph(%d) > %d or %d\n", __FILE__, __LINE__, 
00709                         iph, fd->numInportsAdded, fd->numInputPorts);
00710         }
00711         
00712         // We try to receive from all instances, and get EOW from all. only then we give our EOW
00713         while(ip->numEowRecv < ip->numSources) {
00714                 int szMsg  = -1;
00715 
00716 /* Rui: recvData is DEAD ! Its code is here now:
00717  * ---------------------------------------------
00718  * WARNING! : Now the recvData's old code is mixed with ReadBuffer's code !
00719  */
00720                 struct timeval tmout;
00721                 tmout.tv_sec = 0;
00722                 tmout.tv_usec = 1000;
00723 #ifdef VOID_TRACER
00724                 struct timeval iitime, ietime;
00725                 gettimeofday( &iitime, NULL );
00726                 trcEnterState( fd->trcData, VT_COMM_READ );
00727                 int incomingMsgId = -1;
00728 #endif
00729                 int bufid=0;
00730                 // wait for data
00731                 while(!bufid){
00732 #ifdef VOID_TRACER
00733                         gettimeofday( &ietime, NULL );
00734                         trcResetCurrentState( fd->trcData );
00735 #endif
00736                         bufid = pvm_trecv(-1, ip->tag, &tmout);
00737 #ifdef VOID_TERM
00738                         if(!bufid) {
00739                                 if((bufid = pvm_probe(-1, 3))) {
00740                                         // There is filter to filter termination msg to recv
00741                                         int nBytes, instTid;
00742                                         pvm_bufinfo(bufid, &nBytes, NULL, &instTid);
00743                                         pvm_recv(instTid, 3);
00744                                 } else {
00745                                         if(tdd->status == NOTPARTICIPATING)
00746                                                 beginTerminationDetection();
00747                                 }
00748                         }
00749 #endif
00750                 }
00751                 
00752                 //we take the msgId
00753                 int status = pvm_bufinfo(bufid, &szMsg, NULL, &instTid);
00754                 szMsg -= (2 * sizeof(int)); //we take the msg type and the tasdId out
00755 
00756                 int msgType;
00757                 pvm_upkint(&msgType, 1, 1); //get the type of message
00758 
00759 /* ---------------------------------------------
00760  * End of old recvData's code
00761  * Here is its old call:
00762  * int bufid = recvData(ip);
00763  * WARNING! : Now the recvData's old code is mixed with ReadBuffer's code !
00764  */
00765                 
00766 #ifdef VOID_TERM
00767                 if(msgType == MSGT_INITTERM) {
00768                         int neighborTermTag; // neighbor's termination detection round tag
00769                         pvm_upkint(&neighborTermTag, 1, 1);
00770                         terminationDetectionRound(instTid, neighborTermTag);
00771 #ifdef VOID_TRACER
00772                         trcLeaveState( fd->trcData, VT_LEAVE_COMM_READ );
00773                         trcEnterState( fd->trcData, VT_IDLE_READ );
00774                         trcLeaveState( fd->trcData );
00775 #endif
00776                 } else {
00777                         updateTermStreamToEmpty(instTid);
00778 #endif
00779                         pvm_upkint(&taskId, 1, 1); //get the task id
00780 #ifdef VOID_TRACER
00781                         pvm_upkint( &incomingMsgId, 1, 1 );
00782                         szMsg -= (1 * sizeof(int)); //if tracer is on, we also take the incomingMsgId out
00783                         trcLeaveState( fd->trcData, VT_LEAVE_COMM_READ );
00784                         trcEnterState( fd->trcData, VT_IDLE_READ );
00785                         trcLeaveState( fd->trcData );
00786 #endif
00787 
00788                         switch (msgType){
00789 
00790                                 case (MSGT_CREATETASK):{
00791                                                 int creatorTid, depSize, metaSize, createTaskReturn;
00792                                                 char *metadata = NULL;
00793                                                 int *deps = NULL;
00794 
00795                                                 // Get creator pvm_tid
00796                                                 pvm_upkint( &creatorTid, 1, 1 );
00797                                                 
00798                                                 // get deps
00799                                                 pvm_upkint(&depSize, 1, 1);
00800                                                 if(depSize > 0){
00801                                                         deps = malloc(sizeof(int) * depSize);
00802                                                         pvm_upkint(deps, depSize,1);
00803                                                 }
00804 
00805                                                 // get metadata
00806                                                 pvm_upkint(&metaSize, 1, 1);
00807                                                 if(metaSize > 0){
00808                                                         metadata = malloc(sizeof(char) * metaSize);
00809                                                         pvm_upkbyte(metadata, metaSize,1);
00810                                                 }
00811                                                 createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
00812                                                 
00813                                                 // marreta do coutos pra criacao de tarefas
00814                                                 // nao ficar dando volta no loop de filtros.. entao ela
00815                                                 // deve morrer no filtro q criou a mesma
00816                                                 if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
00817                                                         taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
00818                                                 }
00819 
00820                                                 break;
00821                                         }
00822                                 case (MSGT_ENDTASK):{
00823                                                 int enderTid;
00824                                                 pvm_upkint( &enderTid, 1, 1);
00825                                                 int endTaskReturn = cacheEndTask(taskId);
00826                                                 // end task and send that to next filter
00827                                                 if((endTaskReturn != E_TASK_NOT_RUNNING) && cacheGetForwardTaskMsgs()){
00828                                                 endTaskSend(taskId, enderTid);
00829                                                 }
00830                                                 break;
00831                                 }
00832                                 case (MSGT_EOW):
00833                                         {
00834                                                 //we received an EOW
00835                                                 ip->numEowRecv++;
00836                                                 break;
00837                                         }
00838                                 case (MSGT_F2F):
00839                                         {
00840                                                 status = dsSetCurrentTask(taskId);
00841                                                 if ((status < 0) && (!cacheGetForwardTaskMsgs()) && (taskId >= 0)) {
00842                                                         // If this filter don't use tasks, create a dummy task
00843                                                         // and change to it (the filter don't use them anyway).
00844                                                         dsCreateTask(taskId, NULL, 0, NULL, 0);
00845                                                         dsSetCurrentTask(taskId);
00846                                                 }
00847 
00848                                                 //filter to filter message
00849                                                 if (szMsg > szBuf) {
00850                                                         // if message is bigger than buffer size we truncate
00851                                                         printf("FilterDev.c: warning, message size(%d) is bigger than buffer(%d), truncating! bufid=%d ip->tag=%d", 
00852                                                                         szMsg, szBuf, bufid, ip->tag);
00853                                                         recvBuf = malloc(szMsg);
00854                                                         status = pvm_upkbyte((char *) recvBuf, szMsg, 1);
00855                                                         ret = szBuf;
00856                                                         memcpy(buf, recvBuf, szBuf);
00857                                                         free(recvBuf);
00858                                                 } else {
00859                                                         //message fits in the buffer
00860                                                         recvBuf = buf;
00861                                                         status = pvm_upkbyte((char *) buf, szMsg, 1);
00862                                                         ret = szMsg;
00863                                                 }
00864 
00865 #ifdef VOID_INST
00866                                                 instLeaveState(fd->instData);
00867 #endif
00868 #ifdef VOID_TRACER
00869                                                 trcLeaveState( fd->trcData );
00870 #endif
00871 
00872                                                 return ret;
00873                                         }
00874                                 default:
00875                                         {
00876                                                 //shouldn't get here
00877                                                 fprintf(stderr, "FilterDev.c: unknown message type\n");
00878                                         }
00879                         }
00880 #ifdef VOID_TERM
00881                 }
00882 #endif
00883         }
00884 
00885 #ifdef VOID_INST
00886         //leave read
00887         instLeaveState(fd->instData);
00888 #endif
00889 #ifdef VOID_TRACER
00890         trcLeaveState( fd->trcData );
00891 #endif
00892         //if we got here, we received all instances EOW. Return it to the user
00893         return EOW;
00894 }
00895 
00896 int recvNonBlockingData(InputPort *ip) {
00897         int bufId = 0;
00898         struct timeval tmout;
00899         tmout.tv_sec = 0;
00900         tmout.tv_usec = 1;
00901         
00902         bufId = pvm_trecv(-1, ip->tag, &tmout);
00903 
00904         return bufId;
00905 }
00906 
00907 
00908 /// returns 0 if there is no data to be received
00909 int dsReadNonBlockingBuffer(InputPortHandler iph, void *buf, int szBuf) {
00910 
00911 #ifdef VOID_TRACER
00912         trcEnterState( fd->trcData, VT_OH_RBUFFER );
00913         trcEnterState( fd->trcData, VT_COMM_READ );
00914         int incomingMsgId = -1;
00915 #endif
00916 
00917 #ifdef VOID_INST
00918         //enter read state
00919         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_READ]);
00920 #endif
00921 
00922         int ret = ERROR;
00923         void *recvBuf = NULL;
00924         int taskId = -1;
00925 
00926         InputPort *ip = fd->inputPorts[iph];
00927 
00928         if ((iph > fd->numInportsAdded) || (iph > fd->numInputPorts)) {
00929                 fprintf(stderr, "%s (%d): iph(%d) > %d or %d\n", __FILE__, __LINE__, 
00930                         iph, fd->numInportsAdded, fd->numInputPorts);
00931         }
00932         
00933         // We try to receive from all instances, and get EOW from all. only then we give our EOW
00934         while(ip->numEowRecv < ip->numSources) {
00935                 int szMsg  = -1;
00936                 int bufid = recvNonBlockingData(ip);
00937                 if(bufid == 0) return 0;
00938                 int instTid;
00939 
00940                 int status = pvm_bufinfo(bufid, &szMsg, NULL, &instTid);
00941                 szMsg -= (2 * sizeof(int)); //we take the msg type and the tasdId out
00942                 
00943                 int msgType;
00944                 pvm_upkint(&msgType, 1, 1); //get the type of message
00945 #ifdef VOID_TERM
00946                 if(msgType == MSGT_INITTERM) {
00947                         int neighborTermTag; // neighbor's termination detection round tag
00948                         pvm_upkint(&neighborTermTag, 1, 1);
00949                         terminationDetectionRound(instTid, neighborTermTag);
00950                 }else {
00951                         updateTermStreamToEmpty(instTid);
00952 #endif
00953                         pvm_upkint(&taskId, 1, 1); //get the task id
00954 
00955                         switch (msgType){
00956                                 case (MSGT_CREATETASK):{
00957                                                 int creatorTid, depSize, metaSize, createTaskReturn;
00958                                                 char *metadata = NULL;
00959                                                 int *deps = NULL;
00960 
00961                                                 // Get creator pvm_tid
00962                                                 pvm_upkint( &creatorTid, 1, 1 );
00963                                                 
00964                                                 // get deps
00965                                                 pvm_upkint(&depSize, 1, 1);
00966                                                 if(depSize > 0){
00967                                                         deps = malloc(sizeof(int) * depSize);
00968                                                         pvm_upkint(deps, depSize,1);
00969                                                 }
00970 
00971                                                 // get metadata
00972                                                 pvm_upkint(&metaSize, 1, 1);
00973                                                 if(metaSize > 0){
00974                                                         metadata = malloc(sizeof(char) * metaSize);
00975                                                         pvm_upkbyte(metadata, metaSize,1);
00976                                                 }
00977                                                 createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
00978                                                 
00979                                                 // marreta do coutos pra criacao de tarefas
00980                                                 // nao ficar dando volta no loop de filtros.. entao ela
00981                                                 // deve morrer no filtro q criou a mesma
00982                                                 if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
00983                                                         taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
00984                                                 }
00985 
00986                                                 break;
00987                                         }
00988         
00989                                 case (MSGT_ENDTASK):{
00990                                                 dsEndTask(taskId);      
00991                                                 break;
00992                                 }
00993                                 case (MSGT_EOW):
00994                                         {
00995                                                 //we received an EOW
00996                                                 ip->numEowRecv++;
00997                                                 break;
00998                                         }
00999                                 case (MSGT_F2F):
01000                                         {
01001 #ifdef VOID_TRACER
01002                                                 pvm_upkint( &incomingMsgId, 1, 1 );
01003 #endif
01004                                                 dsSetCurrentTask(taskId);
01005 
01006                                                 //filter to filter message
01007                                                 if (szMsg > szBuf) {
01008                                                         // if message is bigger than buffer size we truncate
01009                                                         printf("FilterDev.c: warning, message size(%d) is bigger than buffer(%d), truncating! bufid=%d ip->tag=%d", 
01010                                                                         szMsg, szBuf, bufid, ip->tag);
01011                                                         recvBuf = malloc(szMsg);
01012                                                         status = pvm_upkbyte((char *) recvBuf, szMsg, 1);
01013                                                         ret = szBuf;
01014                                                         memcpy(buf, recvBuf, szBuf);
01015                                                         free(recvBuf);
01016                                                 } else {
01017                                                         //message fits in the buffer
01018                                                         recvBuf = buf;
01019                                                         status = pvm_upkbyte((char *) buf, szMsg, 1);
01020                                                         ret = szMsg;
01021                                                 }
01022 
01023 #ifdef VOID_INST
01024                                                 instLeaveState(fd->instData);
01025 #endif
01026 #ifdef VOID_TRACER
01027                                                 trcLeaveState( fd->trcData, instTid, incomingMsgId );
01028                                                 trcLeaveState( fd->trcData );
01029 #endif
01030 
01031                                                 return ret;
01032                                         }
01033                                 default:
01034                                         {
01035                                                 //shouldn't get here
01036                                                 fprintf(stderr, "FilterDev.c: unknown message type\n");
01037                                         }
01038                         }
01039 #ifdef VOID_TERM
01040                 }
01041 #endif
01042         }
01043 
01044 #ifdef VOID_INST
01045         //leave read
01046         instLeaveState(fd->instData);
01047 #endif
01048 #ifdef VOID_TRACER
01049         trcLeaveState( fd->trcData, incomingMsgId );
01050         trcLeaveState( fd->trcData );
01051 #endif
01052         //if we got here, we received all instances EOW. Return it to the user
01053         return EOW;
01054 
01055 }
01056 
01057 /// function to receive a buffer, and unpack later, returning the size
01058 /// \warning Multithread apps will crash if they use pack and unpack.
01059 /// \deprecated Packing and unpacking is deprecated, you should use only 
01060 /// dsWriteBuffer() and dsReadBuffer().
01061 int dsInitReceive(InputPortHandler iph){
01062         
01063 #ifdef VOID_TRACER
01064         /* Another weird one... */
01065         trcEnterState( fd->trcData, VT_OH_IRECEIVE );
01066         trcLeaveState( fd->trcData );
01067 #endif
01068 
01069 /* #ifdef VOID_INST
01070         //enter read state
01071         instEnterState(fd->instData, &fd->instData->voidStates[TIMER_READ]);
01072 #endif
01073         
01074         //return dsReadBuffer(iph, bufRecv.buffer, bufRecv.maxSize);
01075         // Coutinho: doing only a dsREadBuffer() doesn't work because buffer size isn't known apriori
01076 
01077         int taskId = -1;
01078         InputPort *ip = fd->inputPorts[iph];
01079 
01080         if ((iph > fd->numInportsAdded) || (iph > fd->numInputPorts)) {
01081                 fprintf(stderr, "%s (%d): iph(%d) > %d or %d\n", __FILE__, __LINE__, 
01082                         iph, fd->numInportsAdded, fd->numInputPorts);
01083         }
01084         
01085         // We try to receive from all instances, and get EOW from all
01086         while(ip->numEowRecv < ip->numSources) {
01087                 int szMsg  = -1;
01088                 int bufId = recvData(ip);
01089                 int bufTag, tidSrc;
01090                 char *err;
01091 
01092                 switch (bufId){
01093                         case PvmBadParam:
01094                                 printf("FilterDev/FilterDev.c: error receiving buffer(bad param)\n");
01095                                 return -1;
01096                                 break;
01097                         case PvmSysErr:
01098                                 printf("FilterDev/FilterDev.c: error receiving buffer(sys error)\n");
01099                                 return -1;
01100                                 break;
01101                 }
01102 
01103                 pvm_bufinfo(bufId, &szMsg, &bufTag, &tidSrc);
01104                 szMsg -= (2 * sizeof(int)); //we take the msg type and the tasdId out
01105                 
01106                 int msgType;
01107                 pvm_upkint(&msgType, 1, 1); //get the type of message
01108 #ifdef VOID_TERM
01109                 if(msgType == MSGT_INITTERM) {
01110                         int neighborTermTag; // neighbor's termination detection round tag
01111                         pvm_upkint(&neighborTermTag, 1, 1);
01112                         terminationDetectionRound(instTid, neighborTermTag);
01113                 }else {
01114                         updateTermStreamToEmpty(instTid);
01115 #endif
01116                         pvm_upkint(&taskId, 1, 1); //get the task id
01117 
01118                         switch (msgType){
01119 
01120                                 case (MSGT_CREATETASK):{
01121                                                 int creatorTid, depSize, *deps = NULL, metaSize, createTaskReturn;
01122                                                 char *metadata;
01123 
01124                                                 // Get creator pvm Tid
01125                                                 
01126                                                 pvm_upkint(&creatorTid, 1, 1);
01127 
01128                                                 // get deps
01129                                                 pvm_upkint(&depSize, 1, 1);
01130                                                 if(depSize > 0){
01131                                                         deps = malloc(sizeof(int) * depSize);
01132                                                 }
01133                                                 pvm_upkint(deps, depSize,1);
01134 
01135                                                 // get metadata
01136                                                 pvm_upkint(&metaSize, 1, 1);
01137                                                 if(metaSize > 0){
01138                                                         metadata = malloc(sizeof(char) * metaSize);
01139                                                 }
01140                                                 pvm_upkbyte(metadata, metaSize,1);
01141                                                 createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
01142                                                 
01143                                                 // marreta do coutos pra criacao de tarefas
01144                                                 // nao ficar dando volta no loop de filtros.. entao ela
01145                                                 // deve morrer no filtro q criou a mesma
01146                                                 if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
01147                                                         taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
01148                                                 }
01149 
01150                                                 break;
01151                                         }
01152                                 case (MSGT_ENDTASK):{
01153                                                 int enderTid;
01154                                                 pvm_upkint( &enderTid, 1, 1);
01155                                                 int endTaskReturn = cacheEndTask(taskId);
01156                                                 // end task and send that to next filter
01157                                                 if((endTaskReturn != E_TASK_NOT_RUNNING) && cacheGetForwardTaskMsgs()){
01158                                                 endTaskSend(taskId, enderTid);
01159                                                 }
01160                                                 break;
01161                                 }
01162                                 case (MSGT_EOW):
01163                                         {
01164                                                 //we received an EOW
01165                                                 ip->numEowRecv++;
01166                                                 break;
01167                                         }
01168                                 case (MSGT_F2F):
01169                                         {
01170                                                 //filter to filter message
01171                                                 
01172                                                 int status = dsSetCurrentTask(taskId);
01173                                                 if ((status < 0) && (!cacheGetForwardTaskMsgs()) && (taskId >= 0)) {
01174                                                         // If this filter don't use tasks, create a dummy task
01175                                                         // and change to it (the filter don't use them anyway).
01176                                                         dsCreateTask(taskId, NULL, 0, NULL, 0);
01177                                                         dsSetCurrentTask(taskId);
01178                                                 }
01179         
01180                                                 //we free the buffer if its not null
01181                                                 if (bufRecv.buffer != NULL){
01182                                                         free(bufRecv.buffer);
01183                                                 }
01184 
01185                                                 bufRecv.buffer = (char *) malloc(szMsg*sizeof(char));
01186                                                 bufRecv.size = 0;
01187                                                 bufRecv.maxSize = szMsg;
01188 
01189                                                 status = pvm_upkbyte((char *) bufRecv.buffer, szMsg, 1);
01190                                                 if (status < 0) {
01191                                                         switch (status){
01192                                                                 case PvmNoData:
01193                                                                         err = strdup("no data to receive");
01194                                                                         break;
01195                                                                 case PvmBadMsg:
01196                                                                         err = strdup("incompatible decoding");
01197                                                                         break;
01198                                                                 case PvmNoBuf:
01199                                                                         err = strdup("no active buffer, call dsInitReceive first");
01200                                                                         break;
01201                                                                 default:
01202                                                                         err = calloc(1, 1); // Empty string
01203                                                         }
01204                                         
01205                                                         printf("FilterDev/FilterDev.c: error unpacking PVM data: %s\n", err);
01206                                                         free(err);
01207                                                         return -1;
01208                                                 }
01209 
01210 #ifdef VOID_INST
01211                                                 instLeaveState(fd->instData);
01212 #endif
01213 #ifdef VOID_TRACER
01214                                                 trcLeaveState( fd->trcData );
01215 #endif
01216 
01217                                                 return szMsg;
01218                                         }
01219                                 default:
01220                                         {
01221                                                 //shouldn't get here
01222                                                 fprintf(stderr, "FilterDev.c: unknown message type\n");
01223                                         }
01224                         }
01225 #ifdef VOID_TERM
01226                 }
01227 #endif
01228         }
01229 
01230 #ifdef VOID_INST
01231         //leave read
01232         instLeaveState(fd->instData);
01233 #endif
01234 #ifdef VOID_TRACER
01235         trcLeaveState( fd->trcData );
01236 #endif
01237         //if we got here, we received all instances EOW. Return it to the user
01238 */
01239         return EOW;
01240 }
01241 
01242 /// function to unpack data
01243 /// \warning Multithread apps will crash if they use pack and unpack.
01244 /// \deprecated Packing and unpacking is deprecated, you should use only 
01245 /// dsWriteBuffer() and dsReadBuffer().
01246 int dsUnpackData(void *buf, int size){
01247 
01248 #ifdef VOID_TRACER
01249         trcEnterState( fd->trcData, VT_OH_UNPACK );
01250 #endif
01251 
01252         if(bufRecv.size+size > bufRecv.maxSize) {
01253                 printf("FilterDev/FilterDev.c: Reading beyond the end of the receive buffer.\n");
01254 #ifdef VOID_TRACER      
01255                 trcLeaveState( fd->trcData );
01256 #endif
01257                 return -1;
01258         }
01259 
01260         memcpy(buf, &(bufRecv.buffer[bufRecv.size]), size);
01261 
01262         bufRecv.size += size;
01263 #ifdef VOID_TRACER      
01264         trcLeaveState( fd->trcData );
01265 #endif
01266         /// \todo Modify this to return what it really read
01267         return size;
01268 }
01269 
01270 /// Returns filter name
01271 char *dsGetFilterName() {
01272         return fd->name;
01273 }
01274 
01275 /// Returns filter id
01276 int dsGetFilterId() {
01277         return fd->id;
01278 }
01279 
01280 /// function to get the number of one filter input ports
01281 int dsGetNumInputPorts() {
01282         return fd->numInputPorts;
01283 }
01284 
01285 /// function to get all input ports names of one filter
01286 char **dsGetInputPortNames() {
01287         char **imputPortNames = (char **) malloc(fd->numInputPorts);
01288         int i, nameSz;
01289 
01290         for(i = 0; i < fd->numInputPorts; i++) {
01291                 nameSz = strlen(fd->inputPorts[i]->name);
01292                 imputPortNames[i] = (char *) malloc(nameSz+1);
01293                 strcpy (imputPortNames[i], fd->inputPorts[i]->name);
01294         }
01295 
01296         return imputPortNames;
01297 }
01298 
01299 /// function to get the number of one filter output ports
01300 int dsGetNumOutputPorts() {
01301         return fd->numOutputPorts;
01302 }
01303 
01304 /// function to get all output ports names of one filter
01305 char **dsGetOutputPortNames() {
01306         char **outputPortNames = (char **) malloc(fd->numOutputPorts);
01307         int i, nameSz;
01308 
01309         for(i = 0; i < fd->numOutputPorts; i++) {
01310                 nameSz = strlen(fd->outputPorts[i]->name);
01311                 outputPortNames[i] = (char *) malloc(nameSz+1);
01312                 strcpy (outputPortNames[i], fd->outputPorts[i]->name);
01313         }
01314 
01315         return outputPortNames;
01316 }
01317 
01318 /// get the number of writers to me that are still running
01319 int dsGetNumUpStreamsRunning() {
01320         int i;
01321         int numUpStreamsRunning = 0;
01322 
01323         for(i = 0; i < fd->numInputPorts; i++) {
01324                 if(fd->inputPorts[i]->numEowRecv < fd->inputPorts[i]->numSources)
01325                         numUpStreamsRunning++;
01326         }
01327 
01328         return numUpStreamsRunning;
01329 }
01330 
01331 /// which instance of the filter am I?
01332 int dsGetMyRank(){
01333 #ifdef VOID_TRACER
01334         trcEnterState( fd->trcData, VT_OH_GETMYRANK );
01335 #endif
01336         int a = fd->myRank;
01337 #ifdef VOID_TRACER
01338         trcLeaveState( fd->trcData );
01339 #endif
01340         return a;
01341 }
01342 
01343 /// Returns the amount of this filter's instances
01344 int dsGetTotalInstances(){
01345 #ifdef VOID_TRACER
01346         trcEnterState( fd->trcData, VT_OH_GETTI );
01347         trcLeaveState( fd->trcData );
01348 #endif
01349 
01350           return fd->numInstances;
01351 }
01352 
01353 /// Exit the system and shutdown the pipeline. 
01354 int dsExit(char *userMesg){
01355 #ifdef VOID_TRACER
01356         trcEnterState( fd->trcData, VT_OH_EXIT );
01357 #endif
01358 
01359         int length = 0;
01360         char voidMesg[1001];
01361   
01362         length = strlen(userMesg);
01363         if(length > 1000){
01364                 printf("FilterDev.c: Warning: message size bigger than 1000 chars, truncating\n");
01365                 length = 1000;
01366         }
01367         strncpy(voidMesg, userMesg, length);
01368         voidMesg[length] = '\0'; 
01369 
01370         pvm_initsend(PvmDataRaw);
01371         pvm_pkbyte(voidMesg, strlen(voidMesg)+1, 1);
01372         pvm_send(pvm_parent(), MSGT_AEXIT); //we send with the right tag
01373         pvm_exit();     
01374 #ifdef VOID_TRACER
01375         trcLeaveState( fd->trcData );
01376         trcDestroyData( &(fd->trcData) );
01377 #endif
01378         exit(-1);
01379 }
01380 
01381 /// probe the port for message, returns 0 if no message, message size otherwise
01382 int dsProbe(InputPortHandler iph){
01383 #ifdef VOID_TRACER
01384         trcEnterState( fd->trcData, VT_OH_PROBE );
01385 #endif
01386 
01387         int tag, tid = -1, size=0;
01388         int tag2, tid2;
01389         
01390         //gets the port
01391         InputPort *ip = fd->inputPorts[iph];
01392 
01393         //get the stream tag
01394         tag = ip->tag;
01395         
01396         int probRes = pvm_probe(tid, tag);
01397         if (probRes == 0){
01398 #ifdef VOID_TRACER
01399                 trcLeaveState( fd->trcData );
01400 #endif
01401                 return 0;
01402         } else{
01403                 if (pvm_bufinfo(probRes, &size, &tag2, &tid2) < 0){
01404                         //error
01405                         printf("probe error, return 0\n");
01406 #ifdef VOID_TRACER
01407                         trcLeaveState( fd->trcData );
01408 #endif
01409                         return 0;
01410                 }
01411 #ifdef VOID_TRACER
01412                 trcLeaveState( fd->trcData );
01413 #endif
01414                 return size - sizeof(int)*2; //we dont return the msg type nor taskId
01415         }
01416 }
01417 
01418 int dsGetMachineMemory(){
01419 #ifdef VOID_TRACER
01420         trcEnterState( fd->trcData, VT_OH_GETMEM );
01421         trcLeaveState( fd->trcData );
01422 #endif
01423         return getFDMachineMem(fd);
01424 }
01425 
01426 int dsGetLocalInstances(){
01427 #ifdef VOID_TRACER
01428         trcEnterState( fd->trcData, VT_OH_GETLI );
01429         trcLeaveState( fd->trcData );
01430 #endif
01431         return getFDLocalInstances(fd);
01432 }
01433 
01434 
01435 /** Task functions *******************************************/
01436 
01437 void dsUseTasks() {
01438         cacheSetUseTasks();     
01439 }
01440 
01441 int dsGetCurrentTask(){
01442 #ifdef VOID_TRACER
01443         trcEnterState( fd->trcData, VT_OH_GETTASK );
01444 #endif
01445         int ret = cacheGetCurrentTask();
01446 #ifdef VOID_TRACER
01447         trcLeaveState( fd->trcData );
01448 #endif
01449         return ret;
01450 }
01451 
01452 int dsSetCurrentTask(int taskId) {
01453 #ifdef VOID_TRACER
01454         trcEnterState( fd->trcData, VT_OH_SETTASK );
01455 #endif
01456         int rvalue = cacheSetCurrentTask(taskId);
01457 #ifdef VOID_TRACER
01458         trcLeaveState( fd->trcData );
01459 #endif
01460         return rvalue;
01461 }
01462 
01463 int *dsGetTaskDeps(int taskId, int *depsSz) {
01464         return cacheGetTaskDeps(taskId, depsSz);
01465 }
01466 
01467 
01468 int dsCreateTask(int taskId, int *deps, int depSize, char *metadata, int metaSize){
01469 
01470 #ifdef VOID_TRACER
01471         trcEnterState( fd->trcData, VT_OH_CREATETASK );
01472 #endif
01473         int createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
01474                         
01475         // marreta do coutos pra criacao de tarefas
01476         // nao ficar dando volta no loop de filtros.. entao ela
01477         // deve morrer no filtro q criou a mesma
01478         if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
01479                 int creatorTid = pvm_mytid();
01480                 taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
01481         }
01482 #ifdef VOID_TRACER
01483         trcLeaveState( fd->trcData );
01484 #endif
01485 
01486         return createTaskReturn;
01487 }
01488 
01489 
01490 int dsEndTask(int taskId){
01491 #ifdef VOID_TRACER
01492         trcEnterState( fd->trcData, VT_OH_ENDTASK );
01493 #endif
01494         int endTaskReturn = cacheEndTask(taskId);
01495         // end task and send that to next filter
01496         int enderTid = pvm_mytid();
01497         if((endTaskReturn != E_TASK_NOT_RUNNING) && cacheGetForwardTaskMsgs()){
01498                 endTaskSend(taskId, enderTid);
01499         }
01500 #ifdef VOID_TRACER
01501         trcLeaveState( fd->trcData );
01502 #endif
01503 
01504         return endTaskReturn;
01505 }
01506 
01507 
01508 /** Data functions *************************************************/
01509 int dsPutData(char *id, void *val, int valSize){
01510         return cachePutData(id, val, valSize);
01511 }
01512 
01513 void *dsGetData(int taskId, char *id, int *valSz){
01514         return cacheGetData(taskId, id, valSz);
01515 }
01516 
01517 int dsRemoveData(char *id){
01518         return cacheRemoveData(id);
01519 }
01520 
01521 // Coutinho: in C its not necessary to declare a global variable as static, the
01522 // static of C isn't equal the object orientation concept of a static class member
01523 //int filterDevVoidInstState;
01524 
01525 void dsInstSetStates(char **states, int numStates){
01526 #ifdef VOID_INST
01527                 instSetUserStates(fd->instData, states, numStates);
01528 #endif
01529 }
01530 
01531 void dsInstSwitchState(int stateId){
01532 #ifdef VOID_INST
01533         instSwitchState(fd->instData, &fd->instData->userStates[stateId]);
01534 #endif
01535 }
01536 
01537 void dsInstEnterState(int stateId){
01538 #ifdef VOID_INST
01539         //we save the void state if its first time calling this
01540         instEnterState(fd->instData, &fd->instData->userStates[stateId]);
01541 #endif
01542 }
01543 
01544 void dsInstLeaveState(){
01545 #ifdef VOID_INST
01546         instLeaveState(fd->instData);
01547 #endif
01548 }
01549 
01550 int dsGetRunningTasks( int * numTasks, int ** taskList )
01551 {
01552         return cacheGetRunningTasks( numTasks, taskList );
01553 }
01554 
01555 int *dsGetFinishedTasks(int *numTasks) {
01556         return cacheGetFinishedTasks(numTasks); 
01557 }
01558 
01559 int dsRegisterRecoverCallback( RecoverCallback_t *callback ) {
01560         return cacheRegisterRecoverCallback(callback);
01561 }

Generated on Tue Jan 17 19:18:38 2006 for Void by  doxygen 1.4.6