00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include <dlfcn.h>
00004 #include <pvm3.h>
00005 #include "Ports.h"
00006 #include "../Messages.h"
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 OutputPort *createOutputPort(){
00018 OutputPort *outputPort = (OutputPort *) malloc(sizeof(OutputPort));
00019 if(outputPort == NULL){
00020 printf("createOutputPort: Could not alocate memory\n");
00021 exit(1);
00022 }
00023 setOPState(outputPort, 1);
00024 return outputPort;
00025 }
00026
00027
00028 void destroyOutputPort(OutputPort *o){
00029
00030 if ((o->writePolicy == LABELED_STREAM) || (o->writePolicy == MULTICAST_LABELED_STREAM)){
00031 dlclose(o->lsData.lsHandler);
00032 }
00033
00034 if(o == NULL){
00035 printf("destroyOutputPort: We cant destroy a null outputPort\n");
00036 exit(1);
00037 }
00038
00039 free(o);
00040 }
00041
00042
00043 void resetOutputPort(OutputPort *op){
00044 setOPState(op, OP_STATE_OPEN);
00045 }
00046
00047
00048 void closeOutputPort(OutputPort *op){
00049 if (op->state == OP_STATE_CLOSED)
00050 return;
00051
00052
00053 int msgType = MSGT_EOW;
00054 int taskId = -1;
00055
00056 pvm_initsend(PvmDataRaw);
00057 pvm_pkint(&msgType, 1, 1);
00058 pvm_pkint(&taskId, 1, 1);
00059
00060 pvm_mcast(op->tidsDestinations, op->numDestinations, op->tag);
00061
00062 setOPState(op, OP_STATE_CLOSED);
00063 }
00064
00065
00066 void setOPState(OutputPort *op, int state){
00067 op->state = state;
00068 }
00069
00070
00071 void setOPName(OutputPort *op, const char *name){
00072 op->name = (char *)strdup(name);
00073 }
00074
00075
00076 void setOPNumDestinations(OutputPort *op, int num){
00077 op->numDestinations = num;
00078 }
00079
00080
00081 void setOPWritePolicy(OutputPort *op, writePolicy_t p){
00082 op->writePolicy = p;
00083 }
00084
00085
00086 void setOPNextToSend(OutputPort *op, int next){
00087 op->nextToSend = next;
00088 }
00089
00090
00091 void setOPTag(OutputPort *op, const int tag){
00092 op->tag = tag;
00093 }
00094
00095 void setOPLibName(OutputPort *op, const char *libname){
00096 int libnameSize = strlen(libname);
00097
00098
00099 if((libnameSize + 1) > MAX_HNAME_LENGTH){
00100 printf("Warning: Lib name out of bounds\n");
00101 memcpy(op->lsData.libname, libname, MAX_LNAME_LENGTH);
00102 op->lsData.libname[MAX_LNAME_LENGTH -1] = '\0';
00103 }else{
00104 memcpy(op->lsData.libname, libname, libnameSize +1);
00105 }
00106 }
00107
00108
00109 void setOPTidsDestinations(OutputPort *op, const int *tids){
00110 int i;
00111
00112 op->tidsDestinations = (int *) malloc(sizeof(int) * op->numDestinations);
00113
00114 if(op->tidsDestinations == NULL){
00115 printf("Could not allocate memory\n");
00116 }
00117 for(i = 0; i < op->numDestinations; i++){
00118 op->tidsDestinations[i] = tids[i];
00119 }
00120 }
00121
00122
00123
00124
00125
00126 InputPort *createInputPort(){
00127 InputPort *inputPort = (InputPort *) malloc(sizeof(InputPort));
00128 if(inputPort == NULL){
00129 printf("createInputPort: Could not allocate memory\n");
00130 exit(1);
00131 }
00132
00133 inputPort->numEowRecv = 0;
00134
00135 return inputPort;
00136 }
00137
00138
00139 void destroyInputPort(InputPort *o){
00140 if(o == NULL){
00141 printf("destroyInputPort: We cant destroy a null outputPort\n");
00142 exit(1);
00143 }
00144
00145 free(o);
00146 }
00147
00148 void resetInputPort(InputPort *ip){
00149
00150 while (pvm_probe(-1, ip->tag) != 0){
00151 pvm_recv(-1, ip->tag);
00152 }
00153
00154 ip->numEowRecv = 0;
00155 }
00156
00157
00158 void setIPName(InputPort *ip, const char *name){
00159 ip->name = (char *)strdup(name);
00160 }
00161
00162
00163 void setIPNumSources(InputPort *ip, int num){
00164 ip->numSources = num;
00165 }
00166
00167
00168 void setIPTag(InputPort *ip, int tag){
00169 ip->tag = tag;
00170 }
00171
00172 void setIPTidsSources(InputPort *ip, const int *tids){
00173 int i;
00174
00175 if(ip->numSources <= 0 ){
00176 printf("Warning: the number of tids needs to be set in ip->numSource before copy\n");
00177 exit(1);
00178 }
00179 ip->tidsSources = (int *) malloc(sizeof(int) * ip->numSources);
00180
00181
00182 for(i = 0; i < ip->numSources; i++){
00183 ip->tidsSources[i] = tids[i];
00184 }
00185 }
00186
00187 int loadOPLSData(OutputPort *o){
00188 char *error = NULL;
00189 char *libnameLocal;
00190
00191
00192 libnameLocal = (char*)malloc(strlen(o->lsData.libname) + 3);
00193 sprintf(libnameLocal, "./%s",o->lsData.libname);
00194
00195
00196 if (((o->lsData.lsHandler = dlopen(libnameLocal, RTLD_NOW)) == NULL) &&
00197 ((o->lsData.lsHandler = dlopen(o->lsData.libname, RTLD_NOW)) == NULL )) {
00198 fprintf(stderr, "Ports.c: could not load labeled stream %s library: %s\n", o->lsData.libname, dlerror());
00199 return -1;
00200 }
00201
00202 free(libnameLocal);
00203
00204
00205
00206
00207
00208 o->lsData.hash = (Hash*)dlsym(o->lsData.lsHandler, "hash");
00209 if ((error = dlerror()) != NULL) {
00210 o->lsData.hash = (Hash*)dlsym(o->lsData.lsHandler, "_Z4hashPci");
00211 if ((error = dlerror()) != NULL) {
00212 o->lsData.hash = &hashDefault;
00213 fprintf (stderr, "Ports.c: Loading default hash: %s\n", error);
00214 }
00215 }
00216
00217
00218 o->lsData.getLabel = (GetLabel*)dlsym(o->lsData.lsHandler, "getLabel");
00219 if ((error = dlerror()) != NULL) {
00220 o->lsData.getLabel = (GetLabel*)dlsym(o->lsData.lsHandler, "_Z8getLabelPviPc");
00221 if ((error = dlerror()) != NULL) {
00222 fprintf (stderr, "Ports.c: error loading getLabel: %s\n", error);
00223 return -1;
00224 }
00225 }
00226 return 1;
00227 }
00228
00229
00230 int loadOPMLSData(OutputPort *o){
00231 char *error = NULL;
00232 char *libnameLocal;
00233
00234
00235 libnameLocal = (char*)malloc(strlen(o->lsData.libname) + 3);
00236 sprintf(libnameLocal, "./%s",o->lsData.libname);
00237
00238
00239 if (((o->lsData.lsHandler = dlopen(libnameLocal, RTLD_NOW)) == NULL) &&
00240 ((o->lsData.lsHandler = dlopen(o->lsData.libname, RTLD_NOW)) == NULL )) {
00241 fprintf(stderr, "Ports.c: could not load mls %s library: %s\n", o->lsData.libname, dlerror());
00242 return -1;
00243 }
00244
00245 free(libnameLocal);
00246
00247
00248
00249
00250
00251 o->lsData.mlshash = (MLSHash*)dlsym(o->lsData.lsHandler, "mlshash");
00252 if ((error = dlerror()) != NULL) {
00253
00254
00255
00256 o->lsData.mlshash = &mlsHashDefault;
00257 fprintf (stderr, "Ports.c: Loading default mlshash: %s\n", error);
00258
00259 }
00260
00261
00262 o->lsData.getLabel = (GetLabel*)dlsym(o->lsData.lsHandler, "getLabel");
00263 if ((error = dlerror()) != NULL) {
00264 o->lsData.getLabel = (GetLabel*)dlsym(o->lsData.lsHandler, "_Z8getLabelPviPc");
00265 if ((error = dlerror()) != NULL) {
00266 fprintf (stderr, "Ports.c: error loading getLabel: %s\n", error);
00267 return -1;
00268 }
00269 }
00270 return 1;
00271 }