Cache.c

Go to the documentation of this file.
00001 #define _CACHE_C_
00002 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <dirent.h>
#include <pvm3.h>
#include <assert.h>
#include "Cache.h"
#include "constants.h"
#include "DataSpace.h"
#include "Task.h"
#include "prod_cons.h"
00017 
00018 #define KEY_INT
00019 #define VAL_VOID
00020 #include "hash.h"
00021 #undef VAL_VOID
00022 #undef KEY_INT
00023 
00024 
00025 #define TASK_FILENAME_SIZE 100
00026 #define TASK_DIRNAME_SIZE ((1000+TASK_FILENAME_SIZE)*sizeof(char))
00027 
00028 
00029 Cache *cache = NULL;
00030 
00031 
00032 /// \todo move this to oher place. cser?
00033 int createDir(char *dirName) {
00034         int nameSize = strlen(dirName);
00035         char *token  = dirName;
00036         char *aux    = malloc(nameSize+1); // in the worst case aux size will be equal nameSize, plus the '\0'
00037         aux[0] = '\0';  
00038 
00039         while (token < dirName+nameSize) {
00040                 int tokenSize = strcspn(token, "/");
00041                 if (tokenSize > 0) {
00042                         strncat(aux, "/", 1);
00043                         strncat(aux, token, tokenSize);
00044                         
00045                         int status = mkdir(aux, 0777);
00046                         // if we get a error in directory creation and this error isn't directory 
00047                         // already exists, exit
00048                         if ((status==-1) && (errno!=EEXIST)) {
00049                                 char *msg = malloc(TASK_DIRNAME_SIZE+1); // +1 to the \0
00050                                 msg[TASK_DIRNAME_SIZE] = '\0';
00051                                 snprintf(msg, TASK_DIRNAME_SIZE, "Error creating tasks dir %s", aux);
00052                                 perror(msg);
00053                                 free(msg);
00054                                 
00055                                 return -1;
00056                         }                       
00057                 }
00058                 
00059                 token += tokenSize+1;
00060         }
00061         free(aux);      
00062 
00063         return 0;       
00064 }
00065 
00066 void *writerThread(void *arg) {
00067         
00068         // creates the finished tasks dir
00069         int status = createDir(cache->finishedTasksDir);
00070         if (status == -1) exit(1);
00071 
00072         // get where we will put the instance temporary directory
00073         char *prefix = getenv("VOID_CHECKPOINT_DIR");
00074         if (prefix == NULL) prefix = strdup("/tmp");
00075 
00076         // Creates the instance temporary directory. Because we called createDir(cache->finishedTasksDir)
00077         // and the finished tasks dir contains <prefix>/void.<user id>/ , the call 
00078         // to mkdtemp() should be successful.
00079         char *tmpTasksDir = (char *)malloc((TASK_DIRNAME_SIZE+1)*sizeof(char)); // +1 to the \0
00080         tmpTasksDir[TASK_DIRNAME_SIZE] = '\0';
00081         snprintf(tmpTasksDir, TASK_DIRNAME_SIZE, "%s/void.%d/tmpXXXXXX", prefix, geteuid());    
00082         char *success = mkdtemp(tmpTasksDir);
00083         if (success == NULL) {
00084                 char *msg = malloc(TASK_DIRNAME_SIZE+1);
00085                 msg[TASK_DIRNAME_SIZE] = '\0';
00086                 snprintf(msg, TASK_DIRNAME_SIZE, "Error opening tasks dir %s", tmpTasksDir);
00087                 perror(msg);
00088                 free(msg);
00089                 
00090                 exit(1);
00091         }
00092 
00093         while (1) {
00094                 // get task from the taskBuffer 
00095                 FinishedTask *ft = (FinishedTask *)get(cache->taskBuffer);
00096                 if (ft == NULL) {
00097                         fprintf(stderr, "WARNING: WritherThread recieved NULL pointer from taskBuffer!!!\n");
00098                         continue;
00099                 }
00100 
00101                 /// \todo make cache capable of saving/recovering multiple works
00102 
00103                 // Verify exit          
00104                 if (ft->taskId == -1) {
00105                         printf("taskId == -1, exiting\n");
00106                         free(ft);
00107                         break;
00108                 }
00109                 
00110                 // add task im terminatedTasks
00111                 // cacheEndTask() already do that
00112                 //taskIdListAdd(cache->terminatedTasks, ft->taskId);
00113 
00114                 // creates the task file name, full temporary task path and full finished task path             
00115                 char taskFileName[TASK_FILENAME_SIZE+1]; // +1 to the \0
00116                 taskFileName[TASK_FILENAME_SIZE] = '\0';
00117                 snprintf(taskFileName, TASK_FILENAME_SIZE, "/%d", ft->taskId);
00118                 
00119                 char *tmpTaskName = malloc(TASK_DIRNAME_SIZE+1); // +1 to the \0
00120                 tmpTaskName[TASK_DIRNAME_SIZE] = '\0';
00121                 strncpy(tmpTaskName, tmpTasksDir, TASK_DIRNAME_SIZE);
00122                 strncat(tmpTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(tmpTaskName));
00123 
00124                 char *finishedTaskName = malloc(TASK_DIRNAME_SIZE+1);
00125                 finishedTaskName[TASK_DIRNAME_SIZE] = '\0';
00126                 strncpy(finishedTaskName, cache->finishedTasksDir, TASK_DIRNAME_SIZE);
00127                 strncat(finishedTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(finishedTaskName));
00128                 
00129                 // opens task`s temporary file
00130                 FILE *serializeFile = fopen(tmpTaskName, "w");
00131                 if (serializeFile == NULL) {
00132                         char *msg = malloc(TASK_DIRNAME_SIZE+1);
00133                         msg[TASK_DIRNAME_SIZE] = '\0';
00134                         snprintf(msg, TASK_DIRNAME_SIZE, "Error opening serialize file %s", tmpTaskName);
00135                         perror(msg);
00136                         free(msg);
00137                         
00138                         exit(1);
00139                 }
00140                 
00141                 // serialize task
00142                 // WARNING: concurrent task access
00143                 writeTask(serializeFile, ft->task);
00144                 
00145                 // close file
00146                 free(ft);
00147                 fclose(serializeFile);
00148                 
00149                 // moves file to the directory of finished tasks
00150                 rename(tmpTaskName, finishedTaskName);
00151                 
00152                 // free everything
00153                 free(tmpTaskName);
00154                 free(finishedTaskName);
00155         }
00156         
00157         status = rmdir(tmpTasksDir);
00158         if (status < 0) {
00159                 perror("WARNING: Error removing temporary tasks dir");  
00160         }
00161         
00162         free(tmpTasksDir);
00163         pthread_exit(NULL);
00164         return NULL;    
00165 }
00166 
00167 // Coutinho: I separated the initialization of the cache and the creation of the
00168 //      writer thread to make testing of the writer thread easier
00169 void initCacheStruct(int filterId, int instanceId) {
00170 
00171         cache = (Cache *) malloc(sizeof(Cache));
00172         cache->IAmTaskCreator  = 0;
00173         cache->useTasks        = 0;
00174         cache->forwardTaskMsgs = 1; // by default we forward task messages, we only disable this if manager say that we could
00175         cache->tasks           = hashDSIVoidCreate(INITIAL_CAPACITY);
00176         cache->terminatedTasks = taskIdListCreate(ID_LIST_INIT_SIZE);
00177         cache->runningTasks    = hashIntIntCreate(1);
00178         cache->currentWork     = -1;
00179         cache->currentTask     = -1;
00180         cache->taskBuffer      = create_prod_cons(PROD_CONS_SIZE);
00181         cache->recoverCallback = &defaultRecoveryCallback;
00182 
00183         // get where we will put the checkpoints        
00184         /// \todo remove garbage files from previous executions
00185         // OBS: several executions, filters, instances can be using in the same filesystem
00186         char *prefix = getenv("VOID_CHECKPOINT_DIR");
00187         if (prefix == NULL) prefix = strdup("/tmp");
00188 
00189         // User id. We use the efective user id to a setuid filter don't make a
00190         // common user's directory owned by root.
00191         int uid = geteuid();
00192         
00193         // Execution. As we assume that the manager dosn't dies, it's tid is
00194         // constant in the whole execution.     
00195         int parentId = pvm_parent();    
00196 
00197         cache->finishedTasksDir = malloc(TASK_DIRNAME_SIZE+1);
00198         cache->finishedTasksDir[TASK_DIRNAME_SIZE] = '\0';
00199         snprintf(cache->finishedTasksDir, TASK_DIRNAME_SIZE, "%s/void.%d/t%x/%d/%d", prefix, uid, parentId, filterId, instanceId);
00200 }
00201 
00202 
/// Initializes the global cache and, when compiled with VOID_FT, starts the
/// checkpoint writer thread.
///
/// The process exits with status 1 if the writer thread cannot be created,
/// because destroyCache() later joins that thread.
///
/// \param filterId   forwarded to initCacheStruct().
/// \param instanceId forwarded to initCacheStruct().
void initCache(int filterId, int instanceId) {
        initCacheStruct(filterId, instanceId);

#ifdef VOID_FT
        // create the writer thread
        int rc = pthread_create(&(cache->writeThreadDescriptor), NULL, &writerThread, NULL);
        if (rc != 0) {
                // pthread_create() returns the error number instead of setting errno
                fprintf(stderr, "Error creating cache writer thread: %s\n", strerror(rc));
                exit(1);
        }
#endif
}
00211 
00212 void destroyCache(){
00213 
00214 #ifdef VOID_FT  
00215         // insert a task with taskId == -1 to break the loop of the writer thread
00216         FinishedTask *bufPos = malloc(sizeof(FinishedTask));
00217         bufPos->workId = -1;
00218         bufPos->taskId = -1;
00219         bufPos->task = NULL;
00220         put(cache->taskBuffer, bufPos); 
00221         
00222         // wait writer thread finish
00223         pthread_join(cache->writeThreadDescriptor, NULL);
00224 #endif
00225         
00226         hashDSIVoidDestroy(cache->tasks);
00227         taskIdListDestroy(cache->terminatedTasks);
00228         hashIntIntDestroy(cache->runningTasks);
00229         destroy_prod_cons(cache->taskBuffer);
00230         free(cache->finishedTasksDir);
00231         free(cache);
00232 }
00233 
00234 int cachePutData(char *key, void * data, int dataSz) {
00235         PosHandlerDSIVoid pos = NULL;
00236         DataSpace *dataSpace = NULL;
00237         Task *task = NULL;
00238         DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00239 
00240         if(cache->tasks == NULL) {
00241                 cache->tasks = hashDSIVoidCreate(INITIAL_CAPACITY);
00242         }
00243         
00244         dataSpaceId->work = cache->currentWork;
00245         dataSpaceId->task = cache->currentTask;
00246         pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00247         if (pos == NULL) {
00248                 return E_NO_SUCH_TASK;
00249         }
00250         free(dataSpaceId);
00251 
00252         task = posGetValue(pos);
00253         if(task == NULL) {
00254                 task = createTask();
00255                 posSetValue(pos, task);
00256         }
00257 
00258         dataSpace = getTaskDataSpace(task);
00259         if(dataSpace == NULL) {
00260                 dataSpace = createDataSpace();
00261         }
00262 
00263         return putData(dataSpace, key, data, dataSz);
00264 }
00265 
00266 void *cacheGetData(int taskId, char *key, int *dataSz) {
00267         PosHandlerDSIVoid pos = NULL;
00268         DataSpace *dataSpace = NULL;
00269         DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00270         Task *task = NULL;
00271         void *val = NULL;
00272 
00273         *dataSz = -1;   
00274         // Coutinho: on recovey could be no running tasks and
00275         // the aplication could need read data from finished tasks
00276         //if (cacheGetCurrentTask() < 0) {
00277         //      return NULL;
00278         //}
00279         
00280         dataSpaceId->work = cache->currentWork;
00281         dataSpaceId->task = taskId;
00282         pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);       
00283         if(pos == NULL) {
00284                 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, taskId);
00285                 return NULL;
00286         }
00287         free(dataSpaceId);
00288 
00289         task = posGetValue(pos);
00290         if (taskId != cache->currentTask) {
00291                 // To get data of other tasks, they must be finished
00292                 if (finished != getTaskState(task)) {
00293                         return NULL;    
00294                 }
00295         }
00296         
00297         dataSpace = getTaskDataSpace(task);
00298 
00299         if(dataSpace == NULL) {
00300                 printf("cacheGetData Error: data space contains no data\n");
00301                 return NULL;
00302         }
00303 
00304         val = getData(dataSpace, key, dataSz);  
00305         return val;
00306 }
00307 
00308 int cacheRemoveData(char *key) {
00309         PosHandlerDSIVoid pos = NULL;
00310         DataSpace *dataSpace = NULL;
00311         DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00312         Task *task = NULL;
00313         
00314         dataSpaceId->work = cache->currentWork;
00315         dataSpaceId->task = cache->currentTask;
00316         pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00317         
00318         if(pos == NULL) {
00319                 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, cache->currentTask);
00320                 return -1;
00321         }
00322         free(dataSpaceId);
00323 
00324         task = posGetValue(pos);
00325         dataSpace = getTaskDataSpace(task);
00326         if(dataSpace == NULL) {
00327                 printf("cacheGetData Error: data space contains no data\n");
00328                 return -1;
00329         }
00330 
00331         return removeData (dataSpace, key);
00332 }
00333 
00334 int *cacheGetTaskDeps(int taskId, int *depsSz) {
00335         DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00336 
00337         *depsSz = -1;   
00338         /// \todo Define access verifications for this function
00339         if (cacheGetCurrentTask() < 0) {
00340                 return NULL;
00341         }
00342         
00343         dataSpaceId->work = cache->currentWork;
00344         dataSpaceId->task = taskId;
00345         PosHandlerDSIVoid pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);     
00346         if(pos == NULL) {
00347                 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, taskId);
00348                 return NULL;
00349         }
00350         free(dataSpaceId);
00351 
00352         Task *task = posGetValue(pos);
00353         TaskIdList *deps = getTaskMyDeps(task);
00354         if (deps == NULL) {
00355                 *depsSz = 0;
00356                 return NULL;
00357         }
00358         
00359         int listSz = taskIdListGetSize(deps);
00360         *depsSz = listSz;
00361         if (listSz == 0) return NULL;
00362         
00363         int *list = malloc(sizeof(int)*listSz);
00364         int i=0;
00365         for (i=0; i<listSz; i++)
00366                 list[i] = taskIdListGet(deps, i);
00367         
00368         
00369         return list;
00370 }
00371 
00372 
00373 TaskIdList *getFinishedTasks() {
00374         TaskIdList *terminatedTasks = taskIdListCreate(0);
00375         DIR *finishedTasksDirDD = opendir(cache->finishedTasksDir);
00376 
00377         if (finishedTasksDirDD != NULL) {
00378                 struct dirent *finishedTasksDirEntry = readdir(finishedTasksDirDD);     
00379                 while (finishedTasksDirEntry != NULL) {
00380                         char point[] = ".";
00381                         char pointPoint[] = "..";
00382                         
00383                         char *fileName = strdup(finishedTasksDirEntry->d_name); 
00384                         if ((strcmp(fileName, point) != 0) && (strcmp(fileName, pointPoint) != 0)) {
00385                                 int taskId = atoi(fileName);
00386                                 taskIdListAdd(terminatedTasks, taskId); 
00387                                 printf("Read task: %s\n", fileName);
00388                         }
00389                                 
00390                         free(fileName); 
00391                         finishedTasksDirEntry = readdir(finishedTasksDirDD);    
00392                 }
00393                 
00394                 closedir(finishedTasksDirDD);
00395         }
00396         
00397         return terminatedTasks;
00398 }
00399 
00400 int cacheRecoverTasks(TaskIdList *tasks) {
00401         int taskListLen = taskIdListGetSize(tasks);
00402         int i=0;
00403         HashIntVoid *tasksToBeCreated = hashIntVoidCreate(10);
00404 
00405         /// \todo make cache capable of saving/recovering multiple works        
00406 
00407         // Recover tasks in ascendig order
00408         taskIdListSortAscendig(tasks);
00409 
00410         for (i=0; i<taskListLen; i++) {
00411                 int taskId = taskIdListGet(tasks, i);
00412 
00413                 // creates the task file name, and full finished task path              
00414                 char taskFileName[TASK_FILENAME_SIZE+1];
00415                 taskFileName[TASK_FILENAME_SIZE] = '\0';
00416                 snprintf(taskFileName, TASK_FILENAME_SIZE, "/%d", taskId);
00417                 
00418                 char *finishedTaskName = malloc(TASK_DIRNAME_SIZE+1); // +1 to the \0
00419                 finishedTaskName[TASK_DIRNAME_SIZE] = '\0';
00420                 strncpy(finishedTaskName, cache->finishedTasksDir, TASK_DIRNAME_SIZE);
00421                 strncat(finishedTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(finishedTaskName));
00422                 
00423                 // opens task`s checkpoint file
00424                 FILE *serializeFile = fopen(finishedTaskName, "r");
00425                 if (serializeFile == NULL) {
00426                         char *msg = malloc(TASK_DIRNAME_SIZE+1);
00427                         msg[TASK_DIRNAME_SIZE] = '\0';
00428                         snprintf(msg, TASK_DIRNAME_SIZE, "Error opening serialize file %s", finishedTaskName);
00429                         perror(msg);
00430                         free(msg);
00431                         
00432                         return E_COULD_NOT_RECOVER_TASK;
00433                 }
00434                 Task *recoveredTask = readTask(serializeFile);
00435                 fclose(serializeFile);
00436                 free(finishedTaskName);
00437                 assert(recoveredTask->id == taskId);
00438 #ifdef DEBUG
00439                 printf("Recovered task %d\n", recoveredTask->id);
00440 #endif
00441 
00442                 // Se tarefa esta no hash de tarefas a serem criadas, tira ela de la
00443                 PosHandlerIntVoid pos = hashIntVoidGet(tasksToBeCreated, taskId);
00444                 if (pos != NULL) {
00445                         Task *motherReference = posGetValue(pos);
00446                         
00447                         // Mother points to motherReference, so we make it
00448                         // points to recovered task. taskAddChild() make
00449                         // recoveredTask points to mother too.
00450                         taskAddChild(motherReference->mother, recoveredTask->id, recoveredTask);
00451                         
00452                         destroyTask(motherReference);
00453                         hashIntVoidRemove(tasksToBeCreated, taskId);
00454                 }
00455 
00456                 // switch the task state to finished
00457                 setTaskState(recoveredTask, finished);
00458 
00459                 // Put the recovered task in the tasks hash
00460                 DataSpaceId recoveredTaskId;
00461                 recoveredTaskId.work = cache->currentWork;
00462                 recoveredTaskId.task = taskId;
00463                 PosHandlerDSIVoid recoveredTaskPos = hashDSIVoidAdd(cache->tasks, recoveredTaskId);
00464                 posSetValue(recoveredTaskPos, recoveredTask);
00465 
00466                 // insert task in terminatedTasks
00467                 taskIdListAdd(cache->terminatedTasks, taskId);
00468         
00469 
00470 
00471                 /// \todo test:
00472                 /// mother inserted, then child
00473                 /// child insered then mother
00474 
00475                 /// \todo verify if mother points to the real child
00476 
00477                 
00478                 // Add children in the "to be created" hash
00479                 HashIntVoid *children = taskTakeChildren(recoveredTask);
00480 
00481                 // p/ cada tarefa filha: 
00482                 // we use a destructor iterator, because we will discard this hash anyway
00483                 HashIntVoidIterator *it = createHashIntVoidIterator(children, 1);  
00484                 for (pos = hashIntVoidIteratorNext(it, children); pos != NULL; pos = hashIntVoidIteratorNext(it, children)) {
00485                         Task *recoveredChild = posGetValue(pos);
00486                         int recoveredChildId = posGetKey(pos);
00487                         assert(recoveredChildId == recoveredChild->id);
00488 
00489                         DataSpaceId childId;
00490                         childId.work = recoveredTaskId.work;
00491                         childId.task = posGetKey(pos);
00492                         PosHandlerDSIVoid realChildPos = hashDSIVoidGet(cache->tasks, childId);
00493                         
00494                         // if child already exist
00495                         if (realChildPos != NULL) {
00496                                 // Substitute the recovered child by it
00497                                 Task *realChild = posGetValue(realChildPos);                    
00498                                 taskAddChild(recoveredTask, recoveredChildId, realChild);
00499 
00500                                 destroyTask(recoveredChild);
00501                         } else {
00502                                 // put it in the "to be created" hash
00503                                 PosHandlerIntVoid childPos = hashIntVoidAdd(tasksToBeCreated, recoveredChildId);
00504                                 posSetValue(childPos, recoveredChild);
00505                         }
00506                 }
00507                 hashIntVoidIteratorDestroy(it, children);
00508                 hashIntVoidDestroy(children);
00509                         
00510         }
00511 
00512         // Put the ids of the tasks to be reexecuted and sort them in ascending order
00513         PosHandlerIntVoid pos = NULL;
00514         HashIntVoidIterator *it = createHashIntVoidIterator(tasksToBeCreated, 0);
00515         TaskIdList *toCreateList = taskIdListCreate(0);
00516         for (pos = hashIntVoidIteratorNext(it, tasksToBeCreated); pos!= NULL; pos = hashIntVoidIteratorNext(it, tasksToBeCreated)) {
00517                 Task *toBeCreated = posGetValue(pos); 
00518                 taskIdListAdd(toCreateList, toBeCreated->id);
00519         }
00520         hashIntVoidIteratorDestroy(it, tasksToBeCreated);
00521         taskIdListSortAscendig(toCreateList);
00522         
00523         //      P/ cada tarefa que ficou no hassh de tarefas a serem criadas, 
00524         // instancias chamam cacheCreateTask()
00525         int listSize = taskIdListGetSize(toCreateList);
00526         for (i=0; i<listSize; i++) {
00527                 int toCreateId = taskIdListGet(toCreateList, i);
00528                 pos = hashIntVoidGet(tasksToBeCreated, toCreateId);
00529                 Task *toBeCreated = posGetValue(pos); 
00530                 int newTaskId    = posGetKey(pos);
00531                 char *metadata   = getTaskMetadata(toBeCreated); // returns toBeCreated->metadata
00532                 int metasize     = getTaskMetasize(toBeCreated);
00533                 
00534                 int *depList     = NULL;
00535                 TaskIdList *deps = getTaskMyDeps(toBeCreated);
00536                 int depSize = taskIdListGetSize(deps);
00537                 if (depSize > 0) {
00538                         depList = malloc(sizeof(int)*depSize);                  
00539                 }
00540                 int j; 
00541                 for (j=0; j<depSize; j++) {
00542                         depList[j] = taskIdListGet(deps, j);
00543                 }
00544 
00545                 // change context to the mother task
00546                 Task *mother = toBeCreated->mother;
00547                 cache->currentTask = mother->id;
00548         
00549                 // If the task arrived here is because it's mother referenced it and 
00550                 // it isn't finished or could not be recovered
00551                 // cacheCreateTask() makes a internal copy of depList and metadata
00552                 //
00553                 // ABORTED ! Now using the more elegant callback solution !
00554                 // 
00555 /*              int ret = cacheCreateTask(newTaskId, depList, depSize, metadata, metasize);
00556                 if (ret != 0) {
00557                         fprintf(stderr, "Faltal error restarting task %d: %d\n", newTaskId, ret);
00558                         // remove corrupt data
00559                         char *cmd = malloc(TASK_DIRNAME_SIZE+1); // +1 to the \0
00560                         cmd[TASK_DIRNAME_SIZE] = '\0';
00561                         snprintf(cmd, TASK_DIRNAME_SIZE, "rm -rf %s \n", cache->finishedTasksDir);
00562                         system(cmd);
00563                         
00564                         // abort recovering and restart everything
00565                         abort();
00566                 } 
00567 */
00568                 
00569                 // Calling user defined callback:
00570                 if ( cache->recoverCallback == NULL ) {
00571                         fprintf( stderr, "Cache.c: Fatal: Undefined user recover callback function.\n" );
00572                 } else {
00573                         // If callback calls createTask() it will make mother point to the createdTask
00574                         (*(cache->recoverCallback))(newTaskId, depList, depSize, metadata, metasize);
00575                 }
00576                 
00577                 if (depList != NULL) {
00578                         free(depList);
00579                 }
00580                 destroyTask(toBeCreated);
00581         }
00582         taskIdListDestroy(toCreateList);
00583         hashIntVoidDestroy(tasksToBeCreated);
00584         
00585         // change context to any running task
00586         HashIntIntIterator *hit = createHashIntIntIterator(cache->runningTasks, 0);
00587         PosHandlerIntInt posi = hashIntIntIteratorNext(hit, cache->runningTasks);
00588         if (posi != NULL) {
00589                 cache->currentTask = posGetKey(posi);
00590         } else {
00591                 cache->currentTask = -1;        
00592         }
00593         hashIntIntIteratorDestroy(hit, cache->runningTasks);
00594                 
00595         return 0;
00596 }
00597 
00598 void setCreator() {
00599         cache->IAmTaskCreator = 1;
00600 }
00601 
00602 int cacheSetCurrentTask(int task) {
00603         DataSpaceId taskId;
00604         taskId.work = cache->currentWork;
00605         taskId.task = task;
00606         
00607         PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskId);       
00608         if (taskPos != NULL) {
00609                 Task *t = (Task *)posGetValue(taskPos);
00610                 TaskState_t tState = getTaskState(t);
00611                 if (tState == running) {
00612                         cache->currentTask = task;
00613                         return 0;
00614                 } else {
00615                         return E_TASK_NOT_RUNNING;
00616                 }
00617         }
00618         return E_NO_SUCH_TASK;
00619 }
00620 
00621 int cacheGetCurrentTask() {
00622         return cache->currentTask;
00623 }
00624 
00625 void cacheSetCurrentWork(int work) {
00626         cache->currentWork = work;              
00627 }
00628 
00629 int cacheGetCurrentWork() {
00630         return cache->currentWork;
00631 }
00632 
00633 int cacheRunTask(int taskId) {
00634 
00635         // change the task state to running
00636         DataSpaceId taskDSI;
00637         taskDSI.work = cache->currentWork;
00638         taskDSI.task = taskId;
00639         PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskDSI);
00640         Task *task = (Task *)posGetValue(taskPos);
00641         assert(task != NULL);
00642         if (task == NULL) return -1;
00643         setTaskState(task, running);
00644 
00645         // Poe tarefa na lista de tarefas sendo executadas
00646         PosHandlerIntInt posi = hashIntIntAdd(cache->runningTasks, taskId);
00647         posSetValue(posi, taskId);
00648 
00649         // if there isn't any task running, the created task becomes the currentTask
00650         if (cache->currentTask <0){
00651                 cache->currentTask = taskId;
00652         }               
00653                 
00654         return 0;       
00655 }
00656 
00657 int cacheCreateTask(int taskId, int *deps, int depSize, char *metadata, int metaSize) {
00658         int i=0;
00659         
00660         /*------------------- test if the task can be created -------------------*/
00661 
00662         // Verify if all dependences are finished
00663         // Non task creator filters: depSize==0
00664         for (i=0; i<depSize; i++) {
00665                         DataSpaceId dependenceId;
00666                         dependenceId.work = cache->currentWork;
00667                         dependenceId.task = deps[i];
00668                         PosHandlerDSIVoid dependencePos = hashDSIVoidGet(cache->tasks, dependenceId);
00669                         
00670                         if (dependencePos == NULL) {
00671                                 // new task depends on a inexistent task, return error
00672                                 return -1;
00673                         }       
00674                         /*if (dependenceTask->state != finished) {
00675                                 // new task depends on a non finished task, return error
00676                                 return -1;
00677                         }*/
00678         }
00679         
00680         // get the new task position and test if is something in it
00681         DataSpaceId newTaskId;
00682         newTaskId.work = cache->currentWork;
00683         newTaskId.task = taskId;
00684         PosHandlerDSIVoid newPos = hashDSIVoidAdd(cache->tasks, newTaskId);
00685         if (posGetValue(newPos) != NULL) return E_TASK_EXISTS;
00686         
00687         /*--------------------------- task creation -----------------------------*/
00688         
00689         // Create the task object
00690         Task *newTask = createTask();
00691         setTaskMetadata(newTask, metadata, metaSize);
00692         setTaskId(newTask, taskId);
00693 
00694         // Create task dependence list
00695         TaskIdList *depList = taskIdListCreate(depSize);
00696         for (i=0; i<depSize; i++) {
00697                 taskIdListAdd(depList, deps[i]);
00698         }
00699         setTaskMyDeps(newTask, depList);
00700         taskIdListDestroy(depList);
00701         
00702         // add the new task in the hash position
00703         posSetValue(newPos, newTask);
00704         
00705         DataSpaceId motherId;
00706         motherId.work = cache->currentWork;
00707         motherId.task = cache->currentTask;
00708         PosHandlerDSIVoid motherPos = hashDSIVoidGet(cache->tasks, motherId);
00709         
00710         if (motherPos != NULL) {
00711                 // if this task has a mother add it to this mother
00712                 Task *motherTask = (Task *) posGetValue(motherPos);
00713                 taskAddChild(motherTask, taskId, newTask);
00714         }
00715 
00716         // Add task in the dependences` dependsOnMe lists and count the finished dependences
00717         int finishedDependences = 0;
00718         for (i=0; i<depSize; i++) {
00719                         DataSpaceId dependenceId;
00720                         dependenceId.work = cache->currentWork;
00721                         dependenceId.task = deps[i];
00722                         PosHandlerDSIVoid dependencePos = hashDSIVoidGet(cache->tasks, dependenceId);
00723                         Task *dependenceTask = (Task *) posGetValue(dependencePos);
00724                         
00725                         // WARNING: Writing on a termiated task object
00726                         addTaskToDependsOnMe(dependenceTask, taskId);
00727                         
00728                         if (dependenceTask->state == finished) {
00729                                 finishedDependences++;
00730                         }
00731         }
00732         setTaskEndedTasks(newTask, finishedDependences);
00733 
00734         // If all dependencies are satisfied, run the new task
00735         if (finishedDependences >= depSize) {
00736                 assert(finishedDependences == depSize);
00737                 cacheRunTask(taskId);
00738         }
00739                 
00740         return 0;
00741 }
00742 
00743 int cacheEndTask(int taskId) {
00744         DataSpaceId taskDSI;
00745         taskDSI.work = cache->currentWork;
00746         taskDSI.task = taskId;
00747         PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskDSI);
00748         
00749         if (taskPos == NULL) {
00750                 return E_NO_SUCH_TASK;
00751         }
00752         
00753         Task *task = (Task *)posGetValue(taskPos);
00754         if (running != getTaskState(task)) {
00755                 return E_TASK_NOT_RUNNING;
00756         }
00757 
00758         // switch the task state to finished
00759         setTaskState(task, finished);
00760         
00761         // remove task from running tasks
00762         hashIntIntRemove(cache->runningTasks, taskId);
00763         
00764         // insert task in terminatedTasks
00765         taskIdListAdd(cache->terminatedTasks, taskId);
00766         
00767         
00768         // if the current task is terminated, get another current task
00769         if (taskId == cache->currentTask) {
00770                 HashIntIntIterator *hit = createHashIntIntIterator(cache->runningTasks, 0);
00771                 PosHandlerIntInt posi = hashIntIntIteratorNext(hit, cache->runningTasks);
00772                 if (posi != NULL) {
00773                         cache->currentTask = posGetKey(posi);
00774                 } else {
00775                         cache->currentTask = -1;        
00776                 }
00777                 hashIntIntIteratorDestroy(hit, cache->runningTasks);
00778         }
00779         
00780 
00781         // incrementa endedTasks das tarefas em dependsOnMe
00782         int i=0;        
00783         int dependentTasks = taskIdListGetSize(task->dependsOnMe);
00784         for (i=0; i<dependentTasks; i++) {
00785                 DataSpaceId dependentId;
00786                 dependentId.work = cache->currentWork;
00787                 dependentId.task = taskIdListGet(task->dependsOnMe, i);
00788                 PosHandlerDSIVoid dependentPos = hashDSIVoidGet(cache->tasks, dependentId);
00789                 Task *dependentTask = (Task *) posGetValue(dependentPos);
00790                 
00791                 dependentTask->endedTasks++;
00792                 // caso task->endedTasks == task->depSize, dispare tarefa
00793                 if (dependentTask->endedTasks == taskIdListGetSize(dependentTask->myDeps)) {
00794                         cacheRunTask(dependentId.task);
00795                 }
00796         }
00797 
00798         
00799 #ifdef VOID_FT
00800         // insert task in the taskBuffer
00801         FinishedTask *bufPos = malloc(sizeof(FinishedTask));
00802         bufPos->workId = cache->currentWork;
00803         bufPos->taskId = taskId;
00804         bufPos->task = posGetValue(taskPos);
00805         put(cache->taskBuffer, (void *)bufPos);
00806 #endif
00807                 
00808         return 0;
00809 }
00810 
00811 int cacheGetRunningTasks( int * numTasks, int ** taskList ){
00812         *numTasks = hashGetChaves(cache->runningTasks);
00813         *taskList = malloc( sizeof(int) * (*numTasks));
00814         HashIntIntIterator *itii = createHashIntIntIterator(cache->runningTasks, 0);
00815         PosHandlerIntInt poshdr = NULL;
00816         int counter = 0;
00817 
00818         while( (( poshdr = hashIntIntIteratorNext( itii, cache->runningTasks) ) != NULL ) && counter < ( *numTasks )) {
00819                 assert(poshdr != NULL); 
00820                 (*taskList)[counter++] = posGetKey( poshdr );
00821         }
00822         hashIntIntIteratorDestroy( itii, cache->runningTasks );
00823         
00824         assert(counter == *numTasks);
00825         return 0;
00826 }
00827 
/// Default recovery callback: forwards all of its arguments, unchanged, to
/// dsCreateTask().
///
/// \param newTaskId  id of the task to create
/// \param newTaskDep array of task ids passed as the task's dependencies
/// \param depSize    number of entries in newTaskDep
/// \param metadata   task metadata buffer
/// \param metaSize   size of the metadata buffer
/// \return always 0; whatever dsCreateTask() returns is discarded
int defaultRecoveryCallback(int newTaskId,
                            int *newTaskDep,
                            int depSize,
                            char *metadata,
                            int metaSize)
{
        (void) dsCreateTask(newTaskId, newTaskDep, depSize, metadata, metaSize);
        return 0;
}
00832 
00833 int cacheRegisterRecoverCallback( RecoverCallback_t * callback ) {
00834         cache->recoverCallback = callback;
00835         return(0);
00836 }
00837 
00838 void cacheSetUseTasks() {
00839         cache->useTasks = 1;
00840 }
00841 
00842 int cacheGetUseTasks() {
00843         return cache->useTasks;
00844 }
00845 
00846 void cacheSetForwardTaskMsgs(int forward) {
00847         cache->forwardTaskMsgs = forward;
00848 }
00849 
00850 int cacheGetForwardTaskMsgs() {
00851         return cache->forwardTaskMsgs;
00852 }
00853 
00854 int *cacheGetFinishedTasks(int *numTasks) {
00855         return taskIdListToArray(cache->terminatedTasks, numTasks);
00856 }
00857 
00858 /// This function is for internel testing only
00859 Cache *___getCache() {
00860         return cache;
00861 }
00862 

Generated on Tue Jan 17 19:18:37 2006 for Void by  doxygen 1.4.6