#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <dlfcn.h>
#include <pvm3.h>
#include "FilterSpec.h"
#include "Messages.h"
00007
00008
00009
00010
00011
00012
00013 FilterSpec *createFilterSpec(char *filterName, char *libname) {
00014 int i;
00015 FilterSpec *fs = (FilterSpec *)malloc(sizeof(FilterSpec));
00016 for (i=0;i<MAXINSTANCES;i++){
00017 fs->filterPlacement.hosts[i] = (char*)malloc(sizeof(char)*MAX_HNAME_LENGTH);
00018 fs->filterPlacement.numInstances = 0;
00019 }
00020 strncpy(fs->name, filterName, MAX_FNAME_LENGTH);
00021 strncpy(fs->libname, libname, MAX_LNAME_LENGTH);
00022 return fs;
00023 }
00024
00025 void destroyFilterSpec(FilterSpec *f){
00026
00027 free(f);
00028 }
00029
00030
00031
00032
00033 int addHostToFilter(FilterSpec *fs, char *hostName){
00034
00035 if (fs->filterPlacement.numInstances >= MAXINSTANCES){
00036 printf("FilterSpec.c: cant add host, maximum number of hosts reached for this filter");
00037 return -1;
00038 }
00039 else{
00040 strncpy(fs->filterPlacement.hosts[fs->filterPlacement.numInstances++], hostName, MAX_HNAME_LENGTH);
00041 }
00042
00043 return (1);
00044 }
00045
00046 int addHostsToFilter(FilterSpec *fs, char *hostName, int qty){
00047 int i;
00048 for (i=0; i<qty; i++){
00049 if (!addHostToFilter(fs, hostName)) return -1;
00050 }
00051 return (1);
00052 }
00053
00054
00055
00056
00057 int addInputToFilter(FilterSpec *fs, StreamSpec *ss){
00058 if (fs->numInputs >= MAXINPSTREAMS)
00059 return 0;
00060
00061 fs->inputs[fs->numInputs++] = ss;
00062 return 1;
00063 }
00064
00065 int addOutputToFilter(FilterSpec *fs, StreamSpec *ss){
00066 if (fs->numInputs >= MAXOUTSTREAMS)
00067 return 0;
00068
00069 fs->outputs[fs->numOutputs++] = ss;
00070 return 1;
00071 }
00072
00073
00074
00075 int fsSpawnInstances(FilterSpec *fs, char *command, char **argvSpawn){
00076 int j, spawnReturn;
00077
00078
00079 for(j = 0; j < fs->filterPlacement.numInstances; j++) {
00080
00081 char hname[MAX_HNAME_LENGTH+1];
00082 strcpy(hname, fs->filterPlacement.hosts[j]);
00083 spawnReturn = pvm_spawn(command, argvSpawn, PvmTaskHost,
00084 hname, 1,
00085 &(fs->filterPlacement.tids[j]));
00086
00087
00088 if (spawnReturn < 0) {
00089 printf("FilterSpec.c: Error creating filter %s, host %s\n",
00090 fs->name, hname);
00091 pvm_perror("");
00092 exit(1);
00093 }
00094 else if (spawnReturn == 0){
00095 printf("FilterSpec.c: Error creating filter %s, host %s\n",
00096 fs->name, hname);
00097 switch ((fs->filterPlacement.tids[j])){
00098 case PvmBadParam:
00099 printf("FilterSpec.c: Invalid argument\n");
00100 break;
00101 case PvmNoHost:
00102 printf("FilterSpec.c: Specified host not on virtual machine\n");
00103 break;
00104 case PvmNoFile:
00105 printf("FilterSpec.c: Cant find executable on remote machine\n");
00106 break;
00107 case PvmNoMem:
00108 printf("FilterSpec.c: out of memory\n");
00109 break;
00110 case PvmSysErr:
00111 printf("FilterSpec.c: system error\n");
00112 break;
00113 case PvmOutOfRes:
00114 printf("FilterSpec.c: out of resources\n");
00115 break;
00116 default:
00117 pvm_perror("pvm_spawn()");
00118 break;
00119 }
00120 return -1;
00121 }
00122
00123 pvm_notify(PvmTaskExit, MSGT_TEXIT, 1, &(fs->filterPlacement.tids[j]));
00124 pvm_notify(PvmHostDelete, MSGT_HDEL, 1, &(fs->filterPlacement.tids[j]));
00125 }
00126
00127 return 0;
00128
00129 }
00130