FilterDev.c File Reference

This file implements all functions called by filters.

#include <pvm3.h>
#include <stdlib.h>
#include <assert.h>
#include <signal.h>
#include <sys/time.h>
#include <string.h>
#include <unistd.h>
#include "FilterDev.h"
#include <Task/Task.h>
#include <DataSpace/DataSpace.h>
#include <Cache.h>
#include "../FilterData/FilterData.h"
#include "../FilterData/Termination.h"
#include "../constants.h"
#include "../Messages.h"
#include "hash.h"

Data Structures

struct  BufSt
 This is the send buffer.

Defines

#define KEY_INT
#define VAL_VOID

Functions

InputPortHandler dsGetInputPortByName (const char *name)
 get the handlers of ports
OutputPortHandler dsGetOutputPortByName (const char *name)
int dsCloseOutputPort (OutputPortHandler oph)
int dsGetNumWriters (InputPortHandler iph)
 these functions return the number of instances on the other side of the stream; for example, given filters A->B, B gets the number of A instances.
int dsGetNumReaders (OutputPortHandler oph)
void dsMCast (OutputPort *op, void *buf, int bufSz)
void dsSend (int tid, int msgtag, void *buf, int bufSz)
int dsWriteBuffer (OutputPortHandler oph, void *buf, unsigned int bufSz)
 function that writes to another filter (pack and send)
int dsInitPack (int initSize)
 function to start packing data, size in bytes
int dsPackData (void *data, int size)
 function to pack data to the buffer
int dsWritePackedBuffer (OutputPortHandler oph)
 function to send the packed buffer
void taskSend (int taskId, int *deps, int depSize, char *metadata, int metaSize, int creatorTid)
int endTaskSend (int taskId, int enderTid)
int dsReadBuffer (InputPortHandler iph, void *buf, int szBuf)
 function that reads from another filter (receive and unpack)
int recvNonBlockingData (InputPort *ip)
int dsReadNonBlockingBuffer (InputPortHandler iph, void *buf, int szBuf)
 returns 0 if there is no data to be received
int dsInitReceive (InputPortHandler iph)
 function to receive a buffer, and unpack later
int dsUnpackData (void *buf, int size)
 function to unpack data
char * dsGetFilterName ()
 get filter name
int dsGetFilterId ()
 get filter id
int dsGetNumInputPorts ()
 function to get the number of input ports of a filter
char ** dsGetInputPortNames ()
 function to get the names of all input ports of a filter
int dsGetNumOutputPorts ()
 function to get the number of output ports of a filter
char ** dsGetOutputPortNames ()
 function to get the names of all output ports of a filter
int dsGetNumUpStreamsRunning ()
 get the number of writers to me that are still running
int dsGetMyRank ()
 function that returns the rank of this instance among its brothers
int dsGetTotalInstances ()
 returns the total number of instances of this filter (this instance plus its brothers)
int dsExit (char *userMesg)
 kills all filters and finishes Void; the user calls this to exit the system abnormally
int dsProbe (InputPortHandler iph)
 function to probe an input port for data; returns the size of the buffer if data is available, 0 otherwise
int dsGetMachineMemory ()
 get the amount of memory this machine has for this execution
int dsGetLocalInstances ()
 the number of brothers this instance has on this machine, plus 1. Brothers are instances of the same filter: with one A and two Bs running on this machine, A gets 1 and B gets 2.
void dsUseTasks ()
 Task functions.
int dsGetCurrentTask ()
 get the current task we are working on
int dsSetCurrentTask (int taskId)
int * dsGetTaskDeps (int taskId, int *depsSz)
int dsCreateTask (int taskId, int *deps, int depSize, char *metadata, int metaSize)
 creates a new task
int dsEndTask (int taskId)
 ends a task
int dsPutData (char *id, void *val, int valSize)
 Data functions.
void * dsGetData (int taskId, char *id, int *valSz)
int dsRemoveData (char *id)
void dsInstSetStates (char **states, int numStates)
void dsInstSwitchState (int stateId)
void dsInstEnterState (int stateId)
void dsInstLeaveState ()
int dsGetRunningTasks (int *numTasks, int **taskList)
int * dsGetFinishedTasks (int *numTasks)
int dsRegisterRecoverCallback (RecoverCallback_t *callback)


Detailed Description

This file implements all functions called by filters.

Todo:
Clean this file. It should be like the Linux VFS, only redirecting calls to the appropriate Anthill module.
Todo:
Change functions prefix to Anthill prefix: ahXxx().
Todo:
Purge message packing and create a wrapper to it.

Definition in file FilterDev.c.


Define Documentation

#define KEY_INT
 

Definition at line 20 of file FilterDev.c.

#define VAL_VOID
 

Definition at line 21 of file FilterDev.c.


Function Documentation

int dsCloseOutputPort (OutputPortHandler oph)

Definition at line 109 of file FilterDev.c.

References closeOutputPort(), fd, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_CLOSEOPORT.

int dsCreateTask (int taskId, int *deps, int depSize, char *metadata, int metaSize)

creates a new task

Definition at line 1468 of file FilterDev.c.

References cacheCreateTask(), cacheGetForwardTaskMsgs(), fd, taskSend(), trcEnterState(), trcLeaveState(), and VT_OH_CREATETASK.

Referenced by defaultRecoveryCallback(), and dsReadBuffer().

int dsEndTask (int taskId)

ends a task

Definition at line 1490 of file FilterDev.c.

References cacheEndTask(), cacheGetForwardTaskMsgs(), E_TASK_NOT_RUNNING, endTaskSend(), fd, trcEnterState(), trcLeaveState(), and VT_OH_ENDTASK.

Referenced by dsReadNonBlockingBuffer().

int dsExit (char *userMesg)

kills all filters and finishes Void; the user calls this to exit the system abnormally

used to notify the manager that the application called exit (dsExit)

Definition at line 1354 of file FilterDev.c.

References fd, MSGT_AEXIT, trcDestroyData(), trcEnterState(), trcLeaveState(), and VT_OH_EXIT.

int dsGetCurrentTask ()

get the current task we are working on

Definition at line 1441 of file FilterDev.c.

References cacheGetCurrentTask(), fd, trcEnterState(), trcLeaveState(), and VT_OH_GETTASK.

void * dsGetData (int taskId, char *id, int *valSz)

Definition at line 1513 of file FilterDev.c.

References cacheGetData().

int dsGetFilterId ()

get filter id

Definition at line 1276 of file FilterDev.c.

References fd, and __FilterData__::id.

char * dsGetFilterName ()

get filter name

Definition at line 1271 of file FilterDev.c.

References fd, and __FilterData__::name.

int * dsGetFinishedTasks (int *numTasks)

Definition at line 1555 of file FilterDev.c.

References cacheGetFinishedTasks().

InputPortHandler dsGetInputPortByName (const char *name)

get the handlers of ports

Definition at line 58 of file FilterDev.c.

References fd, __FilterData__::inputPorts, InputPort::name, __FilterData__::numInputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETIPORT.

char ** dsGetInputPortNames ()

function to get the names of all input ports of a filter

Definition at line 1286 of file FilterDev.c.

References fd, __FilterData__::inputPorts, InputPort::name, and __FilterData__::numInputPorts.

int dsGetLocalInstances ()

the number of brothers this instance has on this machine, plus 1. Brothers are instances of the same filter: with one A and two Bs running on this machine, A gets 1 and B gets 2.
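The counting rule can be illustrated with a small self-contained sketch. The `Placement` record, the `examplePlacement` table, and `countLocalInstances()` are hypothetical names made up for this example; the real function delegates to getFDLocalInstances(), whose internals are not shown in this reference.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical record: one running filter instance and the host it runs on. */
typedef struct {
    const char *filterName;
    const char *host;
} Placement;

/* Counts the instances of filterName placed on host, that is, the caller
 * itself plus its local brothers. */
int countLocalInstances(const Placement *table, size_t n,
                        const char *filterName, const char *host)
{
    int count = 0;
    assert(table != NULL || n == 0);
    assert(filterName != NULL && host != NULL);
    for (size_t i = 0; i < n; i++) {
        if (strcmp(table[i].filterName, filterName) == 0 &&
            strcmp(table[i].host, host) == 0) {
            count++;
        }
    }
    return count;
}

/* Example layout: one A and two Bs on "node1", one more B on "node2". */
static const Placement examplePlacement[] = {
    { "A", "node1" },
    { "B", "node1" },
    { "B", "node1" },
    { "B", "node2" },
};
```

With this table, a call for filter "A" on "node1" counts one instance and a call for filter "B" on "node1" counts two, matching the A and B case described above.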

Definition at line 1426 of file FilterDev.c.

References fd, getFDLocalInstances(), trcEnterState(), trcLeaveState(), and VT_OH_GETLI.

int dsGetMachineMemory ()

get the amount of memory this machine has for this execution

Definition at line 1418 of file FilterDev.c.

References fd, getFDMachineMem(), trcEnterState(), trcLeaveState(), and VT_OH_GETMEM.

int dsGetMyRank ()

function that returns the rank of this instance among its brothers

Definition at line 1332 of file FilterDev.c.

References fd, __FilterData__::myRank, trcEnterState(), trcLeaveState(), and VT_OH_GETMYRANK.

int dsGetNumInputPorts ()

function to get the number of input ports of a filter

Definition at line 1281 of file FilterDev.c.

References fd, and __FilterData__::numInputPorts.

int dsGetNumOutputPorts ()

function to get the number of output ports of a filter

Definition at line 1300 of file FilterDev.c.

References fd, and __FilterData__::numOutputPorts.

int dsGetNumReaders (OutputPortHandler oph)

Definition at line 144 of file FilterDev.c.

References fd, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETNR.

int dsGetNumUpStreamsRunning ()

get the number of writers to me that are still running
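One plausible way to derive this count from the fields listed in the References for this function is sketched here. This is an assumption inferred from the field names (numSources, numEowRecv), not a copy of the implementation; `PortCounters` and `countUpstreamsRunning()` are hypothetical names, and EOW is taken to be the end-of-work notice a writer sends when it finishes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced view of an input port: only the two counters
 * named in the References for this function. */
typedef struct {
    int numSources;  /* writers connected to this port */
    int numEowRecv;  /* EOW notices received so far on this port */
} PortCounters;

/* Assumed rule: a writer counts as running until its EOW has arrived, so
 * the running writers are the sources minus the EOWs, summed over ports. */
int countUpstreamsRunning(const PortCounters *ports, size_t numPorts)
{
    int running = 0;
    assert(ports != NULL || numPorts == 0);
    for (size_t i = 0; i < numPorts; i++) {
        assert(ports[i].numEowRecv <= ports[i].numSources);
        running += ports[i].numSources - ports[i].numEowRecv;
    }
    return running;
}
```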

Definition at line 1319 of file FilterDev.c.

References fd, __FilterData__::inputPorts, InputPort::numEowRecv, __FilterData__::numInputPorts, and InputPort::numSources.

int dsGetNumWriters (InputPortHandler iph)

these functions return the number of instances on the other side of the stream.

Say we have filters A->B, where B knows it will receive exactly one message from each instance of A. B can use this number to leave a loop such as while (numMessages != dsGetNumWriters(iph)).
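The loop pattern can be tried out without Anthill using stand-ins. In the sketch below, `MockPort`, `mockGetNumWriters()`, and `mockReadBuffer()` are hypothetical replacements for the input port, dsGetNumWriters(), and dsReadBuffer(); only the shape of the loop reflects the text above.

```c
#include <assert.h>

/* Hypothetical stand-in for an input port fed by several instances of A. */
typedef struct {
    int numWriters;  /* instances of A connected to this port */
    int nextValue;   /* payload the mock hands out next */
} MockPort;

static int mockGetNumWriters(const MockPort *port)
{
    return port->numWriters;
}

/* Delivers one int per call and returns the number of bytes "read". */
static int mockReadBuffer(MockPort *port, void *buf, int szBuf)
{
    assert(szBuf >= (int)sizeof(int));
    *(int *)buf = port->nextValue++;
    return (int)sizeof(int);
}

/* B's receive loop: stop once one message per writer has arrived. */
int sumOneMessagePerWriter(MockPort *port)
{
    int numMessages = 0;
    int sum = 0;
    while (numMessages != mockGetNumWriters(port)) {
        int value = 0;
        mockReadBuffer(port, &value, (int)sizeof(value));
        sum += value;
        numMessages++;
    }
    return sum;
}
```

The loop only terminates correctly if every writer really sends exactly one message; a writer that sends two would leave a message unread.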

Definition at line 124 of file FilterDev.c.

References fd, __FilterData__::inputPorts, __FilterData__::numInputPorts, InputPort::numSources, trcEnterState(), trcLeaveState(), and VT_OH_GETNW.

OutputPortHandler dsGetOutputPortByName (const char *name)

Definition at line 83 of file FilterDev.c.

References fd, OutputPort::name, __FilterData__::numOutputPorts, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETOPORT.

char ** dsGetOutputPortNames ()

function to get the names of all output ports of a filter

Definition at line 1305 of file FilterDev.c.

References fd, OutputPort::name, __FilterData__::numOutputPorts, and __FilterData__::outputPorts.

int dsGetRunningTasks (int *numTasks, int **taskList)

Definition at line 1550 of file FilterDev.c.

References cacheGetRunningTasks().

int * dsGetTaskDeps (int taskId, int *depsSz)

Definition at line 1463 of file FilterDev.c.

References cacheGetTaskDeps().

int dsGetTotalInstances ()

returns the total number of instances of this filter (this instance plus its brothers)

Definition at line 1344 of file FilterDev.c.

References fd, __FilterData__::numInstances, trcEnterState(), trcLeaveState(), and VT_OH_GETTI.

int dsInitPack (int initSize)

function to start packing data, size in bytes

Warning:
Multithreaded applications will crash if they use pack and unpack.
Deprecated:
Packing and unpacking are deprecated; use only dsWriteBuffer() and dsReadBuffer().
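As a rough illustration of the recommended replacement, a filter can lay its fields out in one caller-owned buffer and hand that buffer to dsWriteBuffer() in a single call. The `Sample` layout and the `encodeSample()` and `decodeSample()` helpers below are hypothetical and exist only for this sketch. They copy raw bytes, so they assume sender and receiver share the same byte order and type sizes.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical message layout used only for this example. */
typedef struct {
    int    id;
    double value;
} Sample;

/* Copies the fields into a caller-owned buffer and returns the number of
 * bytes used, or -1 if the buffer is too small. */
int encodeSample(const Sample *s, unsigned char *buf, int bufSz)
{
    const int needed = (int)(sizeof(s->id) + sizeof(s->value));
    assert(s != NULL && buf != NULL);
    if (bufSz < needed) {
        return -1;
    }
    memcpy(buf, &s->id, sizeof(s->id));
    memcpy(buf + sizeof(s->id), &s->value, sizeof(s->value));
    return needed;
}

/* Reverses encodeSample(); returns 0 on success, -1 on a short buffer. */
int decodeSample(const unsigned char *buf, int bufSz, Sample *out)
{
    const int needed = (int)(sizeof(out->id) + sizeof(out->value));
    assert(buf != NULL && out != NULL);
    if (bufSz < needed) {
        return -1;
    }
    memcpy(&out->id, buf, sizeof(out->id));
    memcpy(&out->value, buf + sizeof(out->id), sizeof(out->value));
    return 0;
}
```

The writer would then pass the buffer and the byte count returned by encodeSample() to dsWriteBuffer(), and the reader would pass the buffer filled by dsReadBuffer() to decodeSample(). Because each caller owns its buffer, no state is shared between threads in these helpers.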

Definition at line 458 of file FilterDev.c.

References BufSt::buffer, dsPackData(), fd, BufSt::maxSize, MINSIZE, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_INITPACK.

int dsInitReceive (InputPortHandler iph)

function to receive a buffer, and unpack later

Warning:
Multithreaded applications will crash if they use pack and unpack.
Deprecated:
Packing and unpacking are deprecated; use only dsWriteBuffer() and dsReadBuffer().

Definition at line 1061 of file FilterDev.c.

References EOW, fd, trcEnterState(), trcLeaveState(), and VT_OH_IRECEIVE.

void dsInstEnterState (int stateId)

Definition at line 1537 of file FilterDev.c.

References fd, and instEnterState().

void dsInstLeaveState ()

Definition at line 1544 of file FilterDev.c.

References fd, and instLeaveState().

void dsInstSetStates (char **states, int numStates)

Definition at line 1525 of file FilterDev.c.

References fd, and instSetUserStates().

void dsInstSwitchState (int stateId)

Definition at line 1531 of file FilterDev.c.

References fd, and instSwitchState().

void dsMCast (OutputPort *op, void *buf, int bufSz)

all filter to filter messages should have this

Definition at line 163 of file FilterDev.c.

References cacheGetCurrentTask(), fd, instEnterState(), MSGT_F2F, OutputPort::numDestinations, OutputPort::tag, OutputPort::tidsDestinations, and TIMER_WRITE_BLOCKED.

Referenced by dsWriteBuffer().

int dsPackData (void *data, int size)

function to pack data to the buffer

Warning:
Multithreaded applications will crash if they use pack and unpack.
Deprecated:
Packing and unpacking are deprecated; use only dsWriteBuffer() and dsReadBuffer().

Definition at line 507 of file FilterDev.c.

References BufSt::buffer, fd, BufSt::maxSize, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_PACK.

Referenced by dsInitPack().

int dsProbe (InputPortHandler iph)

function to probe an input port for data; returns the size of the buffer if data is available, 0 otherwise
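A typical use of a probe with this contract is to size the receive buffer before reading. The sketch below runs against a hypothetical one-slot mailbox: `MockInputPort`, `mockProbe()`, and `mockReadBuffer()` are stand-ins invented for this example, not Anthill functions.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical one-slot mailbox standing in for an input port. */
typedef struct {
    const void *pending;  /* queued message, or NULL when empty */
    int pendingSize;      /* size of the queued message in bytes */
} MockInputPort;

/* Mirrors the documented contract of dsProbe(): message size, or 0. */
static int mockProbe(const MockInputPort *port)
{
    return port->pending != NULL ? port->pendingSize : 0;
}

/* Copies the queued message out and empties the slot. */
static int mockReadBuffer(MockInputPort *port, void *buf, int szBuf)
{
    int n = port->pendingSize;
    assert(port->pending != NULL && szBuf >= n);
    memcpy(buf, port->pending, (size_t)n);
    port->pending = NULL;
    port->pendingSize = 0;
    return n;
}

/* Returns a malloc'ed copy of the waiting message and stores its size in
 * *outSize, or returns NULL without blocking when nothing is waiting. */
void *readIfAvailable(MockInputPort *port, int *outSize)
{
    int size = mockProbe(port);
    void *buf;
    *outSize = 0;
    if (size == 0) {
        return NULL;
    }
    buf = malloc((size_t)size);
    if (buf == NULL) {
        return NULL;
    }
    *outSize = mockReadBuffer(port, buf, size);
    return buf;
}
```

Since 0 means "no data", a caller following this pattern cannot tell an empty port from a zero-length message.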

Definition at line 1382 of file FilterDev.c.

References fd, __FilterData__::inputPorts, InputPort::tag, trcEnterState(), trcLeaveState(), and VT_OH_PROBE.

int dsPutData (char *id, void *val, int valSize)

Data functions.

Definition at line 1509 of file FilterDev.c.

References cachePutData().

int dsReadBuffer (InputPortHandler iph, void *buf, int szBuf)

function that reads from another filter (receive and unpack)

used to notify filter a new task

used to notify filter the end of a task

used to notify manager this filter ended its work

all filter to filter messages should have this

Definition at line 689 of file FilterDev.c.

References beginTerminationDetection(), cacheCreateTask(), cacheEndTask(), cacheGetForwardTaskMsgs(), dsCreateTask(), dsSetCurrentTask(), E_TASK_NOT_RUNNING, endTaskSend(), EOW, ERROR, fd, __FilterData__::inputPorts, instEnterState(), instLeaveState(), MSGT_CREATETASK, MSGT_ENDTASK, MSGT_EOW, MSGT_F2F, MSGT_INITTERM, NOTPARTICIPATING, InputPort::numEowRecv, __FilterData__::numInportsAdded, __FilterData__::numInputPorts, InputPort::numSources, _TerminationDetection_::status, InputPort::tag, taskSend(), tdd, terminationDetectionRound(), TIMER_READ, trcEnterState(), trcLeaveState(), trcResetCurrentState(), updateTermStreamToEmpty(), VT_COMM_READ, VT_IDLE_READ, VT_LEAVE_COMM_READ, and VT_OH_RBUFFER.

int dsReadNonBlockingBuffer (InputPortHandler iph, void *buf, int szBuf)

returns 0 if there is no data to be received

used to notify filter a new task

used to notify filter the end of a task

used to notify manager this filter ended its work

all filter to filter messages should have this

Definition at line 909 of file FilterDev.c.

References cacheCreateTask(), cacheGetForwardTaskMsgs(), dsEndTask(), dsSetCurrentTask(), EOW, ERROR, fd, __FilterData__::inputPorts, instEnterState(), instLeaveState(), MSGT_CREATETASK, MSGT_ENDTASK, MSGT_EOW, MSGT_F2F, MSGT_INITTERM, InputPort::numEowRecv, __FilterData__::numInportsAdded, __FilterData__::numInputPorts, InputPort::numSources, recvNonBlockingData(), InputPort::tag, taskSend(), terminationDetectionRound(), TIMER_READ, trcEnterState(), trcLeaveState(), updateTermStreamToEmpty(), VT_COMM_READ, and VT_OH_RBUFFER.

int dsRegisterRecoverCallback (RecoverCallback_t *callback)

Definition at line 1559 of file FilterDev.c.

References cacheRegisterRecoverCallback().

int dsRemoveData (char *id)

Definition at line 1517 of file FilterDev.c.

References cacheRemoveData().

void dsSend (int tid, int msgtag, void *buf, int bufSz)

all filter to filter messages should have this

Definition at line 189 of file FilterDev.c.

References cacheGetCurrentTask(), fd, instEnterState(), instLeaveState(), MSGT_F2F, and TIMER_WRITE_BLOCKED.

int dsSetCurrentTask (int taskId)

Definition at line 1452 of file FilterDev.c.

References cacheSetCurrentTask(), fd, trcEnterState(), trcLeaveState(), and VT_OH_SETTASK.

Referenced by dsReadBuffer(), and dsReadNonBlockingBuffer().

int dsUnpackData (void *buf, int size)

function to unpack data

Warning:
Multithreaded applications will crash if they use pack and unpack.
Deprecated:
Packing and unpacking are deprecated; use only dsWriteBuffer() and dsReadBuffer().
Todo:
Modify this to return what it really read

Definition at line 1246 of file FilterDev.c.

References BufSt::buffer, fd, BufSt::maxSize, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_UNPACK.

void dsUseTasks ()

Task functions.

Definition at line 1437 of file FilterDev.c.

References cacheSetUseTasks().

int dsWriteBuffer (OutputPortHandler oph, void *buf, unsigned int bufSz)

function that writes to another filter (pack and send)

Two things can happen here: either the message has a single destination instance, or it has multiple destinations. The multicast case is handled first, then the others.
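The two-way split can be sketched as a destination-selection step. Everything in this example is hypothetical: `SketchPolicy`, `SketchPort`, and `selectDestinations()` are invented names, the actual values of OutputPort::writePolicy are not listed in this reference, and round-robin is only an assumed way of picking the single destination.

```c
#include <assert.h>

#define MAX_DEST 8  /* arbitrary bound for this sketch */

/* Hypothetical write policies; not the real writePolicy values. */
typedef enum { POLICY_SINGLE, POLICY_MULTICAST } SketchPolicy;

typedef struct {
    SketchPolicy policy;
    int numDestinations;
    int tidsDestinations[MAX_DEST];
    int next;  /* next destination for single delivery (assumed round-robin) */
} SketchPort;

/* Fills outTids with the task ids that should receive the message and
 * returns how many there are. Multicast is handled first. */
int selectDestinations(SketchPort *port, int outTids[MAX_DEST])
{
    assert(port->numDestinations > 0 && port->numDestinations <= MAX_DEST);
    if (port->policy == POLICY_MULTICAST) {
        for (int i = 0; i < port->numDestinations; i++) {
            outTids[i] = port->tidsDestinations[i];
        }
        return port->numDestinations;
    }
    /* Single destination: pick one instance and advance the cursor. */
    outTids[0] = port->tidsDestinations[port->next];
    port->next = (port->next + 1) % port->numDestinations;
    return 1;
}
```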

all filter to filter messages should have this

Definition at line 245 of file FilterDev.c.

References BROADCAST, dsMCast(), fd, __LabeledStreamData::getLabel, instEnterState(), instLeaveState(), OutputPort::lsData, MAX_LBL_LENGTH, MAXINSTANCES, __LabeledStreamData::mlshash, MULTICAST_LABELED_STREAM, OutputPort::numDestinations, __FilterData__::outputPorts, OutputPort::tidsDestinations, TIMER_WRITE, trcEnterState(), trcLeaveState(), VT_COMM_WRITE, VT_OH_WBUFFER, and OutputPort::writePolicy.

Referenced by dsWritePackedBuffer().

int dsWritePackedBuffer (OutputPortHandler oph)

function to send the packed buffer

Warning:
Multithreaded applications will crash if they use pack and unpack.
Deprecated:
Packing and unpacking are deprecated; use only dsWriteBuffer() and dsReadBuffer().

Definition at line 533 of file FilterDev.c.

References BufSt::buffer, dsWriteBuffer(), fd, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_WPBUFFER.

int endTaskSend (int taskId, int enderTid)

used to notify filter the end of a task

Definition at line 643 of file FilterDev.c.

References fd, MSGT_ENDTASK, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, and OutputPort::tidsDestinations.

Referenced by dsEndTask(), and dsReadBuffer().

int recvNonBlockingData (InputPort *ip)

Definition at line 896 of file FilterDev.c.

References InputPort::tag.

Referenced by dsReadNonBlockingBuffer().

void taskSend (int taskId, int *deps, int depSize, char *metadata, int metaSize, int creatorTid)

used to notify filter a new task

Definition at line 591 of file FilterDev.c.

References fd, MSGT_CREATETASK, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, and OutputPort::tidsDestinations.

Referenced by dsCreateTask(), dsReadBuffer(), and dsReadNonBlockingBuffer().


Generated on Tue Jan 17 19:21:55 2006 for Void by  doxygen 1.4.6