00001 #ifndef MANAGER_C
00002 #define MANAGER_C
00003
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "Messages.h"
#include "Manager.h"
#include "Layout.h"
#include "parser.h"
#include "FilterSpec.h"
#include "FilterData/Policies.h"
#include "FilterData/Ports.h"
#include "FilterData/FilterData.h"
#include "FilterData/Termination.h"
#include "constants.h"
#include <TaskIdList/TaskIdList.h>
#ifdef VOID_TRACER
/* Reference time for the VOID_TRACER clock synchronisation: initManager()
 * stores the current time here just before calling sendFiltersData(), and
 * the timestamps sent to each filter during synchronisation are measured
 * relative to it. */
struct timeval trcInitTime;
#endif
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
/*
 * Number of the last signal delivered to handleSigterm(), or 0 if none.
 * appendWork() checks it whenever its pvm_trecv() returns no message and
 * then aborts the run.
 * NOTE(review): the portable type for a flag written from a signal handler
 * is `volatile sig_atomic_t`; it is kept as `int` here in case another
 * translation unit declares it `extern int` -- TODO confirm and tighten.
 */
int recievedSignal = 0;

/* Append the NUL-terminated string s to buf[*len..cap), truncating when the
 * buffer is full. Uses no library calls, so it is safe in a signal handler. */
static void sigAppendStr(char *buf, size_t cap, size_t *len, const char *s) {
    while (*s != '\0' && *len < cap) {
        buf[(*len)++] = *s++;
    }
}

/* Append the decimal representation of v to buf[*len..cap), truncating when
 * the buffer is full. Async-signal-safe (pure computation). */
static void sigAppendInt(char *buf, size_t cap, size_t *len, int v) {
    char digits[24];
    size_t n = 0;
    /* Unsigned negation avoids overflow for INT_MIN. */
    unsigned int u = (v < 0) ? 0u - (unsigned int)v : (unsigned int)v;

    if (v < 0) {
        sigAppendStr(buf, cap, len, "-");
    }
    do {
        digits[n++] = (char)('0' + (int)(u % 10u));
        u /= 10u;
    } while (u != 0u);
    while (n > 0 && *len < cap) {
        buf[(*len)++] = digits[--n];
    }
}

/*
 * Signal handler (presumably installed for SIGTERM elsewhere): records the
 * signal number in recievedSignal and reports it on stdout. It does not
 * terminate the process itself; appendWork() acts on recievedSignal.
 *
 * printf() is not async-signal-safe, so the message is formatted by hand and
 * emitted with write(2). errno is saved and restored so the interrupted code
 * does not observe a value changed by write().
 */
void handleSigterm(int signal) {
    int savedErrno = errno;
    char msg[512];
    size_t len = 0;
    ssize_t written;

    sigAppendStr(msg, sizeof msg, &len, __FILE__ "(");
    sigAppendInt(msg, sizeof msg, &len, __LINE__);
    sigAppendStr(msg, sizeof msg, &len, "): Manager recieved signal ");
    sigAppendInt(msg, sizeof msg, &len, signal);
    sigAppendStr(msg, sizeof msg, &len, ", exiting.\n");
    /* Best effort: nothing safe can be done about a failed diagnostic. */
    written = write(STDOUT_FILENO, msg, len);
    (void)written;

    recievedSignal = signal;
    errno = savedErrno;
}
00050
00051
00052
00053
00054
00055 static void spawnAllFilter(Layout *layout){
00056 int i;
00057 for(i = 0; i < layout->numFilters; i++) {
00058 FilterSpec *pfilter = layout->filters[i];
00059
00060
00061 printf("Manager.c: spawning filter %s instances\n", pfilter->name);
00062 if (fsSpawnInstances(pfilter, layout->command, layout->argvSpawn) == -1){
00063 printf("manager.c: error spawning filter %s, aborting\n", pfilter->name);
00064 exit(-1);
00065 }
00066 }
00067 printf("Manager.c: all filters spawned successfully\n");
00068 }
00069
00070
00071
00072
00073
00074
00075 static void killAllFilters(Layout *layout){
00076 int x, y;
00077 for(x = 0; x < layout->numFilters; x++) {
00078 for(y = 0; y < layout->filters[x]->filterPlacement.numInstances; y++) {
00079 if (pvm_kill(layout->filters[x]->filterPlacement.tids[y]) < 0) {
00080 fprintf(stderr, "%s(%d): Error killing filter %s instance: %d (pvm tid:t%x)\n",
00081 __FILE__, __LINE__, layout->filters[x]->name, y,
00082 layout->filters[x]->filterPlacement.tids[y]);
00083 }
00084 }
00085 }
00086 }
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134 static void sendFiltersData(Layout *layout){
00135 FilterSpec *pFilter;
00136 int i, j, k;
00137 printf("Manager.c: sending filter data now\n");
00138
00139 #ifdef VOID_INST
00140
00141
00142 char instDir[MAX_IDIR_LENGTH];
00143 sprintf(instDir, "%s/", INST_DIR);
00144 for (i=0; i < layout->numFilters-1; i++){
00145 if (strlen(instDir) >= (MAX_IDIR_LENGTH - 3)){
00146
00147 fprintf(stderr, "%s %d: warning, instrumentation directory name too big, truncating to %s\n", __FILE__, __LINE__, instDir);
00148 break;
00149 }
00150 sprintf(instDir, "%s%d-", instDir, layout->filters[i]->filterPlacement.numInstances);
00151 }
00152 sprintf(instDir, "%s%d", instDir, layout->filters[layout->numFilters-1]->filterPlacement.numInstances);
00153 #endif
00154
00155 for(i = 0; i < layout->numFilters; i++) {
00156 pFilter = layout->filters[i];
00157
00158
00159 for (j=0; j < pFilter->filterPlacement.numInstances; j++){
00160 int l1, l2, l3, l4, l5, l6, l7, res;
00161
00162
00163 pvm_initsend(PvmDataRaw);
00164
00165
00166 l1 = strlen(layout->cwd);
00167 pvm_pkint(&l1, 1, 1);
00168 pvm_pkbyte(layout->cwd, l1, 1);
00169
00170
00171 pvm_pkint(&i, 1, 1);
00172
00173 pvm_pkint(&j, 1, 1);
00174
00175 pvm_pkint(&pFilter->filterPlacement.numInstances, 1, 1);
00176
00177 pvm_pkint(pFilter->filterPlacement.tids, pFilter->filterPlacement.numInstances, 1);
00178
00179
00180 l2 = strlen(pFilter->name);
00181 pvm_pkint(&l2, 1, 1);
00182 pvm_pkbyte(pFilter->name, l2, 1);
00183
00184
00185 HostsStruct *h = layout->hostsStruct;
00186 int hostIndex = hostsGetIndexByName(h, pFilter->filterPlacement.hosts[j]);
00187 int memory = hostsGetMemory(h, hostIndex);
00188 pvm_pkint(&memory, 1, 1);
00189
00190
00191 int z, numLocalInstances = 0;
00192 for (z=0; z < pFilter->filterPlacement.numInstances; z++){
00193 if (strncmp(pFilter->filterPlacement.hosts[z], pFilter->filterPlacement.hosts[j], MAX_HNAME_LENGTH) == 0){
00194 numLocalInstances++;
00195 }
00196 }
00197 pvm_pkint(&numLocalInstances, 1, 1);
00198
00199 #ifdef VOID_INST
00200 int lInst;
00201
00202
00203 lInst = strlen(instDir);
00204 pvm_pkint(&lInst, 1, 1);
00205 pvm_pkbyte(instDir, lInst, 1);
00206 #endif
00207
00208 l3 = strlen(pFilter->libname);
00209 pvm_pkint(&l3, 1, 1);
00210 pvm_pkbyte(pFilter->libname, l3, 1);
00211
00212
00213
00214 pvm_pkint(&pFilter->numOutputs, 1, 1);
00215
00216 pvm_pkint(&pFilter->numInputs, 1, 1);
00217
00218
00219
00220 for(k = 0; k < pFilter->numOutputs; k++) {
00221
00222 l4 = strlen(pFilter->outputs[k]->fromPortName);
00223 pvm_pkint(&l4, 1, 1);
00224 pvm_pkbyte(pFilter->outputs[k]->fromPortName, l4, 1);
00225
00226
00227 pvm_pkint(&pFilter->outputs[k]->toFilter->filterPlacement.numInstances, 1, 1);
00228
00229 pvm_pkint(pFilter->outputs[k]->toFilter->filterPlacement.tids,
00230 pFilter->outputs[k]->toFilter->filterPlacement.numInstances, 1);
00231
00232 pvm_pkint(&pFilter->outputs[k]->tag, 1, 1);
00233
00234
00235 l5 = strlen(pFilter->outputs[k]->writePolicyName);
00236 pvm_pkint(&l5, 1, 1);
00237 pvm_pkbyte(pFilter->outputs[k]->writePolicyName, l5, 1);
00238
00239 writePolicy_t wp = getWritePolicyByName(pFilter->outputs[k]->writePolicyName);
00240
00241
00242 if ( (wp == LABELED_STREAM) || (wp == MULTICAST_LABELED_STREAM) ){
00243 l6 = strlen(pFilter->outputs[k]->lsLibName);
00244 pvm_pkint(&l6, 1, 1);
00245 pvm_pkbyte(pFilter->outputs[k]->lsLibName, l6, 1);
00246 }
00247 else {
00248
00249
00250
00251 res = j % pFilter->outputs[k]->toFilter->filterPlacement.numInstances;
00252 pvm_pkint(&res, 1, 1);
00253 }
00254 }
00255
00256
00257
00258 for(k = 0; k < pFilter->numInputs; k++) {
00259
00260 l7 = strlen(pFilter->inputs[k]->toPortName);
00261 pvm_pkint(&l7, 1, 1);
00262 pvm_pkbyte(pFilter->inputs[k]->toPortName, l7, 1);
00263
00264
00265 pvm_pkint(&pFilter->inputs[k]->fromFilter->filterPlacement.numInstances, 1, 1);
00266 pvm_pkint(pFilter->inputs[k]->fromFilter->filterPlacement.tids,
00267 pFilter->inputs[k]->fromFilter->filterPlacement.numInstances, 1);
00268 pvm_pkint(&pFilter->inputs[k]->tag, 1, 1);
00269 }
00270 pvm_send(pFilter->filterPlacement.tids[j], 0);
00271 }
00272 }
00273 #ifdef VOID_TRACER
00274
00275 #define sub_times( a, b, result ) if ( a.tv_usec < b.tv_usec ){ result.tv_usec = 1000000 + a.tv_usec - b.tv_usec; result.tv_sec = (a.tv_sec - 1) - b.tv_sec; } else { result.tv_usec = a.tv_usec - b.tv_usec; result.tv_sec = a.tv_sec - b.tv_sec; };
00276
00277 #define add_times( a, b, result ) result.tv_usec = a.tv_usec + b.tv_usec; if ( result.tv_usec > 1000000 ){ result.tv_usec -= 1000000; result.tv_sec = a.tv_sec + b.tv_sec + 1; } else { result.tv_sec = a.tv_sec + b.tv_sec; };
00278
00279
00280
00281 struct timeval time, ohtime, t3;
00282
00283 fprintf( stderr, "Manager.c: syncing clocks (initTime: %lu):\n", trcInitTime.tv_usec + ( trcInitTime.tv_sec * 1000000 ) );
00284 for(i = 0; i < layout->numFilters; i++)
00285 {
00286 pFilter = layout->filters[i];
00287 for (j=0; j < pFilter->filterPlacement.numInstances; j++)
00288 {
00289
00290 gettimeofday( &time, NULL );
00291 pvm_initsend( PvmDataRaw );
00292 gettimeofday( &t3, NULL );
00293 sub_times( t3, trcInitTime, t3);
00294 add_times( t3, ohtime, t3 );
00295 pvm_pkint( (int*)(&(t3.tv_sec)), 1, 1 );
00296 pvm_pkint( (int*)(&(t3.tv_usec)), 1, 1 );
00297 gettimeofday( &ohtime, NULL );
00298
00299 sub_times( ohtime, time, ohtime );
00300
00301
00302 pvm_initsend( PvmDataRaw );
00303 pvm_pkint( &j, 1, 1 );
00304 pvm_send( pFilter->filterPlacement.tids[j], 666 );
00305 pvm_recv( pFilter->filterPlacement.tids[j], 666 );
00306 pvm_initsend( PvmDataRaw );
00307 gettimeofday( &t3, NULL );
00308 sub_times( t3, trcInitTime, t3);
00309 add_times( t3, ohtime, t3 );
00310 pvm_pkint( (int*)(&(t3.tv_sec)), 1, 1 );
00311 pvm_pkint( (int*)(&(t3.tv_usec)), 1, 1 );
00312 pvm_send( pFilter->filterPlacement.tids[j], 666 );
00313 fprintf( stderr, "\t%d.%d: %d,%.6d secs\n", i, j, (int)t3.tv_sec, (int)t3.tv_usec );
00314 }
00315 }
00316 fprintf( stderr, "Manager.c: synchronization done.\n" );
00317
00318 #endif
00319
00320 }
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330 static Layout *initManager(char *confFile, int argc, char **argv) {
00331 char hostname[50];
00332 int i,j;
00333 int numFilterInstances = 0;
00334
00335
00336 Layout *layout;
00337 layout = createLayout();
00338 layout->argvSpawn = (char **)malloc(sizeof(char *) * (argc));
00339
00340
00341 gethostname(hostname, 50);
00342 fprintf(stderr, "manager: pvm tid:t%x hostname:%s\n", pvm_mytid(), hostname);
00343
00344
00345 printf("\n====================================\n");
00346 printf("Manager.c: start parsing the file...\n");
00347 if (readConfig(confFile, layout) == -1){
00348 printf("Manager.c: parse error, aborting\n");
00349 exit(1);
00350 }
00351 printf("Manager.c: parser ended successfully\n");
00352 printf("====================================\n\n");
00353
00354
00355 for(i = 0; i < layout->numFilters; i++) {
00356 FilterSpec *pFilter = layout->filters[i];
00357
00358
00359 for (j=0; j<argc-1; j++) {
00360 layout->argvSpawn[j] = argv[j+1];
00361 }
00362 layout->argvSpawn[argc-1] = NULL;
00363
00364
00365 printf("Manager.c: spawning filter %s instances\n",
00366 pFilter->name);
00367
00368 if (fsSpawnInstances(pFilter, layout->command, layout->argvSpawn) == -1){
00369 printf("Manager.c: error spawning filter %s, aborting\n", pFilter->name);
00370 exit(1);
00371 }
00372
00373 numFilterInstances += pFilter->filterPlacement.numInstances;
00374 }
00375
00376 #ifdef VOID_TRACER
00377 fprintf(stderr, "Manager.c: Getting initial time ( for time synchronization ).\n");
00378 gettimeofday( &trcInitTime, NULL );
00379 #endif
00380
00381 printf("All process started, sending data now....\n\n");
00382 sendFiltersData(layout);
00383
00384 #ifdef VOID_TERM
00385 gt = initGlobalTermination(layout, numFilterInstances);
00386 #endif
00387
00388 return layout;
00389 }
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 Layout *initDs(char *confFile, int argc, char **argv) {
00401 int err;
00402
00403
00404 pvm_setopt(PvmRoute, PvmRouteDirect);
00405
00406
00407 err = pvm_start_pvmd(0, NULL, 0);
00408 switch (err){
00409 case PvmSysErr:
00410 printf("Manager.c: error starting PVM, aborting\n");
00411 exit(1);
00412 break;
00413 }
00414
00415
00416 srandom(getpid());
00417
00418
00419
00420
00421
00422
00423
00424
00425 if (I_AM_THE_MANAGER) {
00426 return initManager(confFile, argc, argv);
00427 } else {
00428 runFilter();
00429 exit(0);
00430 }
00431 }
00432
00433
/*
 * qsort()/bsearch() comparator for arrays of int task ids.
 * Returns -1, 0 or 1 when *a is less than, equal to or greater than *b.
 */
int compareTaskId(const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;

    /* Each comparison yields 0 or 1, so the difference is -1, 0 or 1. */
    return (lhs > rhs) - (lhs < rhs);
}
00439
00440
00441
00442 int replaceCrashedHost(Layout *layout, FilterSpec *pCrashedFilter, int crashedInstance) {
00443 HostsStruct *pHostStruct = layout->hostsStruct;
00444
00445 char *crashedHostName = pCrashedFilter->filterPlacement.hosts[crashedInstance];
00446 int hostDeletedIdx = hostsGetIndexByName(pHostStruct, crashedHostName);
00447 if(strcmp(crashedHostName, hostsGetName(pHostStruct, hostDeletedIdx))) {
00448
00449
00450 assert(0);
00451
00452 printf("Manager.c: Problem detecting dead host. PVM detected: %s. Void detected: %s.\n",
00453 crashedHostName, hostsGetName(pHostStruct, hostDeletedIdx));
00454 exit(-1);
00455 }
00456
00457
00458 hostsSetStatus(pHostStruct, hostDeletedIdx, NOTAVAIL);
00459
00460
00461
00462
00463
00464 int newHostIdx = hostsGetIndex(pHostStruct);
00465 pCrashedFilter->filterPlacement.hosts[crashedInstance] = hostsGetName(pHostStruct, newHostIdx);
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478 return 0;
00479 }
00480
00481
00482
00483
00484
00485
00486
00487 int appendWork(Layout *layout, void *work, unsigned int workSize){
00488 #ifdef NO_BARRIER
00489
00490
00491 pvm_initsend(PvmDataRaw);
00492
00493 int msgType = MSGT_WORK;
00494 pvm_pkint(&msgType, 1, 1);
00495
00496 pvm_pkbyte((char *)work, workSize, 1);
00497
00498
00499 for(i = 0; i < layout->numFilters; i++) {
00500 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00501
00502 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00503 }
00504
00505 #else
00506 int i,j;
00507 int totalEows = 0, numEowsReceived;
00508 int reconf = 0 , remainingReconfs = 3;
00509
00510
00511 int bufid = pvm_probe(-1, MSGT_FERROR);
00512 if (bufid != 0){
00513 int bytes, tag, tid;
00514 char *msg;
00515 pvm_bufinfo(bufid, &bytes, &tag, &tid);
00516 msg = (char*)malloc(bytes+1);
00517 pvm_recv(tid, MSGT_FERROR);
00518 pvm_upkbyte(msg, bytes, 1);
00519 msg[bytes] = '\0';
00520
00521 fprintf(stderr, "Manager.c: Error, received death notification\n");
00522 fprintf(stderr, "Manager.c: %s\n", msg);
00523 free(msg);
00524 killAllFilters(layout);
00525 exit(-1);
00526 }
00527
00528 printf("Manager.c: starting work...\n");
00529
00530
00531 for(i = 0; i < layout->numFilters; i++) {
00532 totalEows += layout->filters[i]->filterPlacement.numInstances;
00533 }
00534
00535
00536
00537 do{
00538
00539
00540 pvm_initsend(PvmDataRaw);
00541 int msgType;
00542 #ifdef VOID_FT
00543 if(!reconf){
00544 #endif
00545
00546 msgType = MSGT_WORK;
00547 #ifdef VOID_FT
00548 } else {
00549
00550 msgType = MSGT_FT;
00551 }
00552 #endif
00553 pvm_pkint(&msgType, 1, 1);
00554
00555 pvm_pkbyte((char *)work, workSize, 1);
00556
00557 reconf = 0;
00558
00559
00560
00561 for(i = 0; i < layout->numFilters; i++) {
00562 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00563
00564 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00565 }
00566
00567
00568 TaskIdList *finalTaskIdList = NULL, *currentTaskIdList;
00569 int filtersThatUseTasks = 0;
00570
00571 for(i = 0; i < layout->numFilters; i++) {
00572 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00573 for(j = 0; j < pFilterP->numInstances; j++) {
00574 int instanceUseTasks = -1;
00575
00576
00577 pvm_recv(pFilterP->tids[j], 0);
00578 pvm_upkint(&instanceUseTasks, 1, 1);
00579 layout->filters[i]->useTasks = instanceUseTasks;
00580
00581 #ifdef VOID_FT
00582 if (instanceUseTasks) {
00583 currentTaskIdList = (TaskIdList *)unpackTaskIdList();
00584
00585
00586 qsort(currentTaskIdList->vetor, currentTaskIdList->size, sizeof(int), compareTaskId);
00587 if(finalTaskIdList == NULL) {
00588 finalTaskIdList = currentTaskIdList;
00589 } else {
00590
00591 finalTaskIdList = taskIdListIntersection(finalTaskIdList, currentTaskIdList);
00592 taskIdListDestroy(currentTaskIdList);
00593 }
00594 }
00595 #endif
00596
00597 }
00598
00599 if (layout->filters[i]->useTasks) filtersThatUseTasks++;
00600 }
00601
00602
00603 for(i = 0; i < layout->numFilters; i++) {
00604 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00605
00606 int needForwardTaskMessages = 1;
00607 if (filtersThatUseTasks < 2) needForwardTaskMessages = 0;
00608
00609 #ifdef VOID_FT
00610 if (layout->filters[i]->useTasks) {
00611
00612
00613 pvm_initsend(PvmDataDefault);
00614 pvm_pkint(&needForwardTaskMessages, 1, 1);
00615
00616 packTaskIdList(finalTaskIdList);
00617 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00618 } else {
00619 #endif
00620
00621 pvm_initsend(PvmDataDefault);
00622 pvm_pkint(&needForwardTaskMessages, 1, 1);
00623 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00624 #ifdef VOID_FT
00625 }
00626 #endif
00627 }
00628 taskIdListDestroy(finalTaskIdList);
00629
00630
00631
00632
00633 numEowsReceived = 0;
00634
00635
00636 while(numEowsReceived < totalEows){
00637
00638
00639 struct timeval timeout = {MANAGER_MAIN_RECV_TIMEOUT, 0};
00640 int szMsg = -1;
00641 int inst_tid = -1;
00642 int msgTag = -1;
00643 int bufid = pvm_trecv(-1, -1, &timeout);
00644 printf("saiu do recv, bufId = %d\n", bufid);
00645
00646 if (bufid <= 0) {
00647
00648
00649
00650 if (recievedSignal) {
00651 time_t now = time(NULL);
00652 struct tm brokenNow;
00653
00654 localtime_r(&now, &brokenNow);
00655
00656 fprintf(stderr, "%4d/%2d/%2d %2d:%2d:%2d; ERROR; Anthill Manager; the only one; Recieved signal %d",
00657 brokenNow.tm_year, brokenNow.tm_mon, brokenNow.tm_mday,
00658 brokenNow.tm_hour, brokenNow.tm_min, brokenNow.tm_sec,
00659 recievedSignal);
00660
00661
00662 killAllFilters(layout);
00663 exit(1);
00664 }
00665
00666
00667
00668
00669 continue;
00670 }
00671
00672 pvm_bufinfo(bufid, &szMsg, &msgTag, &inst_tid);
00673
00674 switch (msgTag){
00675 case (MSGT_EOW): {
00676
00677 int instance = -1;
00678 FilterSpec *pFilter = NULL;
00679
00680 getFilterByTid(layout, inst_tid, &pFilter, &instance);
00681
00682 if ((pFilter != NULL) && (instance != -1)){
00683 printf("Manager.c: EOW received from %s, instance %d\n",
00684 pFilter->name, instance);
00685 } else {
00686 fprintf(stderr, "Manager.c: unknown EOW received! Shouldnt get here!\n");
00687 }
00688 numEowsReceived++;
00689 break;
00690 }
00691 case (MSGT_AEXIT):
00692 case (MSGT_FERROR): {
00693
00694
00695 char *message = (char*)malloc(sizeof(char)*szMsg+1);
00696 pvm_upkbyte(message, szMsg, 1);
00697 message[szMsg] = '\0';
00698
00699
00700 FilterSpec *fp = NULL;
00701 int instance = -1;
00702 getFilterByTid(layout, inst_tid, &fp, &instance);
00703
00704 if (msgTag == MSGT_AEXIT){
00705 printf("Manager.c: Filter %s, instance %d(tid %x) called dsExit: %s\n",
00706 fp->name, instance, inst_tid, message);
00707 } else {
00708 printf("Manager.c: Filter %s error, instance %d(tid %x) called exit: %s\n",
00709 fp->name, instance, inst_tid, message);
00710 }
00711 free(message);
00712
00713 killAllFilters(layout);
00714 exit(-1);
00715 break;
00716 }
00717
00718 case (MSGT_TEXIT): case (MSGT_HDEL): {
00719
00720 if (remainingReconfs <= 0){
00721
00722 fprintf(stderr, "Manager.c: max reconfigurations reached, aborting...\n");
00723 reconf = 0;
00724
00725
00726 killAllFilters(layout);
00727 exit(-1);;
00728
00729 }
00730
00731 remainingReconfs--;
00732 reconf = 1;
00733
00734 int notifiesRecv = 1;
00735 int deadFilterTid = -1;
00736 FilterSpec *pFilter = NULL;
00737 int instanceDead = -1;
00738
00739
00740 int info = pvm_upkint(&deadFilterTid, 1, 1);
00741 if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00742
00743
00744 getFilterByTid(layout, deadFilterTid, &pFilter, &instanceDead);
00745
00746 if((pFilter != NULL) && (instanceDead != -1)){
00747 if (msgTag == MSGT_TEXIT) {
00748 fprintf(stderr, "Manager.c: filter %s: instance %d (tid t%x) of %d is dead!!!\n",
00749 pFilter->name, instanceDead, deadFilterTid, pFilter->filterPlacement.numInstances);
00750 } else {
00751 fprintf(stderr, "Manager.c: filter %s: instance %d (tid t%x) of %d machine's crashed!!!\n",
00752 pFilter->name, instanceDead, deadFilterTid, pFilter->filterPlacement.numInstances);
00753 }
00754 }
00755 printf("Manager.c: starting reconfiguration\n");
00756
00757
00758 killAllFilters(layout);
00759
00760 if (msgTag == MSGT_HDEL) {
00761
00762 replaceCrashedHost(layout, pFilter, instanceDead);
00763 }
00764
00765
00766
00767
00768 while (notifiesRecv < totalEows) {
00769 int newMsgTag = -1;
00770 bufid = pvm_recv(-1, MSGT_TEXIT);
00771 info = pvm_bufinfo(bufid, NULL, &newMsgTag, &inst_tid);
00772 info = pvm_upkint(&deadFilterTid, 1, 1);
00773 if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00774
00775 fprintf(stderr, "Manager.c: WARNING: received notification (tag %d) about pvm tid t%x death\n", newMsgTag, deadFilterTid);
00776 notifiesRecv++;
00777 }
00778
00779 while (pvm_probe(-1, MSGT_HDEL) > 0) {
00780 int newMsgTag = -1;
00781 bufid = pvm_recv(-1, MSGT_HDEL);
00782 info = pvm_bufinfo(bufid, NULL, &newMsgTag, &inst_tid);
00783 info = pvm_upkint(&deadFilterTid, 1, 1);
00784 if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00785
00786 fprintf(stderr, "Manager.c: WARNING: received notification (tag %d) about pvm tid t%x machine's crash\n", newMsgTag, deadFilterTid);
00787
00788
00789 FilterSpec *pCrashedFilter = NULL;
00790 int crashedInstance = -1;
00791 getFilterByTid(layout, deadFilterTid, &pCrashedFilter, &crashedInstance);
00792 replaceCrashedHost(layout, pCrashedFilter, crashedInstance);
00793 }
00794
00795
00796 spawnAllFilter(layout);
00797
00798 sendFiltersData(layout);
00799
00800 numEowsReceived = 0;
00801 break;
00802 }
00803 #ifdef VOID_TERM
00804
00805 case (MSGT_LOCALTERM): {
00806 int localTermTag;
00807 pvm_upkint(&localTermTag, 1, 1);
00808 verifyGlobalTermination(inst_tid, localTermTag);
00809 break;
00810 }
00811 #endif
00812 default: {
00813 fprintf(stderr, "Manager.c: error receiving EOW, unknown tag!!!\n");
00814 }
00815 }
00816 if((msgTag == MSGT_TEXIT) || (msgTag == MSGT_HDEL)) {
00817
00818 break;
00819 }
00820 }
00821 } while(reconf == 1);
00822
00823 printf("Manager.c: Work ended\n\n");
00824 return 0;
00825 #endif
00826 }
00827
00828
00829
00830 int finalizeDs(Layout *layout) {
00831 #ifdef NO_BARRIER
00832
00833 #else
00834 int i;
00835
00836
00837
00838 pvm_initsend(PvmDataRaw);
00839 int tipo_msg = MSGT_EOF;
00840 pvm_pkint(&tipo_msg, 1, 1);
00841
00842 for(i = 0; i < layout->numFilters; i++) {
00843 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00844 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00845 }
00846 destroyLayout(layout);
00847 pvm_exit();
00848 return 0;
00849 #endif
00850 }
00851
00852 #endif