Manager.c

Go to the documentation of this file.
00001 #ifndef MANAGER_C
00002 #define MANAGER_C
00003 
#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "Messages.h"
#include "Manager.h"
#include "Layout.h"
#include "parser.h"
#include "FilterSpec.h"
#include "FilterData/Policies.h"
#include "FilterData/Ports.h"
#include "FilterData/FilterData.h"
#include "FilterData/Termination.h"
#include "constants.h"
#include <TaskIdList/TaskIdList.h>
00021 #ifdef VOID_TRACER
00022 struct timeval trcInitTime; // set by initManager right before sendFiltersData; clock-sync timestamps are sent relative to it
00023 #endif
00024 
00025 
00026 
00027 // $Id: Manager.c 1584 2005-11-21 15:25:31Z rui $
00028 /*
00029  * 12/07/2004 Coutinho: void doesn't call filterFactory anymore. Now we
00030  *                      use dynamic filter library loading.
00031  * 14/07/2004 Coutinho: initDatacutter was split in initManager (if is the 
00032  *                      manager) and runFilter (if is a filter).
00033  * 21/07/2004 Matheus: adding the shell as executable, to avoid problems with
00034  *                      LD_LIBRARY_PATH
00035  * 23/07/2004 Matheus: console no more, now is Manager.c and Manager.h
00036  * 03/09/2004 Matheus: make the filters stop reading conf.xml
00037  * 
00038  */
00039 
00040 /* Static functions, not exported, should be declared here */
/// Number of the last signal caught by handleSigterm(), 0 if none yet.
/// The manager's receive loop polls it after every timed-out receive.
/// NOTE(review): ideally `volatile sig_atomic_t`; kept as `int` in case
/// Manager.h declares it `extern int` — confirm before changing the type.
int recievedSignal = 0;


/* Writes the decimal representation of value to fd using only write(2),
 * which is async-signal-safe (printf and friends are not). */
static void writeDecimalSignalSafe(int fd, int value) {
	char digits[3 * sizeof(int) + 2]; /* >= digits of any int, plus sign */
	size_t pos = sizeof digits;
	/* negate in unsigned arithmetic so INT_MIN does not overflow */
	unsigned int magnitude = (value < 0) ? 0u - (unsigned int)value : (unsigned int)value;

	do {
		digits[--pos] = (char)('0' + magnitude % 10u);
		magnitude /= 10u;
	} while (magnitude != 0u);
	if (value < 0) {
		digits[--pos] = '-';
	}
	(void)write(fd, digits + pos, sizeof digits - pos);
}

/// Signal handler: prints a notice and stores the signal number in
/// recievedSignal so the manager's main loop can shut the application down.
/// Only write(2) is used, because the handler may interrupt a printf that is
/// already running in the main loop; calling printf again here could deadlock
/// on the stdio lock or corrupt the stdout buffer.
///     \param signal the number of the signal that was caught
void handleSigterm(int signal) {
	static const char head[] = __FILE__ "(";
	static const char middle[] = "): Manager recieved signal ";
	static const char tail[] = ", exiting.\n";

	(void)write(STDOUT_FILENO, head, sizeof head - 1);
	writeDecimalSignalSafe(STDOUT_FILENO, __LINE__);
	(void)write(STDOUT_FILENO, middle, sizeof middle - 1);
	writeDecimalSignalSafe(STDOUT_FILENO, signal);
	(void)write(STDOUT_FILENO, tail, sizeof tail - 1);
	recievedSignal = signal;
}
00050 
00051 
00052 /** Spawn all filters of the layout. Layout structure must be set.
00053  * \param layout Pointer to the system layout
00054  */
00055 static void spawnAllFilter(Layout *layout){
00056         int i;
00057         for(i = 0; i < layout->numFilters; i++) {
00058                 FilterSpec *pfilter = layout->filters[i];
00059 
00060                 //spawn all instances of this filter
00061                 printf("Manager.c: spawning filter %s instances\n", pfilter->name);
00062                 if (fsSpawnInstances(pfilter, layout->command, layout->argvSpawn) == -1){
00063                         printf("manager.c: error spawning filter %s, aborting\n", pfilter->name);
00064                         exit(-1);
00065                 }
00066         }
00067         printf("Manager.c: all filters spawned successfully\n");
00068 }
00069 
00070 
00071 /** Kills all filters in the void pipeline. Used by manager to handle faults
00072  *  and to abort aplication execution.
00073  * \param layout the system layout
00074  */
00075 static void killAllFilters(Layout *layout){
00076         int x, y;
00077         for(x = 0; x < layout->numFilters; x++) {
00078                 for(y = 0; y < layout->filters[x]->filterPlacement.numInstances; y++) {
00079                         if (pvm_kill(layout->filters[x]->filterPlacement.tids[y]) < 0) {
00080                                 fprintf(stderr, "%s(%d): Error killing filter %s instance: %d (pvm tid:t%x)\n", 
00081                                                 __FILE__, __LINE__, layout->filters[x]->name, y, 
00082                                                 layout->filters[x]->filterPlacement.tids[y]);
00083                         }
00084                 }
00085         }
00086 }
00087 
00088 /** when we receive a signal, we call this function 
00089  * \param signal the signal we captured
00090 */
00091 /*static void captureSignal(int signal){
00092         // The manager needs receive all signals...
00093         if (I_AM_THE_MANAGER ){
00094                 //exit signaling
00095                 pvm_halt();
00096                 printf("Manager.c: received signal %d, bailing...\n", signal);
00097                 exit(128 + signal);
00098         }
00099         else {
00100                 //send the signal to the manager
00101                 pvm_sendsig(pvm_parent(),signal);
00102         
00103                 pvm_exit();
00104                 exit(128+signal);
00105         }
00106 }
00107 */
00108 /** Given a tid, returns a pointer to the filter which is using it
00109  * \param layout the layout
00110  * \param tid the tid we are looking for
00111  * \param pFilterAddress the address of the filter pointer, so we can return the filter here
00112  * \param instanceAddress we return the instance holding the tid in the variable pointed by this address
00113  */
00114 /*static void getFilterByTid(Layout *layout, int tid, FilterSpec **pFilterAddress, int *instanceAddress){
00115         int z, w;
00116         FilterSpec *pFilter = NULL;
00117         instanceAddress[0] = -1;
00118         //find who sent the EOW
00119         for (z=0; z < layout->numFilters; z++){
00120                 pFilter = layout->filters[z];
00121                 for (w=0; w < pFilter->filterPlacement.numInstances; w++){
00122                         if (pFilter->filterPlacement.tids[w] == tid){
00123                                 instanceAddress[0] = w;
00124                                 pFilterAddress[0] = pFilter;
00125                                 return;
00126                         }
00127                 }
00128         }
00129 }*/
00130 
00131 /** This sends filter data to all filters we have in the layout 
00132  * \param layout the void layout
00133  */
00134 static void sendFiltersData(Layout *layout){
00135         FilterSpec *pFilter;
00136         int i, j, k;
00137         printf("Manager.c: sending filter data now\n");
00138 
00139 #ifdef VOID_INST
00140         //build instrumentation directory(a string like 1-1-1, which is the number of instances of all filters)
00141         //this is the same for all filters, so we build here and send inside the loop
00142         char instDir[MAX_IDIR_LENGTH];
00143         sprintf(instDir, "%s/", INST_DIR);
00144         for (i=0; i < layout->numFilters-1; i++){
00145                 if (strlen(instDir) >= (MAX_IDIR_LENGTH - 3)){
00146                         //dont want to overflow this array
00147                         fprintf(stderr, "%s %d: warning, instrumentation directory name too big, truncating to %s\n", __FILE__, __LINE__, instDir);
00148                         break;
00149                 }
00150                 sprintf(instDir, "%s%d-", instDir, layout->filters[i]->filterPlacement.numInstances);
00151         }
00152         sprintf(instDir, "%s%d", instDir, layout->filters[layout->numFilters-1]->filterPlacement.numInstances);
00153 #endif
00154 
00155         for(i = 0; i < layout->numFilters; i++) {
00156                 pFilter = layout->filters[i];
00157 
00158                 //for each instance of the filter, we send its data
00159                 for (j=0; j < pFilter->filterPlacement.numInstances; j++){
00160                         int l1, l2, l3, l4, l5, l6, l7, res;
00161 
00162                         //we send only one message with all information inside
00163                         pvm_initsend(PvmDataRaw);
00164 
00165                         //current working directory
00166                         l1 = strlen(layout->cwd);
00167                         pvm_pkint(&l1, 1, 1);
00168                         pvm_pkbyte(layout->cwd, l1, 1);
00169 
00170                         //send filterID
00171                         pvm_pkint(&i, 1, 1);
00172                         // rank
00173                         pvm_pkint(&j, 1, 1);
00174                         // total number of instances of this filter
00175                         pvm_pkint(&pFilter->filterPlacement.numInstances, 1, 1);
00176                         //send all tids of this filter
00177                         pvm_pkint(pFilter->filterPlacement.tids, pFilter->filterPlacement.numInstances, 1);
00178 
00179                         // filtername
00180                         l2 = strlen(pFilter->name);
00181                         pvm_pkint(&l2, 1, 1);
00182                         pvm_pkbyte(pFilter->name, l2, 1);
00183 
00184                         //machine memory
00185                         HostsStruct *h = layout->hostsStruct;
00186                         int hostIndex = hostsGetIndexByName(h, pFilter->filterPlacement.hosts[j]);
00187                         int memory = hostsGetMemory(h, hostIndex);
00188                         pvm_pkint(&memory, 1, 1);
00189 
00190                         //how many brothers do I have in this machine?(used for memory management)
00191                         int z, numLocalInstances = 0; // me and my brothers
00192                         for (z=0; z < pFilter->filterPlacement.numInstances; z++){
00193                                 if (strncmp(pFilter->filterPlacement.hosts[z], pFilter->filterPlacement.hosts[j], MAX_HNAME_LENGTH) == 0){
00194                                         numLocalInstances++;
00195                                 }
00196                         }
00197                         pvm_pkint(&numLocalInstances, 1, 1);
00198 
00199 #ifdef VOID_INST
00200                         int lInst;
00201 
00202                         //send instrumentation directory
00203                         lInst = strlen(instDir);
00204                         pvm_pkint(&lInst, 1, 1);
00205                         pvm_pkbyte(instDir, lInst, 1);                          
00206 #endif
00207                         // shared lib name
00208                         l3 = strlen(pFilter->libname);
00209                         pvm_pkint(&l3, 1, 1);
00210                         pvm_pkbyte(pFilter->libname, l3, 1);
00211 
00212                         //port data
00213                         //numOutputs
00214                         pvm_pkint(&pFilter->numOutputs, 1, 1);
00215                         //numInputs
00216                         pvm_pkint(&pFilter->numInputs, 1, 1);
00217 
00218                         // OutputPorts
00219                         // for each OutputPort
00220                         for(k = 0; k < pFilter->numOutputs; k++) {
00221                                 //port data
00222                                 l4 = strlen(pFilter->outputs[k]->fromPortName);
00223                                 pvm_pkint(&l4, 1, 1);
00224                                 pvm_pkbyte(pFilter->outputs[k]->fromPortName, l4, 1);  //portname
00225 
00226                                 //number of tids & tids
00227                                 pvm_pkint(&pFilter->outputs[k]->toFilter->filterPlacement.numInstances, 1, 1);
00228 
00229                                 pvm_pkint(pFilter->outputs[k]->toFilter->filterPlacement.tids, 
00230                                                 pFilter->outputs[k]->toFilter->filterPlacement.numInstances, 1); //tids
00231                                 //stream tag
00232                                 pvm_pkint(&pFilter->outputs[k]->tag, 1, 1);
00233 
00234                                 //write policy name
00235                                 l5 = strlen(pFilter->outputs[k]->writePolicyName);
00236                                 pvm_pkint(&l5, 1, 1);
00237                                 pvm_pkbyte(pFilter->outputs[k]->writePolicyName, l5, 1);
00238 
00239                                 writePolicy_t wp = getWritePolicyByName(pFilter->outputs[k]->writePolicyName);
00240 
00241                                 // send labeled stream libname if policy is LS
00242                                 if ( (wp == LABELED_STREAM) || (wp == MULTICAST_LABELED_STREAM) ){
00243                                         l6 = strlen(pFilter->outputs[k]->lsLibName);
00244                                         pvm_pkint(&l6, 1, 1);
00245                                         pvm_pkbyte(pFilter->outputs[k]->lsLibName, l6, 1);
00246                                 }
00247                                 else {
00248                                         //if not LS, we needa know who will be the first instance to receive msgs
00249                                         //else we can create hotspots
00250                                         //we use the rank % number of receiving instances
00251                                         res = j % pFilter->outputs[k]->toFilter->filterPlacement.numInstances;
00252                                         pvm_pkint(&res, 1, 1);
00253                                 }
00254                         }
00255 
00256                         // InputPorts
00257                         // foreach InputPort
00258                         for(k = 0; k < pFilter->numInputs; k++) {
00259                                 //portName
00260                                 l7 = strlen(pFilter->inputs[k]->toPortName);
00261                                 pvm_pkint(&l7, 1, 1);
00262                                 pvm_pkbyte(pFilter->inputs[k]->toPortName, l7, 1);
00263 
00264                                 //number of tids we listen to
00265                                 pvm_pkint(&pFilter->inputs[k]->fromFilter->filterPlacement.numInstances, 1, 1); // number of instances
00266                                 pvm_pkint(pFilter->inputs[k]->fromFilter->filterPlacement.tids, 
00267                                                 pFilter->inputs[k]->fromFilter->filterPlacement.numInstances, 1); //the tids of the instances
00268                                 pvm_pkint(&pFilter->inputs[k]->tag, 1, 1); //the stream tag
00269                         }
00270                         pvm_send(pFilter->filterPlacement.tids[j], 0);
00271                 }
00272         }
00273 #ifdef VOID_TRACER
00274 
00275 #define sub_times( a, b, result ) if ( a.tv_usec < b.tv_usec ){ result.tv_usec = 1000000 + a.tv_usec - b.tv_usec; result.tv_sec = (a.tv_sec - 1) - b.tv_sec; } else { result.tv_usec = a.tv_usec - b.tv_usec; result.tv_sec = a.tv_sec - b.tv_sec; };
00276 
00277 #define add_times( a, b, result ) result.tv_usec = a.tv_usec + b.tv_usec; if ( result.tv_usec > 1000000 ){ result.tv_usec -= 1000000; result.tv_sec = a.tv_sec + b.tv_sec + 1; } else { result.tv_sec = a.tv_sec + b.tv_sec; };
00278 
00279         // Clock Synchronization
00280         
00281         struct timeval time, ohtime, t3;
00282         
00283         fprintf( stderr, "Manager.c: syncing clocks (initTime: %lu):\n", trcInitTime.tv_usec + ( trcInitTime.tv_sec * 1000000 ) );
00284         for(i = 0; i < layout->numFilters; i++) 
00285         {
00286                 pFilter = layout->filters[i];
00287                 for (j=0; j < pFilter->filterPlacement.numInstances; j++)
00288                 {
00289                         // Estimating computing time
00290                         gettimeofday( &time, NULL );
00291                         pvm_initsend( PvmDataRaw );
00292                         gettimeofday( &t3, NULL );
00293                         sub_times( t3, trcInitTime, t3);
00294                         add_times( t3, ohtime, t3 );
00295                         pvm_pkint( (int*)(&(t3.tv_sec)), 1, 1 );
00296                         pvm_pkint( (int*)(&(t3.tv_usec)), 1, 1 );
00297                         gettimeofday( &ohtime, NULL );
00298                         // Calculating overhead time
00299                         sub_times( ohtime, time, ohtime );
00300                         // End ( simulation )
00301 
00302                         pvm_initsend( PvmDataRaw );
00303                         pvm_pkint( &j, 1, 1 );
00304                         pvm_send( pFilter->filterPlacement.tids[j], 666 );
00305                         pvm_recv( pFilter->filterPlacement.tids[j], 666 );
00306                         pvm_initsend( PvmDataRaw );
00307                         gettimeofday( &t3, NULL );
00308                         sub_times( t3, trcInitTime, t3);
00309                         add_times( t3, ohtime, t3 );
00310                         pvm_pkint( (int*)(&(t3.tv_sec)), 1, 1 );
00311                         pvm_pkint( (int*)(&(t3.tv_usec)), 1, 1 );
00312                         pvm_send( pFilter->filterPlacement.tids[j], 666 );
00313                         fprintf( stderr, "\t%d.%d: %d,%.6d secs\n", i, j, (int)t3.tv_sec, (int)t3.tv_usec );
00314                 }
00315         }
00316         fprintf( stderr, "Manager.c: synchronization done.\n" );
00317 
00318 #endif
00319 
00320 }
00321 
00322 
00323 /// Init a pipeline manager. Anthill function.
00324 ///     \param confFile XML file configuration, NULL if we want to make things by hand
00325 ///     \param argc argc received by main function that will be forwarded to filter processes 
00326 ///     \param argv argv received by main function that will be forwarded to filter processes
00327 ///     \return Layout of the pipeline
00328 /// \note Thin function installs a SIGTERM handler, overriding it can make 
00329 ///       Anthill unable to cancel a running application.
00330 static Layout *initManager(char *confFile, int argc, char **argv) {
00331         char hostname[50];
00332         int i,j;
00333         int numFilterInstances = 0;
00334 
00335         //the manager pointer
00336         Layout *layout;
00337         layout = createLayout();
00338         layout->argvSpawn = (char **)malloc(sizeof(char *) * (argc));
00339 
00340         // my hostname
00341         gethostname(hostname, 50);
00342         fprintf(stderr, "manager: pvm tid:t%x hostname:%s\n", pvm_mytid(), hostname); 
00343 
00344         // read XML config file
00345         printf("\n====================================\n");
00346         printf("Manager.c: start parsing the file...\n");
00347         if (readConfig(confFile, layout) == -1){
00348                 printf("Manager.c: parse error, aborting\n");
00349                 exit(1);
00350         }
00351         printf("Manager.c: parser ended successfully\n");
00352         printf("====================================\n\n");
00353 
00354         // for each filter, we spawn the children process, but dont send any data yet
00355         for(i = 0; i < layout->numFilters; i++) {
00356                 FilterSpec *pFilter = layout->filters[i];
00357                 
00358                 // Copies argv to layout->argvSpawn, except argv[0]
00359                 for (j=0; j<argc-1; j++) {
00360                         layout->argvSpawn[j] = argv[j+1];
00361                 }
00362                 layout->argvSpawn[argc-1] = NULL;
00363 
00364                 //spawn all instances of this filter
00365                 printf("Manager.c: spawning filter %s instances\n",
00366                         pFilter->name);
00367 
00368                 if (fsSpawnInstances(pFilter, layout->command, layout->argvSpawn) == -1){
00369                         printf("Manager.c: error spawning filter %s, aborting\n", pFilter->name);
00370                         exit(1);
00371                 }
00372 
00373                 numFilterInstances += pFilter->filterPlacement.numInstances;
00374         }
00375 
00376 #ifdef VOID_TRACER
00377         fprintf(stderr, "Manager.c: Getting initial time ( for time synchronization ).\n");
00378         gettimeofday( &trcInitTime, NULL );
00379 #endif
00380 
00381         printf("All process started, sending data now....\n\n");
00382         sendFiltersData(layout);
00383 
00384 #ifdef VOID_TERM
00385         gt = initGlobalTermination(layout, numFilterInstances);
00386 #endif
00387         
00388         return layout;
00389 }
00390 
00391 /* End static functions ***********************/
00392 
00393 /* Here begins the user functions */
00394 
00395 /// Initialize the manager internal structs and the filters.
00396 ///     \param confFile XML file configuration, NULL if we want to make things by hand
00397 ///     \param argc argc received by main function that will be forwarded to filter processes 
00398 ///     \param argv argv received by main function that will be forwarded to filter processes
00399 ///     \return Layout of the pipeline
00400 Layout *initDs(char *confFile, int argc, char **argv) {
00401         int err;
00402 
00403         //configura pvm pra enviar dado diretamente(IMPORTANTE!!!!)
00404         pvm_setopt(PvmRoute, PvmRouteDirect);
00405 
00406         //err == 0, OK
00407         err = pvm_start_pvmd(0, NULL, 0);
00408         switch (err){
00409                 case PvmSysErr:
00410                         printf("Manager.c: error starting PVM, aborting\n");
00411                         exit(1);
00412                 break;
00413         }
00414 
00415         //start random number generator
00416         srandom(getpid());      
00417         
00418         // config DS to use signals
00419 //      signal(SIGSEGV, &captureSignal);
00420 //      signal(SIGINT, &captureSignal);
00421 //      signal(SIGILL, &captureSignal);
00422 //      signal(SIGHUP, &captureSignal);
00423 //      signal(SIGTERM, &captureSignal);
00424 
00425         if (I_AM_THE_MANAGER) {         
00426                 return initManager(confFile, argc, argv);
00427         } else {
00428                 runFilter();
00429                 exit(0);
00430         }
00431 }
00432 
/* qsort()-style comparator for task ids (ints): returns -1 when *a < *b,
 * 1 when *a > *b and 0 when they are equal. Comparing instead of subtracting
 * avoids signed overflow for extreme values. */
int compareTaskId(const void *a, const void *b) {
	const int lhs = *(const int *)a;
	const int rhs = *(const int *)b;

	return (lhs > rhs) - (lhs < rhs);
}
00439 
00440 
00441 
00442 int replaceCrashedHost(Layout *layout, FilterSpec *pCrashedFilter, int crashedInstance) {
00443         HostsStruct *pHostStruct = layout->hostsStruct;
00444 
00445         char *crashedHostName = pCrashedFilter->filterPlacement.hosts[crashedInstance];
00446         int hostDeletedIdx = hostsGetIndexByName(pHostStruct, crashedHostName);
00447         if(strcmp(crashedHostName, hostsGetName(pHostStruct, hostDeletedIdx))) {
00448 
00449                 //should never get here
00450                 assert(0);
00451                 
00452                 printf("Manager.c: Problem detecting dead host. PVM detected: %s. Void detected: %s.\n", 
00453                                 crashedHostName, hostsGetName(pHostStruct, hostDeletedIdx));
00454                 exit(-1);
00455         }
00456         
00457         // Change the dead host status
00458         hostsSetStatus(pHostStruct, hostDeletedIdx, NOTAVAIL);
00459         
00460         // we only need to replace the host in the current instance because for
00461         // each instance in the dead host, we will recieve a notification
00462         /// \todo Choose host respecting filter demsnds
00463         /// \todo change hostsGetIndex() 
00464         int newHostIdx = hostsGetIndex(pHostStruct);
00465         pCrashedFilter->filterPlacement.hosts[crashedInstance] =  hostsGetName(pHostStruct, newHostIdx);
00466 
00467         //find all filters using the dead host and replace the host
00468 /*      for(i = 0; i < layout->numFilters; i++) {               
00469                 FilterPlacement *pfilterPlac = &(layout->filters[i]->filterPlacement);
00470                 for(j = 0; j < pfilterPlac->numInstances; j++) {
00471                         if(strcmp(pfilterPlac->hosts[j], crashedHostName) == 0) {
00472                                 int newHostIdx = hostsGetIndex(pHostStruct);
00473                                 pfilterPlac->hosts[j] =  hostsGetName(pHostStruct, newHostIdx);
00474                         }
00475                 }
00476         }*/
00477         
00478         return 0;
00479 }
00480 
00481 
00482 /// user function:add a new query to a pipeline of filters. Called by manager.
00483 ///     \param layout System Layout.
00484 ///     \param work Buffer with a Unit of Work (UoW)
00485 ///     \param workSize Unit of Work Size (UoW)
00486 ///     \return Zero on success, -1 on error.
00487 int appendWork(Layout *layout, void *work, unsigned int workSize){
00488 #ifdef NO_BARRIER
00489 
00490         // sends work for each filter
00491         pvm_initsend(PvmDataRaw);
00492         // First tell that is a mensage of WORK
00493         int msgType = MSGT_WORK;
00494         pvm_pkint(&msgType, 1, 1);
00495         //then attach the work to it
00496         pvm_pkbyte((char *)work, workSize, 1);
00497 
00498         // for each filter, send his work
00499         for(i = 0; i < layout->numFilters; i++) {
00500                 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00501                 // sends work to all filters of this set
00502                 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00503         }
00504 
00505 #else
00506         int i,j;
00507         int totalEows = 0, numEowsReceived;
00508         int reconf = 0 /* should we reconfigure? */, remainingReconfs = 3; //how many times should we try?
00509 
00510         //before sending, we check if we received any filter error
00511         int bufid = pvm_probe(-1, MSGT_FERROR);
00512         if (bufid != 0){
00513                 int bytes, tag, tid;
00514                 char *msg;
00515                 pvm_bufinfo(bufid, &bytes, &tag, &tid);
00516                 msg = (char*)malloc(bytes+1);
00517                 pvm_recv(tid, MSGT_FERROR);
00518                 pvm_upkbyte(msg, bytes, 1);
00519                 msg[bytes] = '\0';
00520 
00521                 fprintf(stderr, "Manager.c: Error, received death notification\n");
00522                 fprintf(stderr, "Manager.c: %s\n", msg);
00523                 free(msg);
00524                 killAllFilters(layout);
00525                 exit(-1);
00526         }
00527 
00528         printf("Manager.c: starting work...\n");
00529 
00530         // number of EOWs we expect to receive
00531         for(i = 0; i < layout->numFilters; i++) {
00532                 totalEows += layout->filters[i]->filterPlacement.numInstances;
00533         }
00534 
00535         //we stay in this loop while we have to reconfigure
00536         //usually, this will be only one time, unless a we get some reconf message
00537         do{
00538 
00539                 // sends work for each filter
00540                 pvm_initsend(PvmDataRaw);
00541                 int msgType;
00542 #ifdef VOID_FT
00543                 if(!reconf){
00544 #endif
00545                         // First tell that is a mensage of WORK
00546                         msgType = MSGT_WORK;
00547 #ifdef VOID_FT
00548                 } else {
00549                         // one fault has occurred
00550                         msgType = MSGT_FT;
00551                 }
00552 #endif
00553                 pvm_pkint(&msgType, 1, 1);
00554                 //then attach the work to it
00555                 pvm_pkbyte((char *)work, workSize, 1);
00556 
00557                 reconf = 0; //we are optimistic, always expect to not reconfigure
00558 
00559 
00560                 // for each filter, send his work
00561                 for(i = 0; i < layout->numFilters; i++) {
00562                         FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00563                         // sends work to all filters of this set
00564                         pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00565                 }
00566 
00567 
00568                 TaskIdList *finalTaskIdList = NULL, *currentTaskIdList;
00569                 int filtersThatUseTasks = 0;
00570                 // Manager receives filter's terminated tasks list
00571                 for(i = 0; i < layout->numFilters; i++) {
00572                         FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00573                         for(j = 0; j < pFilterP->numInstances; j++) {
00574                                 int instanceUseTasks = -1;
00575                                 
00576                                 // Get is this filter use tasks
00577                                 pvm_recv(pFilterP->tids[j], 0);
00578                                 pvm_upkint(&instanceUseTasks, 1, 1);
00579                                 layout->filters[i]->useTasks = instanceUseTasks;
00580                                 
00581 #ifdef VOID_FT
00582                                 if (instanceUseTasks) {
00583                                         currentTaskIdList = (TaskIdList *)unpackTaskIdList();
00584 
00585                                         //      Para fazer intersecao, gerente ordenar? as listas de tarefas recebidas e utilizar? a fun??o meet() do CrazyMiner/ID3.
00586                                         qsort(currentTaskIdList->vetor, currentTaskIdList->size, sizeof(int), compareTaskId);                                   
00587                                         if(finalTaskIdList == NULL) {
00588                                                 finalTaskIdList = currentTaskIdList;                                            
00589                                         } else {                                                
00590                                                 //      Manager makes the intersection of all finished tasks lists 
00591                                                 finalTaskIdList = taskIdListIntersection(finalTaskIdList, currentTaskIdList);
00592                                                 taskIdListDestroy(currentTaskIdList);
00593                                         }
00594                                 }
00595 #endif                                  
00596                                                                         
00597                         } // for
00598                         
00599                         if (layout->filters[i]->useTasks) filtersThatUseTasks++;
00600                 }
00601                 
00602                 //      Gerente devolve resultado das intersecoes para todas as instancias de todos os filtros.
00603                 for(i = 0; i < layout->numFilters; i++) {
00604                         FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00605                         
00606                         int needForwardTaskMessages = 1;
00607                         if (filtersThatUseTasks < 2) needForwardTaskMessages = 0; 
00608 
00609 #ifdef VOID_FT                  
00610                         if (layout->filters[i]->useTasks) {
00611                                 // Send if they should forward task messages 
00612                                 // and pigback :-) the final task id list
00613                                 pvm_initsend(PvmDataDefault);
00614                                 pvm_pkint(&needForwardTaskMessages, 1, 1);
00615                                 
00616                                 packTaskIdList(finalTaskIdList);
00617                                 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00618                         } else {
00619 #endif                          
00620                                 // Only send if they should forward task messages
00621                                 pvm_initsend(PvmDataDefault);
00622                                 pvm_pkint(&needForwardTaskMessages, 1, 1);
00623                                 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00624 #ifdef VOID_FT                  
00625                         }
00626 #endif
00627                 }               
00628                 taskIdListDestroy(finalTaskIdList);
00629 
00630 
00631 
00632                 //now we receive the EOWs
00633                 numEowsReceived = 0;
00634 
00635                 //now we expect to receive EOW or errors
00636                 while(numEowsReceived < totalEows){
00637                         //we are open to receive anything from anyone here
00638                         //all messages to the manager should be tagged, so we now their type
00639                         struct timeval timeout = {MANAGER_MAIN_RECV_TIMEOUT, 0}; // put this on the stack, because stack is more cache friendly
00640                         int szMsg = -1;
00641                         int inst_tid = -1;
00642                         int msgTag = -1;
00643                         int bufid = pvm_trecv(-1, -1, &timeout);
00644                         printf("saiu do recv, bufId = %d\n", bufid);
00645 
00646                         if (bufid <= 0) {
00647                                 // trecv had timeout, we don't recieved nothing
00648                                 
00649                                 // verify if we recieved a signal
00650                                 if (recievedSignal) {
00651                                         time_t now = time(NULL);
00652                                         struct tm brokenNow;
00653                                         
00654                                         localtime_r(&now, &brokenNow);
00655                                         // print error message to tamandua log on stderr
00656                                         fprintf(stderr, "%4d/%2d/%2d %2d:%2d:%2d; ERROR; Anthill Manager; the only one; Recieved signal %d",
00657                                                 brokenNow.tm_year, brokenNow.tm_mon, brokenNow.tm_mday,
00658                                                 brokenNow.tm_hour, brokenNow.tm_min, brokenNow.tm_sec,
00659                                                 recievedSignal);
00660                                         
00661                                         // shutdown everything
00662                                         killAllFilters(layout);
00663                                         exit(1);                                        
00664                                 }
00665                                 
00666                                 // XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
00667                                 // XXX ATTENTION: Restarting loop here!!! XXX 
00668                                 // XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
00669                                 continue;
00670                         }
00671                         
00672                         pvm_bufinfo(bufid, &szMsg, &msgTag, &inst_tid);
00673 
00674                         switch (msgTag){
00675                                 case (MSGT_EOW): {
00676                                         //received EOW, expect this usually
00677                                         int instance = -1;
00678                                         FilterSpec *pFilter = NULL;
00679 
00680                                         getFilterByTid(layout, inst_tid, &pFilter, &instance);
00681 
00682                                         if ((pFilter != NULL) && (instance != -1)){
00683                                                 printf("Manager.c: EOW received from %s, instance %d\n", 
00684                                                                 pFilter->name, instance);
00685                                         } else {
00686                                                 fprintf(stderr, "Manager.c: unknown EOW received! Shouldnt get here!\n");
00687                                         }
00688                                         numEowsReceived++;
00689                                         break;
00690                                 }
00691                                 case (MSGT_AEXIT):
00692                                 case (MSGT_FERROR): {
00693                                         //someone called dsExit or system error at the filter side
00694                                         //common cause for this are library not found, wrong initscritpt etc
00695                                         char *message = (char*)malloc(sizeof(char)*szMsg+1);
00696                                         pvm_upkbyte(message, szMsg, 1);
00697                                         message[szMsg] = '\0';
00698 
00699                                         //the filter and the instance
00700                                         FilterSpec *fp = NULL;
00701                                         int instance = -1;
00702                                         getFilterByTid(layout, inst_tid, &fp, &instance);
00703 
00704                                         if (msgTag == MSGT_AEXIT){
00705                                                 printf("Manager.c: Filter %s, instance %d(tid %x) called dsExit: %s\n",
00706                                                                 fp->name, instance, inst_tid, message);
00707                                         } else {
00708                                                 printf("Manager.c: Filter %s error, instance %d(tid %x) called exit: %s\n",
00709                                                                 fp->name, instance, inst_tid, message);
00710                                         }
00711                                         free(message);
00712                                         // kill all instances
00713                                         killAllFilters(layout);
00714                                         exit(-1);
00715                                         break;
00716                                 }
00717                                 //task exited or host crashed
00718                                 case (MSGT_TEXIT): case (MSGT_HDEL): {
00719                                         //we only reconfigure a fixed number of times
00720                                         if (remainingReconfs <= 0){
00721                                                 //max number of reconfigurations reached... aborting
00722                                                 fprintf(stderr, "Manager.c: max reconfigurations reached, aborting...\n");
00723                                                 reconf = 0;
00724 
00725                                                 // kill all instances which might be alive
00726                                                 killAllFilters(layout);
00727                                                 exit(-1);;
00728 
00729                                         }
00730 
00731                                         remainingReconfs--;
00732                                         reconf = 1;
00733                                         // In case of pvm notification, inst_tid will be t80000000
00734                                         int notifiesRecv = 1; // We are receiving the first death notification
00735                                         int deadFilterTid = -1;
00736                                         FilterSpec *pFilter = NULL;
00737                                         int instanceDead = -1;
00738 
00739                                         // Get the tid and name of the dead filter
00740                                         int info = pvm_upkint(&deadFilterTid, 1, 1);
00741                                         if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00742 
00743                                         //discover which filter died
00744                                         getFilterByTid(layout, deadFilterTid, &pFilter, &instanceDead);
00745 
00746                                         if((pFilter != NULL) && (instanceDead != -1)){
00747                                                 if (msgTag == MSGT_TEXIT) {
00748                                                         fprintf(stderr, "Manager.c: filter %s: instance %d (tid t%x) of %d is dead!!!\n",
00749                                                                         pFilter->name, instanceDead, deadFilterTid, pFilter->filterPlacement.numInstances);
00750                                                 } else {
00751                                                         fprintf(stderr, "Manager.c: filter %s: instance %d (tid t%x) of %d machine's crashed!!!\n",
00752                                                                         pFilter->name, instanceDead, deadFilterTid, pFilter->filterPlacement.numInstances);
00753                                                 }
00754                                         }
00755                                         printf("Manager.c: starting reconfiguration\n");
00756 
00757                                         // kill all filters in the pipeline
00758                                         killAllFilters(layout);
00759 
00760                                         if (msgTag == MSGT_HDEL) {
00761                                                 //int his case, host died, so we must change layout
00762                                                 replaceCrashedHost(layout, pFilter, instanceDead);
00763                                         }
00764 
00765                                         //Flush the streams
00766                                         //receive all messages which are about to arrive till we get the death notification
00767                                         //pvm order should garantee this
00768                                         while (notifiesRecv < totalEows) {
00769                                                 int newMsgTag = -1;
00770                                                 bufid = pvm_recv(-1, MSGT_TEXIT);
00771                                                 info = pvm_bufinfo(bufid, NULL, &newMsgTag, &inst_tid);
00772                                                 info = pvm_upkint(&deadFilterTid, 1, 1);
00773                                                 if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00774 
00775                                                 fprintf(stderr, "Manager.c: WARNING: received notification (tag %d) about pvm tid t%x death\n", newMsgTag, deadFilterTid);
00776                                                 notifiesRecv++;
00777                                         }
00778                                         // probes for remaining machine crash notifications
00779                                         while (pvm_probe(-1, MSGT_HDEL) > 0) {
00780                                                 int newMsgTag = -1;
00781                                                 bufid = pvm_recv(-1, MSGT_HDEL);
00782                                                 info = pvm_bufinfo(bufid, NULL, &newMsgTag, &inst_tid);
00783                                                 info = pvm_upkint(&deadFilterTid, 1, 1);
00784                                                 if (info < 0) pvm_perror("Manager.c: error calling pvm_upkint");
00785                                                 
00786                                                 fprintf(stderr, "Manager.c: WARNING: received notification (tag %d) about pvm tid t%x machine's crash\n", newMsgTag, deadFilterTid);
00787 
00788                                                 // Replace the died host
00789                                                 FilterSpec *pCrashedFilter = NULL;
00790                                                 int crashedInstance = -1;
00791                                                 getFilterByTid(layout, deadFilterTid, &pCrashedFilter, &crashedInstance);
00792                                                 replaceCrashedHost(layout, pCrashedFilter, crashedInstance);
00793                                         }
00794 
00795                                         //spawn all filters again
00796                                         spawnAllFilter(layout);
00797                                         //resend the data
00798                                         sendFiltersData(layout);
00799                                         //start all over again
00800                                         numEowsReceived = 0;
00801                                         break;
00802                                 }
00803 #ifdef VOID_TERM
00804                                 // One filter instance detected local termination
00805                                 case (MSGT_LOCALTERM): {
00806                                         int localTermTag; // filter instance local termination tag
00807                                         pvm_upkint(&localTermTag, 1, 1);
00808                                         verifyGlobalTermination(inst_tid, localTermTag);
00809                                         break;
00810                                 }
00811 #endif
00812                                 default: {
00813                                         fprintf(stderr, "Manager.c: error receiving EOW, unknown tag!!!\n");
00814                                 }
00815                         } //end switch message tag
00816                         if((msgTag == MSGT_TEXIT) || (msgTag == MSGT_HDEL)) {
00817                                 // work should be sent again
00818                                 break;
00819                         }
00820                 } //end receiving eows
00821         } while(reconf == 1); //leave this loop if we will not reconfigure
00822 
00823         printf("Manager.c: Work ended\n\n");
00824         return 0;
00825 #endif
00826 }
00827 
00828 
00829 /// Finalize a Void pipeline. Only manager runs this.
00830 int finalizeDs(Layout *layout) {
00831 #ifdef NO_BARRIER
00832 
00833 #else
00834         int i;
00835 
00836         // Envia eof para todos os filtros
00837         // Primeiro envia se eh work (WORK) ou EOF (END_OF_FILTER)
00838         pvm_initsend(PvmDataRaw);
00839         int tipo_msg = MSGT_EOF;
00840         pvm_pkint(&tipo_msg, 1, 1);
00841         //sends the EOF message for all instances of the filter
00842         for(i = 0; i < layout->numFilters; i++) {
00843                 FilterPlacement *pFilterP = &(layout->filters[i]->filterPlacement);
00844                 pvm_mcast(pFilterP->tids, pFilterP->numInstances, 0);
00845         }
00846         destroyLayout(layout);
00847         pvm_exit();
00848         return 0;
00849 #endif
00850 }
00851 
00852 #endif

Generated on Tue Jan 17 19:18:38 2006 for Void by  doxygen 1.4.6