00001 #include <pvm3.h>
00002 #include <stdlib.h>
00003 #include <assert.h>
00004 #include <signal.h>
00005 #include <sys/time.h>
00006 #include <string.h>
00007 #include <unistd.h>
00008 #include "FilterDev.h"
00009
00010
00011 #include <Task/Task.h>
00012 #include <DataSpace/DataSpace.h>
00013 #include <Cache.h>
00014
00015 #include "../FilterData/FilterData.h"
00016 #include "../FilterData/Termination.h"
00017 #include "../constants.h"
00018 #include "../Messages.h"
00019
00020 #define KEY_INT
00021 #define VAL_VOID
00022 #include "hash.h"
00023 #undef VAL_VOID
00024 #undef KEY_INT
00025
00026
00027 #ifdef VOID_INST
00028 #include "../FilterData/Instrumentation.h"
00029 #endif
00030 #ifdef VOID_TRACER
00031 #include <Tracer/tracer.h>
00032 #include "tracingdefs.h"
00033 extern int msgId;
00034 #endif
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
/**
 * Flat byte buffer shared by the pack/unpack helpers of this file.
 */
typedef struct {
    char *buffer;   /* heap storage; NULL while nothing has been allocated */
    int size;       /* bytes packed so far (send) / bytes already read (receive) */
    int maxSize;    /* capacity of buffer, in bytes */
} BufSt;

/* Filled by dsInitPack()/dsPackData(), sent by dsWritePackedBuffer(). */
static BufSt bufSend = { .buffer = NULL, .size = 0, .maxSize = 0 };
/* Consumed by dsUnpackData(). */
static BufSt bufRecv = { .buffer = NULL, .size = 0, .maxSize = 0 };
00057
00058 InputPortHandler dsGetInputPortByName(const char *name) {
00059 int i=0;
00060
00061 #ifdef VOID_TRACER
00062 trcEnterState( fd->trcData, VT_OH_GETIPORT );
00063 #endif
00064
00065 for (i=0; i<fd->numInputPorts; i++) {
00066 if (0 == strcasecmp(name, fd->inputPorts[i]->name)) {
00067 #ifdef VOID_TRACER
00068 trcLeaveState( fd->trcData );
00069 #endif
00070 return i;
00071 }
00072 }
00073
00074 fprintf(stderr, "FilterDev.c: ERROR, could not find Inputport with name %s\n", name);
00075
00076 #ifdef VOID_TRACER
00077 trcLeaveState( fd->trcData );
00078 #endif
00079
00080 return -1;
00081 }
00082
00083 OutputPortHandler dsGetOutputPortByName(const char *name) {
00084 int i=0;
00085
00086 #ifdef VOID_TRACER
00087 trcEnterState( fd->trcData, VT_OH_GETOPORT );
00088 #endif
00089
00090 for (i=0; i< fd->numOutputPorts; i++) {
00091 if (0 == strcasecmp(name, fd->outputPorts[i]->name)) {
00092
00093 #ifdef VOID_TRACER
00094 trcLeaveState( fd->trcData );
00095 #endif
00096 return i;
00097 }
00098 }
00099
00100 fprintf(stderr, "FilterDev.c: ERROR, could not find Outputport with name %s\n", name);
00101
00102 #ifdef VOID_TRACER
00103 trcLeaveState( fd->trcData );
00104 #endif
00105
00106 return -1;
00107 }
00108
00109 int dsCloseOutputPort(OutputPortHandler oph){
00110
00111 #ifdef VOID_TRACER
00112 trcEnterState( fd->trcData, VT_OH_CLOSEOPORT );
00113 #endif
00114
00115 closeOutputPort(fd->outputPorts[oph]);
00116
00117 #ifdef VOID_TRACER
00118 trcLeaveState( fd->trcData );
00119 #endif
00120
00121 return 1;
00122 }
00123
00124 int dsGetNumWriters(InputPortHandler iph){
00125
00126 #ifdef VOID_TRACER
00127 trcEnterState( fd->trcData, VT_OH_GETNW );
00128 #endif
00129
00130 if ((iph < 0) || (iph >= fd->numInputPorts)){
00131 #ifdef VOID_TRACER
00132 trcLeaveState( fd->trcData );
00133 #endif
00134 return -1;
00135 }
00136 else {
00137 #ifdef VOID_TRACER
00138 trcLeaveState( fd->trcData );
00139 #endif
00140 return fd->inputPorts[iph]->numSources;
00141 }
00142 }
00143
00144 int dsGetNumReaders(OutputPortHandler oph){
00145 #ifdef VOID_TRACER
00146 trcEnterState( fd->trcData, VT_OH_GETNR );
00147 #endif
00148
00149 if ((oph < 0) || (oph >= fd->numOutputPorts)){
00150 #ifdef VOID_TRACER
00151 trcLeaveState( fd->trcData );
00152 #endif
00153 return -1;
00154 }
00155 else {
00156 #ifdef VOID_TRACER
00157 trcLeaveState( fd->trcData );
00158 #endif
00159 return fd->outputPorts[oph]->numDestinations;
00160 }
00161 }
00162
00163 void dsMCast(OutputPort *op, void *buf, int bufSz) {
00164 int i, type = MSGT_F2F;
00165 int taskId = cacheGetCurrentTask();
00166
00167 pvm_initsend(PvmDataInPlace);
00168 pvm_pkint(&type, 1, 1);
00169 pvm_pkint(&taskId, 1, 1);
00170 #ifdef VOID_TRACER
00171 pvm_pkint(&msgId, 1, 1);
00172 #endif
00173 pvm_pkbyte((char *) buf, bufSz, 1);
00174 #ifdef VOID_INST
00175 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00176 #endif
00177 for ( i = 0; i < op->numDestinations; i++ )
00178 {
00179 pvm_send( op->tidsDestinations[i], op->tag );
00180 }
00181
00182
00183 #ifdef VOID_INST
00184 instLeaveState(fd->instData);
00185 #endif
00186
00187 }
00188
00189 void dsSend(int tid, int msgtag, void *buf, int bufSz) {
00190
00191 int taskId = cacheGetCurrentTask();
00192
00193 int type = MSGT_F2F;
00194 pvm_initsend(PvmDataInPlace);
00195 pvm_pkint(&type, 1, 1);
00196 pvm_pkint(&taskId, 1, 1);
00197 #ifdef VOID_TRACER
00198 pvm_pkint(&msgId, 1, 1);
00199 #endif
00200
00201 pvm_pkbyte((char *) buf, bufSz, 1);
00202
00203 #ifdef VOID_INST
00204 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00205 #endif
00206 pvm_send(tid, msgtag);
00207
00208 #ifdef VOID_INST
00209 instLeaveState(fd->instData);
00210 #endif
00211
00212 }
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245 int dsWriteBuffer(OutputPortHandler oph, void *buf, unsigned int bufSz) {
00246 #ifdef VOID_TRACER
00247 trcEnterState( fd->trcData, VT_OH_WBUFFER );
00248 #endif
00249 #ifdef VOID_INST
00250
00251 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE]);
00252 #endif
00253
00254 int instance = -1;
00255
00256 OutputPort* op = fd->outputPorts[oph];
00257
00258
00259 switch (op->writePolicy){
00260
00261
00262
00263
00264
00265 case BROADCAST:
00266 {
00267 #ifdef VOID_TRACER
00268 trcEnterState(fd->trcData, VT_COMM_WRITE);
00269 #endif
00270
00271 dsMCast(op, buf, bufSz);
00272 #ifdef VOID_TRACER
00273 trcLeaveState(fd->trcData);
00274 trcLeaveState(fd->trcData);
00275 #endif
00276 #ifdef VOID_INST
00277 instLeaveState(fd->instData);
00278 #endif
00279 return bufSz;
00280 }
00281 case MULTICAST_LABELED_STREAM:
00282 {
00283
00284
00285
00286 static char label[MAX_LBL_LENGTH];
00287 static int destArray[MAXINSTANCES], tidsArray[MAXINSTANCES];
00288 static int clearArray[MAXINSTANCES], clearInit =0;
00289 if (!clearInit){
00290 int i;
00291 for (i=0;i<MAXINSTANCES;i++){
00292 clearArray[i] = 0;
00293 }
00294 clearInit = 1;
00295 }
00296
00297
00298
00299
00300
00301
00302
00303 op->lsData.getLabel(buf, bufSz, label);
00304
00305
00306 memcpy(destArray, clearArray, sizeof(int)*MAXINSTANCES);
00307
00308
00309
00310 op->lsData.mlshash(label, op->numDestinations, destArray);
00311
00312 int i, tidsArraySize = 0, lastPosition=0;
00313 for (i=0;i<op->numDestinations;i++){
00314 if (destArray[i] == 1){
00315 tidsArraySize++;
00316 tidsArray[lastPosition++] = op->tidsDestinations[i];
00317 }
00318 }
00319
00320
00321 pvm_initsend(PvmDataInPlace);
00322
00323 int type = MSGT_F2F;
00324 pvm_pkint(&type, 1, 1);
00325 int taskId = cacheGetCurrentTask();
00326 pvm_pkint(&taskId, 1, 1);
00327 #ifdef VOID_TRACER
00328 pvm_pkint(&msgId, 1, 1);
00329 #endif
00330 pvm_pkbyte((char*)buf, bufSz, 1);
00331
00332 #ifdef VOID_INST
00333 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_WRITE_BLOCKED]);
00334 #endif
00335 #ifdef VOID_TRACER
00336 trcEnterState( fd->trcData, VT_COMM_WRITE );
00337 #endif
00338 int info;
00339 for ( i = 0; i < tidsArraySize; i++ )
00340 {
00341 if ( ( info = pvm_send( tidsArray[i], op->tag )) < 0 )
00342 {
00343 printf("FilterDev.c: ERROR, in multicast labeled stream\n");
00344 #ifdef VOID_TRACER
00345 trcLeaveState(fd->trcData);
00346 #endif
00347 return -1;
00348 }
00349 }
00350
00351 #ifdef VOID_TRACER
00352 trcLeaveState( fd->trcData );
00353 #endif
00354
00355 #ifdef VOID_INST
00356
00357 instLeaveState(fd->instData);
00358 #endif
00359 #ifdef VOID_INST
00360
00361 instLeaveState(fd->instData);
00362 #endif
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374 #ifdef VOID_TRACER
00375 trcLeaveState(fd->trcData);
00376 #endif
00377 return bufSz;
00378 }
00379
00380
00381
00382 case RANDOM:
00383 {
00384
00385 instance = lrand48() % op->numDestinations;
00386 break;
00387 }
00388 case LABELED_STREAM:
00389 {
00390
00391
00392
00393 static char label[MAX_LBL_LENGTH];
00394
00395 #ifdef VOID_INST
00396 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_LS]);
00397 #endif
00398
00399
00400
00401
00402
00403 op->lsData.getLabel(buf, bufSz, label);
00404
00405
00406 instance = op->lsData.hash(label, op->numDestinations);
00407
00408 #ifdef VOID_INST
00409
00410 instLeaveState(fd->instData);
00411 #endif
00412
00413
00414 while (instance < 0)
00415 instance += op->numDestinations;
00416 if (instance >= op->numDestinations)
00417 instance %= op->numDestinations;
00418
00419 break;
00420 }
00421 default: {
00422
00423 instance = op->nextToSend;
00424 op->nextToSend = (op->nextToSend+1) % op->numDestinations;
00425 break;
00426 }
00427
00428 }
00429
00430
00431 assert(instance >= 0);
00432
00433
00434
00435
00436 #ifdef VOID_TRACER
00437 trcEnterState(fd->trcData, VT_COMM_WRITE);
00438 #endif
00439 dsSend(op->tidsDestinations[instance], op->tag, buf, bufSz);
00440 #ifdef VOID_TRACER
00441 trcLeaveState(fd->trcData);
00442 #endif
00443
00444 #ifdef VOID_INST
00445
00446 instLeaveState(fd->instData);
00447 #endif
00448 #ifdef VOID_TRACER
00449 trcLeaveState(fd->trcData);
00450 #endif
00451 return bufSz;
00452 }
00453
00454
00455
00456
00457
00458 int dsInitPack(int initSize){
00459 #ifdef VOID_TRACER
00460 trcEnterState( fd->trcData, VT_OH_INITPACK );
00461 #endif
00462
00463
00464 if (bufSend.buffer != NULL){
00465 free(bufSend.buffer);
00466 }
00467
00468 int trueSize;
00469
00470 if (initSize < MINSIZE)
00471 trueSize = MINSIZE;
00472 else
00473 trueSize = initSize;
00474
00475 if ((bufSend.buffer = (char *)malloc(sizeof(char)*trueSize))==NULL){
00476 printf("FilterDev/FilterDev.c: error, could not allocate memory for buffer\n");
00477 #ifdef VOID_TRACER
00478 trcLeaveState(fd->trcData);
00479 #endif
00480 return -1;
00481 }
00482
00483
00484 bufSend.size = 0;
00485 bufSend.maxSize = trueSize;
00486
00487
00488
00489
00490
00491
00492
00493
00494 #ifdef VOID_TRACER
00495 dsPackData(&msgId, sizeof(int));
00496 trcLeaveState(fd->trcData);
00497 #endif
00498
00499 return 1;
00500
00501 }
00502
00503
00504
00505
00506
00507 int dsPackData(void *data, int size){
00508 #ifdef VOID_TRACER
00509 trcEnterState( fd->trcData, VT_OH_PACK );
00510 #endif
00511
00512
00513 if (bufSend.size + size > bufSend.maxSize){
00514
00515
00516 bufSend.maxSize = bufSend.size + size;
00517 bufSend.buffer = (char*)realloc(bufSend.buffer, bufSend.maxSize*sizeof(char));
00518 }
00519
00520 memcpy(&bufSend.buffer[bufSend.size], data, size);
00521 bufSend.size+= size;
00522 #ifdef VOID_TRACER
00523 trcLeaveState( fd->trcData );
00524 #endif
00525 return 1;
00526 }
00527
00528
00529
00530
00531
00532
00533 int dsWritePackedBuffer(OutputPortHandler oph){
00534
00535
00536
00537 #ifdef VOID_TRACER
00538 trcEnterState( fd->trcData, VT_OH_WPBUFFER );
00539 trcLeaveState( fd->trcData );
00540 #endif
00541 return dsWriteBuffer(oph, bufSend.buffer, bufSend.size);
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589 }
00590
00591 void taskSend(int taskId, int *deps, int depSize, char *metadata, int metaSize, int creatorTid){
00592 int i, j;
00593 int type = MSGT_CREATETASK;
00594 OutputPort* op;
00595
00596 for(i = 0; i < fd->numOutputPorts; i++){
00597 op = fd->outputPorts[i];
00598
00599 int willSendToCreator = 0;
00600 for ( j = 0; j < op->numDestinations; j++ ) {
00601 if ( creatorTid == op->tidsDestinations[j] ) {
00602 willSendToCreator = 1;
00603 break;
00604 }
00605 }
00606
00607
00608 if (!willSendToCreator) {
00609 pvm_initsend(PvmDataInPlace);
00610 pvm_pkint(&type, 1, 1);
00611 pvm_pkint(&taskId, 1, 1);
00612 #ifdef VOID_TRACER
00613 pvm_pkint(&msgId, 1, 1);
00614 #endif
00615
00616
00617 pvm_pkint( &creatorTid, 1, 1);
00618
00619 pvm_pkint(&depSize, 1, 1);
00620 if (depSize > 0) pvm_pkint(deps, depSize, 1);
00621
00622 pvm_pkint(&metaSize, 1, 1);
00623 if (metaSize > 0) pvm_pkbyte((char *) metadata, metaSize, 1);
00624
00625 for ( j = 0; j < op->numDestinations; j++ ) {
00626 pvm_send( op->tidsDestinations[j], op->tag );
00627 }
00628
00629
00630 #ifdef DEBUG
00631 int k;
00632 fprintf(stderr, "tid %d, tag %d: Multcast de 8+%d bytes p/ instancias de tids:", pvm_mytid(), op->tag, 2*sizeof(int)+metaSize+(depSize*sizeof(int)));
00633 for (k=0; k<op->numDestinations; k++) {
00634 fprintf(stderr, " %d", op->tidsDestinations[k]);
00635 }
00636 fprintf(stderr, "\n");
00637 #endif
00638 }
00639
00640 }
00641 }
00642
00643 int endTaskSend(int taskId, int enderTid){
00644 int i, j, type = MSGT_ENDTASK;
00645 OutputPort* op = NULL;
00646
00647 for(i = 0; i < fd->numOutputPorts; i++){
00648 op = fd->outputPorts[i];
00649
00650 int willSendToEnder = 0;
00651 for ( j = 0; j < op->numDestinations; j++ ) {
00652 if ( enderTid == op->tidsDestinations[j] ) {
00653 willSendToEnder = 1;
00654 }
00655 }
00656
00657 if (!willSendToEnder) {
00658 pvm_initsend(PvmDataInPlace);
00659 pvm_pkint(&type, 1, 1);
00660 pvm_pkint(&taskId, 1, 1);
00661 #ifdef VOID_TRACER
00662 pvm_pkint(&msgId, 1, 1);
00663 #endif
00664 pvm_pkint(&enderTid, 1, 1);
00665
00666 for ( j = 0; j < op->numDestinations; j++ ) {
00667 pvm_send( op->tidsDestinations[j], op->tag );
00668 }
00669
00670
00671 #ifdef DEBUG
00672 int k;
00673 fprintf(stderr, "tid %d, tag %d: Multcast mesgType %d ", pvm_mytid(), op->tag, type);
00674 for (k=0; k<op->numDestinations; k++) {
00675 fprintf(stderr, " %d", op->tidsDestinations[k]);
00676 }
00677 fprintf(stderr, "\n");
00678 #endif
00679 }
00680
00681 }
00682 return 0;
00683 }
00684
00685
00686
00687
00688
00689 int dsReadBuffer(InputPortHandler iph, void *buf, int szBuf) {
00690
00691 #ifdef VOID_TRACER
00692 trcEnterState( fd->trcData, VT_OH_RBUFFER );
00693 #endif
00694
00695 #ifdef VOID_INST
00696
00697 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_READ]);
00698 #endif
00699
00700 int ret = ERROR;
00701 void *recvBuf = NULL;
00702 int taskId = -1;
00703 int instTid = -1;
00704
00705 InputPort *ip = fd->inputPorts[iph];
00706
00707 if ((iph > fd->numInportsAdded) || (iph > fd->numInputPorts)) {
00708 fprintf(stderr, "%s (%d): iph(%d) > %d or %d\n", __FILE__, __LINE__,
00709 iph, fd->numInportsAdded, fd->numInputPorts);
00710 }
00711
00712
00713 while(ip->numEowRecv < ip->numSources) {
00714 int szMsg = -1;
00715
00716
00717
00718
00719
00720 struct timeval tmout;
00721 tmout.tv_sec = 0;
00722 tmout.tv_usec = 1000;
00723 #ifdef VOID_TRACER
00724 struct timeval iitime, ietime;
00725 gettimeofday( &iitime, NULL );
00726 trcEnterState( fd->trcData, VT_COMM_READ );
00727 int incomingMsgId = -1;
00728 #endif
00729 int bufid=0;
00730
00731 while(!bufid){
00732 #ifdef VOID_TRACER
00733 gettimeofday( &ietime, NULL );
00734 trcResetCurrentState( fd->trcData );
00735 #endif
00736 bufid = pvm_trecv(-1, ip->tag, &tmout);
00737 #ifdef VOID_TERM
00738 if(!bufid) {
00739 if((bufid = pvm_probe(-1, 3))) {
00740
00741 int nBytes, instTid;
00742 pvm_bufinfo(bufid, &nBytes, NULL, &instTid);
00743 pvm_recv(instTid, 3);
00744 } else {
00745 if(tdd->status == NOTPARTICIPATING)
00746 beginTerminationDetection();
00747 }
00748 }
00749 #endif
00750 }
00751
00752
00753 int status = pvm_bufinfo(bufid, &szMsg, NULL, &instTid);
00754 szMsg -= (2 * sizeof(int));
00755
00756 int msgType;
00757 pvm_upkint(&msgType, 1, 1);
00758
00759
00760
00761
00762
00763
00764
00765
00766 #ifdef VOID_TERM
00767 if(msgType == MSGT_INITTERM) {
00768 int neighborTermTag;
00769 pvm_upkint(&neighborTermTag, 1, 1);
00770 terminationDetectionRound(instTid, neighborTermTag);
00771 #ifdef VOID_TRACER
00772 trcLeaveState( fd->trcData, VT_LEAVE_COMM_READ );
00773 trcEnterState( fd->trcData, VT_IDLE_READ );
00774 trcLeaveState( fd->trcData );
00775 #endif
00776 } else {
00777 updateTermStreamToEmpty(instTid);
00778 #endif
00779 pvm_upkint(&taskId, 1, 1);
00780 #ifdef VOID_TRACER
00781 pvm_upkint( &incomingMsgId, 1, 1 );
00782 szMsg -= (1 * sizeof(int));
00783 trcLeaveState( fd->trcData, VT_LEAVE_COMM_READ );
00784 trcEnterState( fd->trcData, VT_IDLE_READ );
00785 trcLeaveState( fd->trcData );
00786 #endif
00787
00788 switch (msgType){
00789
00790 case (MSGT_CREATETASK):{
00791 int creatorTid, depSize, metaSize, createTaskReturn;
00792 char *metadata = NULL;
00793 int *deps = NULL;
00794
00795
00796 pvm_upkint( &creatorTid, 1, 1 );
00797
00798
00799 pvm_upkint(&depSize, 1, 1);
00800 if(depSize > 0){
00801 deps = malloc(sizeof(int) * depSize);
00802 pvm_upkint(deps, depSize,1);
00803 }
00804
00805
00806 pvm_upkint(&metaSize, 1, 1);
00807 if(metaSize > 0){
00808 metadata = malloc(sizeof(char) * metaSize);
00809 pvm_upkbyte(metadata, metaSize,1);
00810 }
00811 createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
00812
00813
00814
00815
00816 if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
00817 taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
00818 }
00819
00820 break;
00821 }
00822 case (MSGT_ENDTASK):{
00823 int enderTid;
00824 pvm_upkint( &enderTid, 1, 1);
00825 int endTaskReturn = cacheEndTask(taskId);
00826
00827 if((endTaskReturn != E_TASK_NOT_RUNNING) && cacheGetForwardTaskMsgs()){
00828 endTaskSend(taskId, enderTid);
00829 }
00830 break;
00831 }
00832 case (MSGT_EOW):
00833 {
00834
00835 ip->numEowRecv++;
00836 break;
00837 }
00838 case (MSGT_F2F):
00839 {
00840 status = dsSetCurrentTask(taskId);
00841 if ((status < 0) && (!cacheGetForwardTaskMsgs()) && (taskId >= 0)) {
00842
00843
00844 dsCreateTask(taskId, NULL, 0, NULL, 0);
00845 dsSetCurrentTask(taskId);
00846 }
00847
00848
00849 if (szMsg > szBuf) {
00850
00851 printf("FilterDev.c: warning, message size(%d) is bigger than buffer(%d), truncating! bufid=%d ip->tag=%d",
00852 szMsg, szBuf, bufid, ip->tag);
00853 recvBuf = malloc(szMsg);
00854 status = pvm_upkbyte((char *) recvBuf, szMsg, 1);
00855 ret = szBuf;
00856 memcpy(buf, recvBuf, szBuf);
00857 free(recvBuf);
00858 } else {
00859
00860 recvBuf = buf;
00861 status = pvm_upkbyte((char *) buf, szMsg, 1);
00862 ret = szMsg;
00863 }
00864
00865 #ifdef VOID_INST
00866 instLeaveState(fd->instData);
00867 #endif
00868 #ifdef VOID_TRACER
00869 trcLeaveState( fd->trcData );
00870 #endif
00871
00872 return ret;
00873 }
00874 default:
00875 {
00876
00877 fprintf(stderr, "FilterDev.c: unknown message type\n");
00878 }
00879 }
00880 #ifdef VOID_TERM
00881 }
00882 #endif
00883 }
00884
00885 #ifdef VOID_INST
00886
00887 instLeaveState(fd->instData);
00888 #endif
00889 #ifdef VOID_TRACER
00890 trcLeaveState( fd->trcData );
00891 #endif
00892
00893 return EOW;
00894 }
00895
00896 int recvNonBlockingData(InputPort *ip) {
00897 int bufId = 0;
00898 struct timeval tmout;
00899 tmout.tv_sec = 0;
00900 tmout.tv_usec = 1;
00901
00902 bufId = pvm_trecv(-1, ip->tag, &tmout);
00903
00904 return bufId;
00905 }
00906
00907
00908
00909 int dsReadNonBlockingBuffer(InputPortHandler iph, void *buf, int szBuf) {
00910
00911 #ifdef VOID_TRACER
00912 trcEnterState( fd->trcData, VT_OH_RBUFFER );
00913 trcEnterState( fd->trcData, VT_COMM_READ );
00914 int incomingMsgId = -1;
00915 #endif
00916
00917 #ifdef VOID_INST
00918
00919 instEnterState(fd->instData, &fd->instData->voidStates[TIMER_READ]);
00920 #endif
00921
00922 int ret = ERROR;
00923 void *recvBuf = NULL;
00924 int taskId = -1;
00925
00926 InputPort *ip = fd->inputPorts[iph];
00927
00928 if ((iph > fd->numInportsAdded) || (iph > fd->numInputPorts)) {
00929 fprintf(stderr, "%s (%d): iph(%d) > %d or %d\n", __FILE__, __LINE__,
00930 iph, fd->numInportsAdded, fd->numInputPorts);
00931 }
00932
00933
00934 while(ip->numEowRecv < ip->numSources) {
00935 int szMsg = -1;
00936 int bufid = recvNonBlockingData(ip);
00937 if(bufid == 0) return 0;
00938 int instTid;
00939
00940 int status = pvm_bufinfo(bufid, &szMsg, NULL, &instTid);
00941 szMsg -= (2 * sizeof(int));
00942
00943 int msgType;
00944 pvm_upkint(&msgType, 1, 1);
00945 #ifdef VOID_TERM
00946 if(msgType == MSGT_INITTERM) {
00947 int neighborTermTag;
00948 pvm_upkint(&neighborTermTag, 1, 1);
00949 terminationDetectionRound(instTid, neighborTermTag);
00950 }else {
00951 updateTermStreamToEmpty(instTid);
00952 #endif
00953 pvm_upkint(&taskId, 1, 1);
00954
00955 switch (msgType){
00956 case (MSGT_CREATETASK):{
00957 int creatorTid, depSize, metaSize, createTaskReturn;
00958 char *metadata = NULL;
00959 int *deps = NULL;
00960
00961
00962 pvm_upkint( &creatorTid, 1, 1 );
00963
00964
00965 pvm_upkint(&depSize, 1, 1);
00966 if(depSize > 0){
00967 deps = malloc(sizeof(int) * depSize);
00968 pvm_upkint(deps, depSize,1);
00969 }
00970
00971
00972 pvm_upkint(&metaSize, 1, 1);
00973 if(metaSize > 0){
00974 metadata = malloc(sizeof(char) * metaSize);
00975 pvm_upkbyte(metadata, metaSize,1);
00976 }
00977 createTaskReturn = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);
00978
00979
00980
00981
00982 if ((createTaskReturn == 0) && (cacheGetForwardTaskMsgs())){
00983 taskSend(taskId, deps, depSize, metadata, metaSize, creatorTid);
00984 }
00985
00986 break;
00987 }
00988
00989 case (MSGT_ENDTASK):{
00990 dsEndTask(taskId);
00991 break;
00992 }
00993 case (MSGT_EOW):
00994 {
00995
00996 ip->numEowRecv++;
00997 break;
00998 }
00999 case (MSGT_F2F):
01000 {
01001 #ifdef VOID_TRACER
01002 pvm_upkint( &incomingMsgId, 1, 1 );
01003 #endif
01004 dsSetCurrentTask(taskId);
01005
01006
01007 if (szMsg > szBuf) {
01008
01009 printf("FilterDev.c: warning, message size(%d) is bigger than buffer(%d), truncating! bufid=%d ip->tag=%d",
01010 szMsg, szBuf, bufid, ip->tag);
01011 recvBuf = malloc(szMsg);
01012 status = pvm_upkbyte((char *) recvBuf, szMsg, 1);
01013 ret = szBuf;
01014 memcpy(buf, recvBuf, szBuf);
01015 free(recvBuf);
01016 } else {
01017
01018 recvBuf = buf;
01019 status = pvm_upkbyte((char *) buf, szMsg, 1);
01020 ret = szMsg;
01021 }
01022
01023 #ifdef VOID_INST
01024 instLeaveState(fd->instData);
01025 #endif
01026 #ifdef VOID_TRACER
01027 trcLeaveState( fd->trcData, instTid, incomingMsgId );
01028 trcLeaveState( fd->trcData );
01029 #endif
01030
01031 return ret;
01032 }
01033 default:
01034 {
01035
01036 fprintf(stderr, "FilterDev.c: unknown message type\n");
01037 }
01038 }
01039 #ifdef VOID_TERM
01040 }
01041 #endif
01042 }
01043
01044 #ifdef VOID_INST
01045
01046 instLeaveState(fd->instData);
01047 #endif
01048 #ifdef VOID_TRACER
01049 trcLeaveState( fd->trcData, incomingMsgId );
01050 trcLeaveState( fd->trcData );
01051 #endif
01052
01053 return EOW;
01054
01055 }
01056
01057
01058
01059
01060
01061 int dsInitReceive(InputPortHandler iph){
01062
01063 #ifdef VOID_TRACER
01064
01065 trcEnterState( fd->trcData, VT_OH_IRECEIVE );
01066 trcLeaveState( fd->trcData );
01067 #endif
01068
01069
01070
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112
01113
01114
01115
01116
01117
01118
01119
01120
01121
01122
01123
01124
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135
01136
01137
01138
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149
01150
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169
01170
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199
01200
01201
01202
01203
01204
01205
01206
01207
01208
01209
01210
01211
01212
01213
01214
01215
01216
01217
01218
01219
01220
01221
01222
01223
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234
01235
01236
01237
01238
01239 return EOW;
01240 }
01241
01242
01243
01244
01245
01246 int dsUnpackData(void *buf, int size){
01247
01248 #ifdef VOID_TRACER
01249 trcEnterState( fd->trcData, VT_OH_UNPACK );
01250 #endif
01251
01252 if(bufRecv.size+size > bufRecv.maxSize) {
01253 printf("FilterDev/FilterDev.c: Reading beyond the end of the receive buffer.\n");
01254 #ifdef VOID_TRACER
01255 trcLeaveState( fd->trcData );
01256 #endif
01257 return -1;
01258 }
01259
01260 memcpy(buf, &(bufRecv.buffer[bufRecv.size]), size);
01261
01262 bufRecv.size += size;
01263 #ifdef VOID_TRACER
01264 trcLeaveState( fd->trcData );
01265 #endif
01266
01267 return size;
01268 }
01269
01270
01271 char *dsGetFilterName() {
01272 return fd->name;
01273 }
01274
01275
01276 int dsGetFilterId() {
01277 return fd->id;
01278 }
01279
01280
01281 int dsGetNumInputPorts() {
01282 return fd->numInputPorts;
01283 }
01284
01285
01286 char **dsGetInputPortNames() {
01287 char **imputPortNames = (char **) malloc(fd->numInputPorts);
01288 int i, nameSz;
01289
01290 for(i = 0; i < fd->numInputPorts; i++) {
01291 nameSz = strlen(fd->inputPorts[i]->name);
01292 imputPortNames[i] = (char *) malloc(nameSz+1);
01293 strcpy (imputPortNames[i], fd->inputPorts[i]->name);
01294 }
01295
01296 return imputPortNames;
01297 }
01298
01299
01300 int dsGetNumOutputPorts() {
01301 return fd->numOutputPorts;
01302 }
01303
01304
01305 char **dsGetOutputPortNames() {
01306 char **outputPortNames = (char **) malloc(fd->numOutputPorts);
01307 int i, nameSz;
01308
01309 for(i = 0; i < fd->numOutputPorts; i++) {
01310 nameSz = strlen(fd->outputPorts[i]->name);
01311 outputPortNames[i] = (char *) malloc(nameSz+1);
01312 strcpy (outputPortNames[i], fd->outputPorts[i]->name);
01313 }
01314
01315 return outputPortNames;
01316 }
01317
01318
01319 int dsGetNumUpStreamsRunning() {
01320 int i;
01321 int numUpStreamsRunning = 0;
01322
01323 for(i = 0; i < fd->numInputPorts; i++) {
01324 if(fd->inputPorts[i]->numEowRecv < fd->inputPorts[i]->numSources)
01325 numUpStreamsRunning++;
01326 }
01327
01328 return numUpStreamsRunning;
01329 }
01330
01331
01332 int dsGetMyRank(){
01333 #ifdef VOID_TRACER
01334 trcEnterState( fd->trcData, VT_OH_GETMYRANK );
01335 #endif
01336 int a = fd->myRank;
01337 #ifdef VOID_TRACER
01338 trcLeaveState( fd->trcData );
01339 #endif
01340 return a;
01341 }
01342
01343
01344 int dsGetTotalInstances(){
01345 #ifdef VOID_TRACER
01346 trcEnterState( fd->trcData, VT_OH_GETTI );
01347 trcLeaveState( fd->trcData );
01348 #endif
01349
01350 return fd->numInstances;
01351 }
01352
01353
01354 int dsExit(char *userMesg){
01355 #ifdef VOID_TRACER
01356 trcEnterState( fd->trcData, VT_OH_EXIT );
01357 #endif
01358
01359 int length = 0;
01360 char voidMesg[1001];
01361
01362 length = strlen(userMesg);
01363 if(length > 1000){
01364 printf("FilterDev.c: Warning: message size bigger than 1000 chars, truncating\n");
01365 length = 1000;
01366 }
01367 strncpy(voidMesg, userMesg, length);
01368 voidMesg[length] = '\0';
01369
01370 pvm_initsend(PvmDataRaw);
01371 pvm_pkbyte(voidMesg, strlen(voidMesg)+1, 1);
01372 pvm_send(pvm_parent(), MSGT_AEXIT);
01373 pvm_exit();
01374 #ifdef VOID_TRACER
01375 trcLeaveState( fd->trcData );
01376 trcDestroyData( &(fd->trcData) );
01377 #endif
01378 exit(-1);
01379 }
01380
01381
01382 int dsProbe(InputPortHandler iph){
01383 #ifdef VOID_TRACER
01384 trcEnterState( fd->trcData, VT_OH_PROBE );
01385 #endif
01386
01387 int tag, tid = -1, size=0;
01388 int tag2, tid2;
01389
01390
01391 InputPort *ip = fd->inputPorts[iph];
01392
01393
01394 tag = ip->tag;
01395
01396 int probRes = pvm_probe(tid, tag);
01397 if (probRes == 0){
01398 #ifdef VOID_TRACER
01399 trcLeaveState( fd->trcData );
01400 #endif
01401 return 0;
01402 } else{
01403 if (pvm_bufinfo(probRes, &size, &tag2, &tid2) < 0){
01404
01405 printf("probe error, return 0\n");
01406 #ifdef VOID_TRACER
01407 trcLeaveState( fd->trcData );
01408 #endif
01409 return 0;
01410 }
01411 #ifdef VOID_TRACER
01412 trcLeaveState( fd->trcData );
01413 #endif
01414 return size - sizeof(int)*2;
01415 }
01416 }
01417
01418 int dsGetMachineMemory(){
01419 #ifdef VOID_TRACER
01420 trcEnterState( fd->trcData, VT_OH_GETMEM );
01421 trcLeaveState( fd->trcData );
01422 #endif
01423 return getFDMachineMem(fd);
01424 }
01425
01426 int dsGetLocalInstances(){
01427 #ifdef VOID_TRACER
01428 trcEnterState( fd->trcData, VT_OH_GETLI );
01429 trcLeaveState( fd->trcData );
01430 #endif
01431 return getFDLocalInstances(fd);
01432 }
01433
01434
01435
01436
/**
 * Turns on task usage in the cache layer; thin wrapper around
 * cacheSetUseTasks().
 */
void dsUseTasks()
{
    cacheSetUseTasks();
}
01440
/**
 * @return the current task id as reported by cacheGetCurrentTask()
 */
int dsGetCurrentTask(){
    int currentTask;

#ifdef VOID_TRACER
    trcEnterState( fd->trcData, VT_OH_GETTASK );
#endif
    currentTask = cacheGetCurrentTask();
#ifdef VOID_TRACER
    trcLeaveState( fd->trcData );
#endif

    return currentTask;
}
01451
/**
 * Selects the task subsequent cache operations refer to.
 *
 * @param taskId  task to make current
 * @return the result of cacheSetCurrentTask(); callers in this file
 *         treat a negative value as failure
 */
int dsSetCurrentTask(int taskId) {
    int result;

#ifdef VOID_TRACER
    trcEnterState( fd->trcData, VT_OH_SETTASK );
#endif
    result = cacheSetCurrentTask(taskId);
#ifdef VOID_TRACER
    trcLeaveState( fd->trcData );
#endif

    return result;
}
01462
/**
 * Thin wrapper around cacheGetTaskDeps().
 *
 * @param taskId  task whose dependencies are requested
 * @param depsSz  passed through to cacheGetTaskDeps()
 * @return whatever cacheGetTaskDeps() returns
 */
int *dsGetTaskDeps(int taskId, int *depsSz) {
    int *taskDeps = cacheGetTaskDeps(taskId, depsSz);
    return taskDeps;
}
01466
01467
/**
 * Creates a task in the local cache and, when the cache accepted it and
 * task-message forwarding is enabled, announces it downstream with this
 * instance as creator.
 *
 * @param taskId    id of the new task
 * @param deps      dependency task ids (depSize entries)
 * @param depSize   number of entries in deps
 * @param metadata  task metadata bytes
 * @param metaSize  number of metadata bytes
 * @return the result of cacheCreateTask() (0 is the value that triggers
 *         forwarding)
 */
int dsCreateTask(int taskId, int *deps, int depSize, char *metadata, int metaSize){
    int status;

#ifdef VOID_TRACER
    trcEnterState( fd->trcData, VT_OH_CREATETASK );
#endif

    status = cacheCreateTask(taskId, deps, depSize, metadata, metaSize);

    if (status == 0) {
        if (cacheGetForwardTaskMsgs()) {
            /* This instance is the creator of the task. */
            int myTid = pvm_mytid();
            taskSend(taskId, deps, depSize, metadata, metaSize, myTid);
        }
    }

#ifdef VOID_TRACER
    trcLeaveState( fd->trcData );
#endif

    return status;
}
01488
01489
01490 int dsEndTask(int taskId){
01491 #ifdef VOID_TRACER
01492 trcEnterState( fd->trcData, VT_OH_ENDTASK );
01493 #endif
01494 int endTaskReturn = cacheEndTask(taskId);
01495
01496 int enderTid = pvm_mytid();
01497 if((endTaskReturn != E_TASK_NOT_RUNNING) && cacheGetForwardTaskMsgs()){
01498 endTaskSend(taskId, enderTid);
01499 }
01500 #ifdef VOID_TRACER
01501 trcLeaveState( fd->trcData );
01502 #endif
01503
01504 return endTaskReturn;
01505 }
01506
01507
01508
/**
 * Thin wrapper around cachePutData().
 *
 * @param id       identifier of the datum
 * @param val      value bytes
 * @param valSize  size of val in bytes
 * @return whatever cachePutData() returns
 */
int dsPutData(char *id, void *val, int valSize){
    int status = cachePutData(id, val, valSize);
    return status;
}
01512
/**
 * Thin wrapper around cacheGetData().
 *
 * @param taskId  task the datum belongs to
 * @param id      identifier of the datum
 * @param valSz   passed through to cacheGetData()
 * @return whatever cacheGetData() returns
 */
void *dsGetData(int taskId, char *id, int *valSz){
    void *value = cacheGetData(taskId, id, valSz);
    return value;
}
01516
/**
 * Thin wrapper around cacheRemoveData().
 *
 * @param id  identifier of the datum to remove
 * @return whatever cacheRemoveData() returns
 */
int dsRemoveData(char *id){
    int status = cacheRemoveData(id);
    return status;
}
01520
01521
01522
01523
01524
/**
 * Registers the user-defined instrumentation states.
 * Does nothing unless the file is built with VOID_INST.
 *
 * @param states     state names
 * @param numStates  number of entries in states
 */
void dsInstSetStates(char **states, int numStates)
{
#ifdef VOID_INST
    instSetUserStates(fd->instData, states, numStates);
#endif
}
01530
/**
 * Switches the instrumentation to a user-defined state.
 * Does nothing unless the file is built with VOID_INST.
 *
 * @param stateId  index into fd->instData->userStates
 */
void dsInstSwitchState(int stateId)
{
#ifdef VOID_INST
    instSwitchState(fd->instData, &fd->instData->userStates[stateId]);
#endif
}
01536
/**
 * Enters a user-defined instrumentation state.
 * Does nothing unless the file is built with VOID_INST.
 *
 * @param stateId  index into fd->instData->userStates
 */
void dsInstEnterState(int stateId)
{
#ifdef VOID_INST
    instEnterState(fd->instData, &fd->instData->userStates[stateId]);
#endif
}
01543
/**
 * Leaves the current instrumentation state.
 * Does nothing unless the file is built with VOID_INST.
 */
void dsInstLeaveState()
{
#ifdef VOID_INST
    instLeaveState(fd->instData);
#endif
}
01549
/**
 * Thin wrapper around cacheGetRunningTasks().
 *
 * @param numTasks  passed through to cacheGetRunningTasks()
 * @param taskList  passed through to cacheGetRunningTasks()
 * @return whatever cacheGetRunningTasks() returns
 */
int dsGetRunningTasks( int * numTasks, int ** taskList )
{
    int status = cacheGetRunningTasks( numTasks, taskList );
    return status;
}
01554
/**
 * Thin wrapper around cacheGetFinishedTasks().
 *
 * @param numTasks  passed through to cacheGetFinishedTasks()
 * @return whatever cacheGetFinishedTasks() returns
 */
int *dsGetFinishedTasks(int *numTasks) {
    int *finished = cacheGetFinishedTasks(numTasks);
    return finished;
}
01558
01559 int dsRegisterRecoverCallback( RecoverCallback_t *callback ) {
01560 return cacheRegisterRecoverCallback(callback);
01561 }