00001 #define _CACHE_C_
00002
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <dirent.h>
#include <pvm3.h>
#include <assert.h>
#include "Cache.h"
#include "constants.h"
#include "DataSpace.h"
#include "Task.h"
#include "prod_cons.h"
00017
00018 #define KEY_INT
00019 #define VAL_VOID
00020 #include "hash.h"
00021 #undef VAL_VOID
00022 #undef KEY_INT
00023
00024
00025 #define TASK_FILENAME_SIZE 100
00026 #define TASK_DIRNAME_SIZE ((1000+TASK_FILENAME_SIZE)*sizeof(char))
00027
00028
00029 Cache *cache = NULL;
00030
00031
00032
/**
 * Creates a directory together with every missing ancestor (like `mkdir -p`).
 *
 * The path is split on '/' and rebuilt one component at a time; mkdir() is
 * called with mode 0777 for each prefix, and a prefix that already exists
 * (EEXIST) is not treated as an error. Every component is joined with a
 * leading '/', so the path is always interpreted as rooted at '/', even when
 * dirName does not start with a slash.
 *
 * @param dirName NUL-terminated path to create; it is only read.
 * @return 0 on success; -1 if a component could not be created or memory could
 *         not be allocated (the reason is printed to stderr).
 */
int createDir(char *dirName) {
    size_t nameSize = strlen(dirName);
    const char *end = dirName + nameSize;
    const char *token = dirName;
    size_t auxLen = 0;
    int result = 0;

    /* +2: one byte for the '/' that is prepended when dirName has no leading
     * slash, and one for the terminating NUL. */
    char *aux = malloc(nameSize + 2);
    if (aux == NULL) {
        perror("Error creating tasks dir");
        return -1;
    }
    aux[0] = '\0';

    while (token < end) {
        size_t tokenSize = strcspn(token, "/");
        if (tokenSize > 0) {
            /* Extend the prefix built so far with "/<component>". */
            aux[auxLen++] = '/';
            memcpy(aux + auxLen, token, tokenSize);
            auxLen += tokenSize;
            aux[auxLen] = '\0';

            if (mkdir(aux, 0777) == -1 && errno != EEXIST) {
                int savedErrno = errno;
                fprintf(stderr, "Error creating tasks dir %s: %s\n", aux, strerror(savedErrno));
                result = -1;
                break;
            }
        }

        /* Skip the component and the separator that follows it. */
        token += tokenSize + 1;
    }

    free(aux);
    return result;
}
00065
00066 void *writerThread(void *arg) {
00067
00068
00069 int status = createDir(cache->finishedTasksDir);
00070 if (status == -1) exit(1);
00071
00072
00073 char *prefix = getenv("VOID_CHECKPOINT_DIR");
00074 if (prefix == NULL) prefix = strdup("/tmp");
00075
00076
00077
00078
00079 char *tmpTasksDir = (char *)malloc((TASK_DIRNAME_SIZE+1)*sizeof(char));
00080 tmpTasksDir[TASK_DIRNAME_SIZE] = '\0';
00081 snprintf(tmpTasksDir, TASK_DIRNAME_SIZE, "%s/void.%d/tmpXXXXXX", prefix, geteuid());
00082 char *success = mkdtemp(tmpTasksDir);
00083 if (success == NULL) {
00084 char *msg = malloc(TASK_DIRNAME_SIZE+1);
00085 msg[TASK_DIRNAME_SIZE] = '\0';
00086 snprintf(msg, TASK_DIRNAME_SIZE, "Error opening tasks dir %s", tmpTasksDir);
00087 perror(msg);
00088 free(msg);
00089
00090 exit(1);
00091 }
00092
00093 while (1) {
00094
00095 FinishedTask *ft = (FinishedTask *)get(cache->taskBuffer);
00096 if (ft == NULL) {
00097 fprintf(stderr, "WARNING: WritherThread recieved NULL pointer from taskBuffer!!!\n");
00098 continue;
00099 }
00100
00101
00102
00103
00104 if (ft->taskId == -1) {
00105 printf("taskId == -1, exiting\n");
00106 free(ft);
00107 break;
00108 }
00109
00110
00111
00112
00113
00114
00115 char taskFileName[TASK_FILENAME_SIZE+1];
00116 taskFileName[TASK_FILENAME_SIZE] = '\0';
00117 snprintf(taskFileName, TASK_FILENAME_SIZE, "/%d", ft->taskId);
00118
00119 char *tmpTaskName = malloc(TASK_DIRNAME_SIZE+1);
00120 tmpTaskName[TASK_DIRNAME_SIZE] = '\0';
00121 strncpy(tmpTaskName, tmpTasksDir, TASK_DIRNAME_SIZE);
00122 strncat(tmpTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(tmpTaskName));
00123
00124 char *finishedTaskName = malloc(TASK_DIRNAME_SIZE+1);
00125 finishedTaskName[TASK_DIRNAME_SIZE] = '\0';
00126 strncpy(finishedTaskName, cache->finishedTasksDir, TASK_DIRNAME_SIZE);
00127 strncat(finishedTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(finishedTaskName));
00128
00129
00130 FILE *serializeFile = fopen(tmpTaskName, "w");
00131 if (serializeFile == NULL) {
00132 char *msg = malloc(TASK_DIRNAME_SIZE+1);
00133 msg[TASK_DIRNAME_SIZE] = '\0';
00134 snprintf(msg, TASK_DIRNAME_SIZE, "Error opening serialize file %s", tmpTaskName);
00135 perror(msg);
00136 free(msg);
00137
00138 exit(1);
00139 }
00140
00141
00142
00143 writeTask(serializeFile, ft->task);
00144
00145
00146 free(ft);
00147 fclose(serializeFile);
00148
00149
00150 rename(tmpTaskName, finishedTaskName);
00151
00152
00153 free(tmpTaskName);
00154 free(finishedTaskName);
00155 }
00156
00157 status = rmdir(tmpTasksDir);
00158 if (status < 0) {
00159 perror("WARNING: Error removing temporary tasks dir");
00160 }
00161
00162 free(tmpTasksDir);
00163 pthread_exit(NULL);
00164 return NULL;
00165 }
00166
00167
00168
00169 void initCacheStruct(int filterId, int instanceId) {
00170
00171 cache = (Cache *) malloc(sizeof(Cache));
00172 cache->IAmTaskCreator = 0;
00173 cache->useTasks = 0;
00174 cache->forwardTaskMsgs = 1;
00175 cache->tasks = hashDSIVoidCreate(INITIAL_CAPACITY);
00176 cache->terminatedTasks = taskIdListCreate(ID_LIST_INIT_SIZE);
00177 cache->runningTasks = hashIntIntCreate(1);
00178 cache->currentWork = -1;
00179 cache->currentTask = -1;
00180 cache->taskBuffer = create_prod_cons(PROD_CONS_SIZE);
00181 cache->recoverCallback = &defaultRecoveryCallback;
00182
00183
00184
00185
00186 char *prefix = getenv("VOID_CHECKPOINT_DIR");
00187 if (prefix == NULL) prefix = strdup("/tmp");
00188
00189
00190
00191 int uid = geteuid();
00192
00193
00194
00195 int parentId = pvm_parent();
00196
00197 cache->finishedTasksDir = malloc(TASK_DIRNAME_SIZE+1);
00198 cache->finishedTasksDir[TASK_DIRNAME_SIZE] = '\0';
00199 snprintf(cache->finishedTasksDir, TASK_DIRNAME_SIZE, "%s/void.%d/t%x/%d/%d", prefix, uid, parentId, filterId, instanceId);
00200 }
00201
00202
/**
 * Initializes the global cache and, when built with VOID_FT, starts the
 * checkpoint writer thread.
 *
 * If the writer thread cannot be created the process is terminated with
 * exit(1): destroyCache() later joins that thread, which would be invalid
 * with a descriptor that was never filled in.
 *
 * @param filterId   forwarded to initCacheStruct().
 * @param instanceId forwarded to initCacheStruct().
 */
void initCache(int filterId, int instanceId) {
    initCacheStruct(filterId, instanceId);

#ifdef VOID_FT
    int rc = pthread_create(&(cache->writeThreadDescriptor), NULL, &writerThread, NULL);
    if (rc != 0) {
        /* pthread_create returns the error number instead of setting errno. */
        fprintf(stderr, "Error creating checkpoint writer thread: %s\n", strerror(rc));
        exit(1);
    }
#endif
}
00211
00212 void destroyCache(){
00213
00214 #ifdef VOID_FT
00215
00216 FinishedTask *bufPos = malloc(sizeof(FinishedTask));
00217 bufPos->workId = -1;
00218 bufPos->taskId = -1;
00219 bufPos->task = NULL;
00220 put(cache->taskBuffer, bufPos);
00221
00222
00223 pthread_join(cache->writeThreadDescriptor, NULL);
00224 #endif
00225
00226 hashDSIVoidDestroy(cache->tasks);
00227 taskIdListDestroy(cache->terminatedTasks);
00228 hashIntIntDestroy(cache->runningTasks);
00229 destroy_prod_cons(cache->taskBuffer);
00230 free(cache->finishedTasksDir);
00231 free(cache);
00232 }
00233
00234 int cachePutData(char *key, void * data, int dataSz) {
00235 PosHandlerDSIVoid pos = NULL;
00236 DataSpace *dataSpace = NULL;
00237 Task *task = NULL;
00238 DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00239
00240 if(cache->tasks == NULL) {
00241 cache->tasks = hashDSIVoidCreate(INITIAL_CAPACITY);
00242 }
00243
00244 dataSpaceId->work = cache->currentWork;
00245 dataSpaceId->task = cache->currentTask;
00246 pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00247 if (pos == NULL) {
00248 return E_NO_SUCH_TASK;
00249 }
00250 free(dataSpaceId);
00251
00252 task = posGetValue(pos);
00253 if(task == NULL) {
00254 task = createTask();
00255 posSetValue(pos, task);
00256 }
00257
00258 dataSpace = getTaskDataSpace(task);
00259 if(dataSpace == NULL) {
00260 dataSpace = createDataSpace();
00261 }
00262
00263 return putData(dataSpace, key, data, dataSz);
00264 }
00265
00266 void *cacheGetData(int taskId, char *key, int *dataSz) {
00267 PosHandlerDSIVoid pos = NULL;
00268 DataSpace *dataSpace = NULL;
00269 DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00270 Task *task = NULL;
00271 void *val = NULL;
00272
00273 *dataSz = -1;
00274
00275
00276
00277
00278
00279
00280 dataSpaceId->work = cache->currentWork;
00281 dataSpaceId->task = taskId;
00282 pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00283 if(pos == NULL) {
00284 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, taskId);
00285 return NULL;
00286 }
00287 free(dataSpaceId);
00288
00289 task = posGetValue(pos);
00290 if (taskId != cache->currentTask) {
00291
00292 if (finished != getTaskState(task)) {
00293 return NULL;
00294 }
00295 }
00296
00297 dataSpace = getTaskDataSpace(task);
00298
00299 if(dataSpace == NULL) {
00300 printf("cacheGetData Error: data space contains no data\n");
00301 return NULL;
00302 }
00303
00304 val = getData(dataSpace, key, dataSz);
00305 return val;
00306 }
00307
00308 int cacheRemoveData(char *key) {
00309 PosHandlerDSIVoid pos = NULL;
00310 DataSpace *dataSpace = NULL;
00311 DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00312 Task *task = NULL;
00313
00314 dataSpaceId->work = cache->currentWork;
00315 dataSpaceId->task = cache->currentTask;
00316 pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00317
00318 if(pos == NULL) {
00319 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, cache->currentTask);
00320 return -1;
00321 }
00322 free(dataSpaceId);
00323
00324 task = posGetValue(pos);
00325 dataSpace = getTaskDataSpace(task);
00326 if(dataSpace == NULL) {
00327 printf("cacheGetData Error: data space contains no data\n");
00328 return -1;
00329 }
00330
00331 return removeData (dataSpace, key);
00332 }
00333
00334 int *cacheGetTaskDeps(int taskId, int *depsSz) {
00335 DataSpaceId *dataSpaceId = malloc(sizeof(DataSpaceId));
00336
00337 *depsSz = -1;
00338
00339 if (cacheGetCurrentTask() < 0) {
00340 return NULL;
00341 }
00342
00343 dataSpaceId->work = cache->currentWork;
00344 dataSpaceId->task = taskId;
00345 PosHandlerDSIVoid pos = hashDSIVoidGet(cache->tasks, *dataSpaceId);
00346 if(pos == NULL) {
00347 printf("cacheGetData Error: work=%d and taskId=%d dont have saved data\n", cache->currentWork, taskId);
00348 return NULL;
00349 }
00350 free(dataSpaceId);
00351
00352 Task *task = posGetValue(pos);
00353 TaskIdList *deps = getTaskMyDeps(task);
00354 if (deps == NULL) {
00355 *depsSz = 0;
00356 return NULL;
00357 }
00358
00359 int listSz = taskIdListGetSize(deps);
00360 *depsSz = listSz;
00361 if (listSz == 0) return NULL;
00362
00363 int *list = malloc(sizeof(int)*listSz);
00364 int i=0;
00365 for (i=0; i<listSz; i++)
00366 list[i] = taskIdListGet(deps, i);
00367
00368
00369 return list;
00370 }
00371
00372
00373 TaskIdList *getFinishedTasks() {
00374 TaskIdList *terminatedTasks = taskIdListCreate(0);
00375 DIR *finishedTasksDirDD = opendir(cache->finishedTasksDir);
00376
00377 if (finishedTasksDirDD != NULL) {
00378 struct dirent *finishedTasksDirEntry = readdir(finishedTasksDirDD);
00379 while (finishedTasksDirEntry != NULL) {
00380 char point[] = ".";
00381 char pointPoint[] = "..";
00382
00383 char *fileName = strdup(finishedTasksDirEntry->d_name);
00384 if ((strcmp(fileName, point) != 0) && (strcmp(fileName, pointPoint) != 0)) {
00385 int taskId = atoi(fileName);
00386 taskIdListAdd(terminatedTasks, taskId);
00387 printf("Read task: %s\n", fileName);
00388 }
00389
00390 free(fileName);
00391 finishedTasksDirEntry = readdir(finishedTasksDirDD);
00392 }
00393
00394 closedir(finishedTasksDirDD);
00395 }
00396
00397 return terminatedTasks;
00398 }
00399
00400 int cacheRecoverTasks(TaskIdList *tasks) {
00401 int taskListLen = taskIdListGetSize(tasks);
00402 int i=0;
00403 HashIntVoid *tasksToBeCreated = hashIntVoidCreate(10);
00404
00405
00406
00407
00408 taskIdListSortAscendig(tasks);
00409
00410 for (i=0; i<taskListLen; i++) {
00411 int taskId = taskIdListGet(tasks, i);
00412
00413
00414 char taskFileName[TASK_FILENAME_SIZE+1];
00415 taskFileName[TASK_FILENAME_SIZE] = '\0';
00416 snprintf(taskFileName, TASK_FILENAME_SIZE, "/%d", taskId);
00417
00418 char *finishedTaskName = malloc(TASK_DIRNAME_SIZE+1);
00419 finishedTaskName[TASK_DIRNAME_SIZE] = '\0';
00420 strncpy(finishedTaskName, cache->finishedTasksDir, TASK_DIRNAME_SIZE);
00421 strncat(finishedTaskName, taskFileName, TASK_FILENAME_SIZE - strlen(finishedTaskName));
00422
00423
00424 FILE *serializeFile = fopen(finishedTaskName, "r");
00425 if (serializeFile == NULL) {
00426 char *msg = malloc(TASK_DIRNAME_SIZE+1);
00427 msg[TASK_DIRNAME_SIZE] = '\0';
00428 snprintf(msg, TASK_DIRNAME_SIZE, "Error opening serialize file %s", finishedTaskName);
00429 perror(msg);
00430 free(msg);
00431
00432 return E_COULD_NOT_RECOVER_TASK;
00433 }
00434 Task *recoveredTask = readTask(serializeFile);
00435 fclose(serializeFile);
00436 free(finishedTaskName);
00437 assert(recoveredTask->id == taskId);
00438 #ifdef DEBUG
00439 printf("Recovered task %d\n", recoveredTask->id);
00440 #endif
00441
00442
00443 PosHandlerIntVoid pos = hashIntVoidGet(tasksToBeCreated, taskId);
00444 if (pos != NULL) {
00445 Task *motherReference = posGetValue(pos);
00446
00447
00448
00449
00450 taskAddChild(motherReference->mother, recoveredTask->id, recoveredTask);
00451
00452 destroyTask(motherReference);
00453 hashIntVoidRemove(tasksToBeCreated, taskId);
00454 }
00455
00456
00457 setTaskState(recoveredTask, finished);
00458
00459
00460 DataSpaceId recoveredTaskId;
00461 recoveredTaskId.work = cache->currentWork;
00462 recoveredTaskId.task = taskId;
00463 PosHandlerDSIVoid recoveredTaskPos = hashDSIVoidAdd(cache->tasks, recoveredTaskId);
00464 posSetValue(recoveredTaskPos, recoveredTask);
00465
00466
00467 taskIdListAdd(cache->terminatedTasks, taskId);
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479 HashIntVoid *children = taskTakeChildren(recoveredTask);
00480
00481
00482
00483 HashIntVoidIterator *it = createHashIntVoidIterator(children, 1);
00484 for (pos = hashIntVoidIteratorNext(it, children); pos != NULL; pos = hashIntVoidIteratorNext(it, children)) {
00485 Task *recoveredChild = posGetValue(pos);
00486 int recoveredChildId = posGetKey(pos);
00487 assert(recoveredChildId == recoveredChild->id);
00488
00489 DataSpaceId childId;
00490 childId.work = recoveredTaskId.work;
00491 childId.task = posGetKey(pos);
00492 PosHandlerDSIVoid realChildPos = hashDSIVoidGet(cache->tasks, childId);
00493
00494
00495 if (realChildPos != NULL) {
00496
00497 Task *realChild = posGetValue(realChildPos);
00498 taskAddChild(recoveredTask, recoveredChildId, realChild);
00499
00500 destroyTask(recoveredChild);
00501 } else {
00502
00503 PosHandlerIntVoid childPos = hashIntVoidAdd(tasksToBeCreated, recoveredChildId);
00504 posSetValue(childPos, recoveredChild);
00505 }
00506 }
00507 hashIntVoidIteratorDestroy(it, children);
00508 hashIntVoidDestroy(children);
00509
00510 }
00511
00512
00513 PosHandlerIntVoid pos = NULL;
00514 HashIntVoidIterator *it = createHashIntVoidIterator(tasksToBeCreated, 0);
00515 TaskIdList *toCreateList = taskIdListCreate(0);
00516 for (pos = hashIntVoidIteratorNext(it, tasksToBeCreated); pos!= NULL; pos = hashIntVoidIteratorNext(it, tasksToBeCreated)) {
00517 Task *toBeCreated = posGetValue(pos);
00518 taskIdListAdd(toCreateList, toBeCreated->id);
00519 }
00520 hashIntVoidIteratorDestroy(it, tasksToBeCreated);
00521 taskIdListSortAscendig(toCreateList);
00522
00523
00524
00525 int listSize = taskIdListGetSize(toCreateList);
00526 for (i=0; i<listSize; i++) {
00527 int toCreateId = taskIdListGet(toCreateList, i);
00528 pos = hashIntVoidGet(tasksToBeCreated, toCreateId);
00529 Task *toBeCreated = posGetValue(pos);
00530 int newTaskId = posGetKey(pos);
00531 char *metadata = getTaskMetadata(toBeCreated);
00532 int metasize = getTaskMetasize(toBeCreated);
00533
00534 int *depList = NULL;
00535 TaskIdList *deps = getTaskMyDeps(toBeCreated);
00536 int depSize = taskIdListGetSize(deps);
00537 if (depSize > 0) {
00538 depList = malloc(sizeof(int)*depSize);
00539 }
00540 int j;
00541 for (j=0; j<depSize; j++) {
00542 depList[j] = taskIdListGet(deps, j);
00543 }
00544
00545
00546 Task *mother = toBeCreated->mother;
00547 cache->currentTask = mother->id;
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570 if ( cache->recoverCallback == NULL ) {
00571 fprintf( stderr, "Cache.c: Fatal: Undefined user recover callback function.\n" );
00572 } else {
00573
00574 (*(cache->recoverCallback))(newTaskId, depList, depSize, metadata, metasize);
00575 }
00576
00577 if (depList != NULL) {
00578 free(depList);
00579 }
00580 destroyTask(toBeCreated);
00581 }
00582 taskIdListDestroy(toCreateList);
00583 hashIntVoidDestroy(tasksToBeCreated);
00584
00585
00586 HashIntIntIterator *hit = createHashIntIntIterator(cache->runningTasks, 0);
00587 PosHandlerIntInt posi = hashIntIntIteratorNext(hit, cache->runningTasks);
00588 if (posi != NULL) {
00589 cache->currentTask = posGetKey(posi);
00590 } else {
00591 cache->currentTask = -1;
00592 }
00593 hashIntIntIteratorDestroy(hit, cache->runningTasks);
00594
00595 return 0;
00596 }
00597
00598 void setCreator() {
00599 cache->IAmTaskCreator = 1;
00600 }
00601
00602 int cacheSetCurrentTask(int task) {
00603 DataSpaceId taskId;
00604 taskId.work = cache->currentWork;
00605 taskId.task = task;
00606
00607 PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskId);
00608 if (taskPos != NULL) {
00609 Task *t = (Task *)posGetValue(taskPos);
00610 TaskState_t tState = getTaskState(t);
00611 if (tState == running) {
00612 cache->currentTask = task;
00613 return 0;
00614 } else {
00615 return E_TASK_NOT_RUNNING;
00616 }
00617 }
00618 return E_NO_SUCH_TASK;
00619 }
00620
00621 int cacheGetCurrentTask() {
00622 return cache->currentTask;
00623 }
00624
00625 void cacheSetCurrentWork(int work) {
00626 cache->currentWork = work;
00627 }
00628
00629 int cacheGetCurrentWork() {
00630 return cache->currentWork;
00631 }
00632
00633 int cacheRunTask(int taskId) {
00634
00635
00636 DataSpaceId taskDSI;
00637 taskDSI.work = cache->currentWork;
00638 taskDSI.task = taskId;
00639 PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskDSI);
00640 Task *task = (Task *)posGetValue(taskPos);
00641 assert(task != NULL);
00642 if (task == NULL) return -1;
00643 setTaskState(task, running);
00644
00645
00646 PosHandlerIntInt posi = hashIntIntAdd(cache->runningTasks, taskId);
00647 posSetValue(posi, taskId);
00648
00649
00650 if (cache->currentTask <0){
00651 cache->currentTask = taskId;
00652 }
00653
00654 return 0;
00655 }
00656
00657 int cacheCreateTask(int taskId, int *deps, int depSize, char *metadata, int metaSize) {
00658 int i=0;
00659
00660
00661
00662
00663
00664 for (i=0; i<depSize; i++) {
00665 DataSpaceId dependenceId;
00666 dependenceId.work = cache->currentWork;
00667 dependenceId.task = deps[i];
00668 PosHandlerDSIVoid dependencePos = hashDSIVoidGet(cache->tasks, dependenceId);
00669
00670 if (dependencePos == NULL) {
00671
00672 return -1;
00673 }
00674
00675
00676
00677
00678 }
00679
00680
00681 DataSpaceId newTaskId;
00682 newTaskId.work = cache->currentWork;
00683 newTaskId.task = taskId;
00684 PosHandlerDSIVoid newPos = hashDSIVoidAdd(cache->tasks, newTaskId);
00685 if (posGetValue(newPos) != NULL) return E_TASK_EXISTS;
00686
00687
00688
00689
00690 Task *newTask = createTask();
00691 setTaskMetadata(newTask, metadata, metaSize);
00692 setTaskId(newTask, taskId);
00693
00694
00695 TaskIdList *depList = taskIdListCreate(depSize);
00696 for (i=0; i<depSize; i++) {
00697 taskIdListAdd(depList, deps[i]);
00698 }
00699 setTaskMyDeps(newTask, depList);
00700 taskIdListDestroy(depList);
00701
00702
00703 posSetValue(newPos, newTask);
00704
00705 DataSpaceId motherId;
00706 motherId.work = cache->currentWork;
00707 motherId.task = cache->currentTask;
00708 PosHandlerDSIVoid motherPos = hashDSIVoidGet(cache->tasks, motherId);
00709
00710 if (motherPos != NULL) {
00711
00712 Task *motherTask = (Task *) posGetValue(motherPos);
00713 taskAddChild(motherTask, taskId, newTask);
00714 }
00715
00716
00717 int finishedDependences = 0;
00718 for (i=0; i<depSize; i++) {
00719 DataSpaceId dependenceId;
00720 dependenceId.work = cache->currentWork;
00721 dependenceId.task = deps[i];
00722 PosHandlerDSIVoid dependencePos = hashDSIVoidGet(cache->tasks, dependenceId);
00723 Task *dependenceTask = (Task *) posGetValue(dependencePos);
00724
00725
00726 addTaskToDependsOnMe(dependenceTask, taskId);
00727
00728 if (dependenceTask->state == finished) {
00729 finishedDependences++;
00730 }
00731 }
00732 setTaskEndedTasks(newTask, finishedDependences);
00733
00734
00735 if (finishedDependences >= depSize) {
00736 assert(finishedDependences == depSize);
00737 cacheRunTask(taskId);
00738 }
00739
00740 return 0;
00741 }
00742
00743 int cacheEndTask(int taskId) {
00744 DataSpaceId taskDSI;
00745 taskDSI.work = cache->currentWork;
00746 taskDSI.task = taskId;
00747 PosHandlerDSIVoid taskPos = hashDSIVoidGet(cache->tasks, taskDSI);
00748
00749 if (taskPos == NULL) {
00750 return E_NO_SUCH_TASK;
00751 }
00752
00753 Task *task = (Task *)posGetValue(taskPos);
00754 if (running != getTaskState(task)) {
00755 return E_TASK_NOT_RUNNING;
00756 }
00757
00758
00759 setTaskState(task, finished);
00760
00761
00762 hashIntIntRemove(cache->runningTasks, taskId);
00763
00764
00765 taskIdListAdd(cache->terminatedTasks, taskId);
00766
00767
00768
00769 if (taskId == cache->currentTask) {
00770 HashIntIntIterator *hit = createHashIntIntIterator(cache->runningTasks, 0);
00771 PosHandlerIntInt posi = hashIntIntIteratorNext(hit, cache->runningTasks);
00772 if (posi != NULL) {
00773 cache->currentTask = posGetKey(posi);
00774 } else {
00775 cache->currentTask = -1;
00776 }
00777 hashIntIntIteratorDestroy(hit, cache->runningTasks);
00778 }
00779
00780
00781
00782 int i=0;
00783 int dependentTasks = taskIdListGetSize(task->dependsOnMe);
00784 for (i=0; i<dependentTasks; i++) {
00785 DataSpaceId dependentId;
00786 dependentId.work = cache->currentWork;
00787 dependentId.task = taskIdListGet(task->dependsOnMe, i);
00788 PosHandlerDSIVoid dependentPos = hashDSIVoidGet(cache->tasks, dependentId);
00789 Task *dependentTask = (Task *) posGetValue(dependentPos);
00790
00791 dependentTask->endedTasks++;
00792
00793 if (dependentTask->endedTasks == taskIdListGetSize(dependentTask->myDeps)) {
00794 cacheRunTask(dependentId.task);
00795 }
00796 }
00797
00798
00799 #ifdef VOID_FT
00800
00801 FinishedTask *bufPos = malloc(sizeof(FinishedTask));
00802 bufPos->workId = cache->currentWork;
00803 bufPos->taskId = taskId;
00804 bufPos->task = posGetValue(taskPos);
00805 put(cache->taskBuffer, (void *)bufPos);
00806 #endif
00807
00808 return 0;
00809 }
00810
00811 int cacheGetRunningTasks( int * numTasks, int ** taskList ){
00812 *numTasks = hashGetChaves(cache->runningTasks);
00813 *taskList = malloc( sizeof(int) * (*numTasks));
00814 HashIntIntIterator *itii = createHashIntIntIterator(cache->runningTasks, 0);
00815 PosHandlerIntInt poshdr = NULL;
00816 int counter = 0;
00817
00818 while( (( poshdr = hashIntIntIteratorNext( itii, cache->runningTasks) ) != NULL ) && counter < ( *numTasks )) {
00819 assert(poshdr != NULL);
00820 (*taskList)[counter++] = posGetKey( poshdr );
00821 }
00822 hashIntIntIteratorDestroy( itii, cache->runningTasks );
00823
00824 assert(counter == *numTasks);
00825 return 0;
00826 }
00827
/**
 * Recovery callback installed by default: forwards all of its arguments to
 * dsCreateTask() and ignores whatever that call yields.
 *
 * @return always 0.
 */
int defaultRecoveryCallback(int newTaskId,int *newTaskDep,int depSize,char *metadata,int metaSize){
    (void) dsCreateTask(newTaskId, newTaskDep, depSize, metadata, metaSize);

    return 0;
}
00832
00833 int cacheRegisterRecoverCallback( RecoverCallback_t * callback ) {
00834 cache->recoverCallback = callback;
00835 return(0);
00836 }
00837
00838 void cacheSetUseTasks() {
00839 cache->useTasks = 1;
00840 }
00841
00842 int cacheGetUseTasks() {
00843 return cache->useTasks;
00844 }
00845
00846 void cacheSetForwardTaskMsgs(int forward) {
00847 cache->forwardTaskMsgs = forward;
00848 }
00849
00850 int cacheGetForwardTaskMsgs() {
00851 return cache->forwardTaskMsgs;
00852 }
00853
00854 int *cacheGetFinishedTasks(int *numTasks) {
00855 return taskIdListToArray(cache->terminatedTasks, numTasks);
00856 }
00857
00858
00859 Cache *___getCache() {
00860 return cache;
00861 }
00862