#include <pvm3.h>
#include <stdlib.h>
#include <assert.h>
#include <signal.h>
#include <sys/time.h>
#include <string.h>
#include <unistd.h>
#include "FilterDev.h"
#include <Task/Task.h>
#include <DataSpace/DataSpace.h>
#include <Cache.h>
#include "../FilterData/FilterData.h"
#include "../FilterData/Termination.h"
#include "../constants.h"
#include "../Messages.h"
#include "hash.h"
Include dependency graph for FilterDev.c:

Go to the source code of this file.
Data Structures | |
| struct | BufSt |
| This is the send buffer. More... | |
Defines | |
| #define | KEY_INT |
| #define | VAL_VOID |
Functions | |
| InputPortHandler | dsGetInputPortByName (const char *name) |
| get the handlers of ports | |
| OutputPortHandler | dsGetOutputPortByName (const char *name) |
| int | dsCloseOutputPort (OutputPortHandler oph) |
| int | dsGetNumWriters (InputPortHandler iph) |
| these functions return the number of instances on the other side of the strem say, we have a filter A->B. | |
| int | dsGetNumReaders (OutputPortHandler oph) |
| void | dsMCast (OutputPort *op, void *buf, int bufSz) |
| void | dsSend (int tid, int msgtag, void *buf, int bufSz) |
| int | dsWriteBuffer (OutputPortHandler oph, void *buf, unsigned int bufSz) |
| function that writes to other filter(pack and send) | |
| int | dsInitPack (int initSize) |
| function to start packing data, size in bytes | |
| int | dsPackData (void *data, int size) |
| function to pack data to the buffer | |
| int | dsWritePackedBuffer (OutputPortHandler oph) |
| function to send the packed buffer | |
| void | taskSend (int taskId, int *deps, int depSize, char *metadata, int metaSize, int creatorTid) |
| int | endTaskSend (int taskId, int enderTid) |
| int | dsReadBuffer (InputPortHandler iph, void *buf, int szBuf) |
| function that reads from other filter(receive and unpack) | |
| int | recvNonBlockingData (InputPort *ip) |
| int | dsReadNonBlockingBuffer (InputPortHandler iph, void *buf, int szBuf) |
| returns 0 if there is no data to be received | |
| int | dsInitReceive (InputPortHandler iph) |
| function to receive a buffer, and unpack later | |
| int | dsUnpackData (void *buf, int size) |
| function to unpack data | |
| char * | dsGetFilterName () |
| get filter name | |
| int | dsGetFilterId () |
| get filter id | |
| int | dsGetNumInputPorts () |
| function to get the number of one filter input ports | |
| char ** | dsGetInputPortNames () |
| function to get all input ports names of one filter | |
| int | dsGetNumOutputPorts () |
| function to get the number of one filter output ports | |
| char ** | dsGetOutputPortNames () |
| function to get all output ports names of one filter | |
| int | dsGetNumUpStreamsRunning () |
| get the number of writers to me that are still running | |
| int | dsGetMyRank () |
| Function that returns which brother am I. | |
| int | dsGetTotalInstances () |
| returns the total intances of this filter(me + mybrothers) | |
| int | dsExit (char *userMesg) |
| kill all filters and finish void, user calls this to exit the system abnormally | |
| int | dsProbe (InputPortHandler iph) |
| funtion to probe an input port for data, returns the size of the buffer, 0 otherwise | |
| int | dsGetMachineMemory () |
| get the ammount of memory this machine has for this execution | |
| int | dsGetLocalInstances () |
| the number of brothers I have in this machine + 1 brothers are the same filters as me, so say, you have a filter A running here, and 2 Bs runnning here, if A calls this hell get 1, and B will get 2. | |
| void | dsUseTasks () |
| Task functions. | |
| int | dsGetCurrentTask () |
| get the current task we are working on | |
| int | dsSetCurrentTask (int taskId) |
| int * | dsGetTaskDeps (int taskId, int *depsSz) |
| int | dsCreateTask (int taskId, int *deps, int depSize, char *metadata, int metaSize) |
| creates a new task | |
| int | dsEndTask (int taskId) |
| ends a task | |
| int | dsPutData (char *id, void *val, int valSize) |
| Data functions. | |
| void * | dsGetData (int taskId, char *id, int *valSz) |
| int | dsRemoveData (char *id) |
| void | dsInstSetStates (char **states, int numStates) |
| void | dsInstSwitchState (int stateId) |
| void | dsInstEnterState (int stateId) |
| void | dsInstLeaveState () |
| int | dsGetRunningTasks (int *numTasks, int **taskList) |
| int * | dsGetFinishedTasks (int *numTasks) |
| int | dsRegisterRecoverCallback (RecoverCallback_t *callback) |
Definition in file FilterDev.c.
|
|
Definition at line 20 of file FilterDev.c. |
|
|
Definition at line 21 of file FilterDev.c. |
|
|
Definition at line 109 of file FilterDev.c. References closeOutputPort(), fd, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_CLOSEOPORT. Here is the call graph for this function: ![]() |
|
||||||||||||||||||||||||
|
creates a new task
Definition at line 1468 of file FilterDev.c. References cacheCreateTask(), cacheGetForwardTaskMsgs(), fd, taskSend(), trcEnterState(), trcLeaveState(), and VT_OH_CREATETASK. Referenced by defaultRecoveryCallback(), and dsReadBuffer(). Here is the call graph for this function: ![]() |
|
|
ends a task
Definition at line 1490 of file FilterDev.c. References cacheEndTask(), cacheGetForwardTaskMsgs(), E_TASK_NOT_RUNNING, endTaskSend(), fd, trcEnterState(), trcLeaveState(), and VT_OH_ENDTASK. Referenced by dsReadNonBlockingBuffer(). Here is the call graph for this function: ![]() |
|
|
kill all filters and finish void, user calls this to exit the system abnormally used to notify manager the application called exit(dsExit) Definition at line 1354 of file FilterDev.c. References fd, MSGT_AEXIT, trcDestroyData(), trcEnterState(), trcLeaveState(), and VT_OH_EXIT. Here is the call graph for this function: ![]() |
|
|
get the current task we are working on
Definition at line 1441 of file FilterDev.c. References cacheGetCurrentTask(), fd, trcEnterState(), trcLeaveState(), and VT_OH_GETTASK. Here is the call graph for this function: ![]() |
|
||||||||||||||||
|
Definition at line 1513 of file FilterDev.c. References cacheGetData(). Here is the call graph for this function: ![]() |
|
|
get filter id
Definition at line 1276 of file FilterDev.c. References fd, and __FilterData__::id. |
|
|
get filter name
Definition at line 1271 of file FilterDev.c. References fd, and __FilterData__::name. |
|
|
Definition at line 1555 of file FilterDev.c. References cacheGetFinishedTasks(). Here is the call graph for this function: ![]() |
|
|
get the handlers of ports
Definition at line 58 of file FilterDev.c. References fd, __FilterData__::inputPorts, InputPort::name, __FilterData__::numInputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETIPORT. Here is the call graph for this function: ![]() |
|
|
function to get all input ports names of one filter
Definition at line 1286 of file FilterDev.c. References fd, __FilterData__::inputPorts, InputPort::name, and __FilterData__::numInputPorts. |
|
|
the number of brothers I have in this machine + 1 brothers are the same filters as me, so say, you have a filter A running here, and 2 Bs runnning here, if A calls this hell get 1, and B will get 2.
Definition at line 1426 of file FilterDev.c. References fd, getFDLocalInstances(), trcEnterState(), trcLeaveState(), and VT_OH_GETLI. Here is the call graph for this function: ![]() |
|
|
get the ammount of memory this machine has for this execution
Definition at line 1418 of file FilterDev.c. References fd, getFDMachineMem(), trcEnterState(), trcLeaveState(), and VT_OH_GETMEM. Here is the call graph for this function: ![]() |
|
|
Function that returns which brother am I.
Definition at line 1332 of file FilterDev.c. References fd, __FilterData__::myRank, trcEnterState(), trcLeaveState(), and VT_OH_GETMYRANK. Here is the call graph for this function: ![]() |
|
|
function to get the number of one filter input ports
Definition at line 1281 of file FilterDev.c. References fd, and __FilterData__::numInputPorts. |
|
|
function to get the number of one filter output ports
Definition at line 1300 of file FilterDev.c. References fd, and __FilterData__::numOutputPorts. |
|
|
Definition at line 144 of file FilterDev.c. References fd, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETNR. Here is the call graph for this function: ![]() |
|
|
get the number of writers to me that are still running
Definition at line 1319 of file FilterDev.c. References fd, __FilterData__::inputPorts, InputPort::numEowRecv, __FilterData__::numInputPorts, and InputPort::numSources. |
|
|
these functions return the number of instances on the other side of the strem say, we have a filter A->B. B knows it will receive one message only from each A. He can use this number to leave a loop like while (numMessages != dsGetNumWriters(iph)) Definition at line 124 of file FilterDev.c. References fd, __FilterData__::inputPorts, __FilterData__::numInputPorts, InputPort::numSources, trcEnterState(), trcLeaveState(), and VT_OH_GETNW. Here is the call graph for this function: ![]() |
|
|
Definition at line 83 of file FilterDev.c. References fd, OutputPort::name, __FilterData__::numOutputPorts, __FilterData__::outputPorts, trcEnterState(), trcLeaveState(), and VT_OH_GETOPORT. Here is the call graph for this function: ![]() |
|
|
function to get all output ports names of one filter
Definition at line 1305 of file FilterDev.c. References fd, OutputPort::name, __FilterData__::numOutputPorts, and __FilterData__::outputPorts. |
|
||||||||||||
|
Definition at line 1550 of file FilterDev.c. References cacheGetRunningTasks(). Here is the call graph for this function: ![]() |
|
||||||||||||
|
Definition at line 1463 of file FilterDev.c. References cacheGetTaskDeps(). Here is the call graph for this function: ![]() |
|
|
returns the total intances of this filter(me + mybrothers)
Definition at line 1344 of file FilterDev.c. References fd, __FilterData__::numInstances, trcEnterState(), trcLeaveState(), and VT_OH_GETTI. Here is the call graph for this function: ![]() |
|
|
function to start packing data, size in bytes
Definition at line 458 of file FilterDev.c. References BufSt::buffer, dsPackData(), fd, BufSt::maxSize, MINSIZE, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_INITPACK. Here is the call graph for this function: ![]() |
|
|
function to receive a buffer, and unpack later
Definition at line 1061 of file FilterDev.c. References EOW, fd, trcEnterState(), trcLeaveState(), and VT_OH_IRECEIVE. Here is the call graph for this function: ![]() |
|
|
Definition at line 1537 of file FilterDev.c. References fd, and instEnterState(). Here is the call graph for this function: ![]() |
|
|
Definition at line 1544 of file FilterDev.c. References fd, and instLeaveState(). Here is the call graph for this function: ![]() |
|
||||||||||||
|
Definition at line 1525 of file FilterDev.c. References fd, and instSetUserStates(). Here is the call graph for this function: ![]() |
|
|
Definition at line 1531 of file FilterDev.c. References fd, and instSwitchState(). Here is the call graph for this function: ![]() |
|
||||||||||||||||
|
all filter to filter messages should have this Definition at line 163 of file FilterDev.c. References cacheGetCurrentTask(), fd, instEnterState(), MSGT_F2F, OutputPort::numDestinations, OutputPort::tag, OutputPort::tidsDestinations, and TIMER_WRITE_BLOCKED. Referenced by dsWriteBuffer(). Here is the call graph for this function: ![]() |
|
||||||||||||
|
function to pack data to the buffer
Definition at line 507 of file FilterDev.c. References BufSt::buffer, fd, BufSt::maxSize, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_PACK. Referenced by dsInitPack(). Here is the call graph for this function: ![]() |
|
|
funtion to probe an input port for data, returns the size of the buffer, 0 otherwise
Definition at line 1382 of file FilterDev.c. References fd, __FilterData__::inputPorts, InputPort::tag, trcEnterState(), trcLeaveState(), and VT_OH_PROBE. Here is the call graph for this function: ![]() |
|
||||||||||||||||
|
Data functions.
Definition at line 1509 of file FilterDev.c. References cachePutData(). Here is the call graph for this function: ![]() |
|
||||||||||||||||
|
||||||||||||||||
|
returns 0 if there is no data to be received used to notify filter a new task used to notify filter the end of a task used to notify manager this filter ended its work all filter to filter messages should have this Definition at line 909 of file FilterDev.c. References cacheCreateTask(), cacheGetForwardTaskMsgs(), dsEndTask(), dsSetCurrentTask(), EOW, ERROR, fd, __FilterData__::inputPorts, instEnterState(), instLeaveState(), MSGT_CREATETASK, MSGT_ENDTASK, MSGT_EOW, MSGT_F2F, MSGT_INITTERM, InputPort::numEowRecv, __FilterData__::numInportsAdded, __FilterData__::numInputPorts, InputPort::numSources, recvNonBlockingData(), InputPort::tag, taskSend(), terminationDetectionRound(), TIMER_READ, trcEnterState(), trcLeaveState(), updateTermStreamToEmpty(), VT_COMM_READ, and VT_OH_RBUFFER. Here is the call graph for this function: ![]() |
|
|
Definition at line 1559 of file FilterDev.c. References cacheRegisterRecoverCallback(). Here is the call graph for this function: ![]() |
|
|
Definition at line 1517 of file FilterDev.c. References cacheRemoveData(). Here is the call graph for this function: ![]() |
|
||||||||||||||||||||
|
all filter to filter messages should have this Definition at line 189 of file FilterDev.c. References cacheGetCurrentTask(), fd, instEnterState(), instLeaveState(), MSGT_F2F, and TIMER_WRITE_BLOCKED. Here is the call graph for this function: ![]() |
|
|
Definition at line 1452 of file FilterDev.c. References cacheSetCurrentTask(), fd, trcEnterState(), trcLeaveState(), and VT_OH_SETTASK. Referenced by dsReadBuffer(), and dsReadNonBlockingBuffer(). Here is the call graph for this function: ![]() |
|
||||||||||||
|
function to unpack data
Definition at line 1246 of file FilterDev.c. References BufSt::buffer, fd, BufSt::maxSize, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_UNPACK. Here is the call graph for this function: ![]() |
|
|
Task functions.
Definition at line 1437 of file FilterDev.c. References cacheSetUseTasks(). Here is the call graph for this function: ![]() |
|
||||||||||||||||
|
function that writes to other filter(pack and send) 2 things can happen here. Either the message destination is a single instance or message has multiple destinations. we treat the multicast first, then the others all filter to filter messages should have this Definition at line 245 of file FilterDev.c. References BROADCAST, dsMCast(), fd, __LabeledStreamData::getLabel, instEnterState(), instLeaveState(), OutputPort::lsData, MAX_LBL_LENGTH, MAXINSTANCES, __LabeledStreamData::mlshash, MULTICAST_LABELED_STREAM, OutputPort::numDestinations, __FilterData__::outputPorts, OutputPort::tidsDestinations, TIMER_WRITE, trcEnterState(), trcLeaveState(), VT_COMM_WRITE, VT_OH_WBUFFER, and OutputPort::writePolicy. Referenced by dsWritePackedBuffer(). Here is the call graph for this function: ![]() |
|
|
function to send the packed buffer
Definition at line 533 of file FilterDev.c. References BufSt::buffer, dsWriteBuffer(), fd, BufSt::size, trcEnterState(), trcLeaveState(), and VT_OH_WPBUFFER. Here is the call graph for this function: ![]() |
|
||||||||||||
|
used to notify filter the end of a task Definition at line 643 of file FilterDev.c. References fd, MSGT_ENDTASK, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, and OutputPort::tidsDestinations. Referenced by dsEndTask(), and dsReadBuffer(). |
|
|
Definition at line 896 of file FilterDev.c. References InputPort::tag. Referenced by dsReadNonBlockingBuffer(). |
|
||||||||||||||||||||||||||||
|
used to notify filter a new task Definition at line 591 of file FilterDev.c. References fd, MSGT_CREATETASK, OutputPort::numDestinations, __FilterData__::numOutputPorts, __FilterData__::outputPorts, and OutputPort::tidsDestinations. Referenced by dsCreateTask(), dsReadBuffer(), and dsReadNonBlockingBuffer(). |
1.4.6