EVPath
The EVpath Library

Introduction

EVpath is designed to be an event transport middleware layer. Specifically, it is designed to allow for the easy implementation of overlay networks, with active data processing, routing and management at all points in the overlay.

EVpath specifically does not encompass global overlay creation, management or destruction functions. Rather, it focuses on providing an efficient environment for routine transport, while providing the interfaces necessary for external management layers.

The package is built around the concept of "stones" (like stepping stones) which are linked to build end-to-end "paths". While the "path" is not an explicitly supported construct in EVpath, the goal is to provide all the local, stone-level support necessary to accomplish their setup, monitoring, management, modification and tear-down.

Stones in EVpath are lightweight entities that roughly correspond to processing points in dataflow diagrams. Stones of different types perform data filtering, data transformation, mux and demux of data as well as transmission of data between processes over network links.

Stone Types

Connected collections of stones (or paths) specify a particular flow path of data, typically a directed acyclic graph, across a complete distributed system. Before detailing the various types of stones, we'll examine some sample applications that drove the development of EVPath. For example, one might use stones to build an overlay network to distribute data from the data source to the sinks, as is shown below:

[Figure (overlay.jpg): an overlay network of stones distributing data from a data source to several sinks]

We can also imagine that each of those sinks might want a slightly customized version of the event stream (with sink i customizing its stream with function i), and that an efficient implementation of event delivery would place those filter functions as close to the source as possible, to avoid transmitting data that will only be discarded later.

[Figure (func1.jpg): the same overlay with each sink's filter function placed close to the source]

A yet more efficient implementation, armed with knowledge of the nature of functions 1 through 4, might use that knowledge to introduce combined filter functions, for example using a filter FC to eliminate data that would be rejected by both filters F3 and F4 before it is sent.

Examples

Single Process Example

The simplest EVpath program, which involves submitting and receiving an event inside a single program (address space), is shown below.

Include evpath.h and define the data structure we're using:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* this file is evpath/examples/triv.c */
#include "evpath.h"
typedef struct _simple_rec {
int integer_field;
} simple_rec, *simple_rec_ptr;
static FMField simple_field_list[] =
{
{"integer_field", "integer", sizeof(int), FMOffset(simple_rec_ptr, integer_field)},
{NULL, NULL, 0, 0}
};
static FMStructDescRec simple_format_list[] =
{
{"simple", simple_field_list, sizeof(simple_rec), NULL},
{NULL, NULL}
};

Here, the FMField and FMStructDescRec declarations describe to EVPath the data structure (simple_rec) that is to be transmitted. EVPath deals with structured data and provides safe marshalling of complex structures, even between architectures with different machine representations of data. These declarations will be described in more detail in Section Types and FFS. For the moment it is merely necessary to be aware that all data in EVPath is typed and that types play a role in determining actions.

Next, define a function which will be called to consume data from a stone as a passive event receiver. Note that the event is delivered as a void* and cast into the appropriate data type.

static int
simple_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
{
simple_rec_ptr event = vevent;
printf("I got %d\n", event->integer_field);
return 1;
}

In the main program, we declare some values we'll need later and create a CManager to handle the network. (Even though we're not doing network data transmission in this simple example, EVPath is network-focused and won't work without a Connection Manager.)

int main(int argc, char **argv)
{
CManager cm;
simple_rec data;
EVstone stone;
EVsource source;
cm = CManager_create();

For the sink (event receiver) side of the application, we create a stone and associate a terminal action with it, specifying the simple_handler() function from above and the simple_format_list which describes the data type that simple_handler expects to receive.

stone = EValloc_stone(cm);
EVassoc_terminal_action(cm, stone, simple_format_list, simple_handler, NULL);

For the source (event sender) side of the application, we create a source handle (EVsource) with simple_format_list describing the type of data that we will submit with this source handle, and the stone to which it will be submitted. (Here we are using the same data specification for both the source and sink. In less tightly-coupled applications, these data types can vary, causing special rules to be invoked; see Section Types and FFS. It is also possible to create many sources, each of which submits a different type of data to the same stone.)

source = EVcreate_submit_handle(cm, stone, simple_format_list);
Finally, we initialize a record and submit it as an event.
data.integer_field = 217;
EVsubmit(source, &data, NULL);
return 0;
}
When run, the program above should print "I got 217".

Simple Multi-Process Example

The trivial program in the previous example isn't very useful. The next step is to split the sender and receiver into separate processes and introduce network data transmission using bridge stones.

The Receiving Side

This example will use the same data definitions and handler that were defined in the example above. However, the main program is somewhat different. In addition to the CManager_create(), we call CMlisten() so that Connection Manager will listen for incoming connections. (Strictly speaking, this is not necessary for processes that only make connections and never receive them, but we'll do it in all the examples from here on.)
/* this file is evpath/examples/net_recv.c */
int main(int argc, char **argv)
{
CManager cm;
EVstone stone;
char *string_list;
cm = CManager_create();
CMlisten(cm);
After setting up CM, we allocate a stone and associate a terminal action as before, but we also have to extract contact information from CM so that it can be provided to the sender. Here, we use CMget_contact_list() to extract that information from CM as an attr_list, then turn the attr_list into a string with attr_list_to_string(). Then the stone ID and the stringified contact information are printed so that they can be supplied to the sender.
stone = EValloc_stone(cm);
EVassoc_terminal_action(cm, stone, simple_format_list, simple_handler, NULL);
string_list = attr_list_to_string(CMget_contact_list(cm));
printf("Contact list \"%d:%s\"\n", stone, string_list);
Finally, we call CMrun_network(). This causes Connection Manager to use the main program's thread of control to service incoming messages.
CMrun_network(cm);
return 0;
}
When this program is run it will print out something like:

Contact list "0:AQIAAENJUEGCzwVFQ0lQUAAA0mY="

and then sit quietly waiting for events. When an event is received it will print the value of the "integer_field" in the event data.

The Sending Side

The sending side of our multi-process example uses the same data definitions as the trivial example above (but without the simple_handler() routine, which is not needed). The main program begins similarly, but we use the first command-line argument to specify the contact information for the receiver, extracting it into the variables remote_stone and string_list.
/* this file is evpath/examples/net_send.c */
int main(int argc, char **argv)
{
CManager cm;
simple_rec data;
EVstone stone;
EVsource source;
char string_list[2048];
attr_list contact_list;
EVstone remote_stone;
if (argc < 2) {
printf("Usage: net_send <stone_id>:<contact_list>\n");
exit(1);
}
if (sscanf(argv[1], "%d:%2047s", &remote_stone, &string_list[0]) != 2) {
printf("Bad arguments \"%s\"\n", argv[1]);
exit(1);
}
cm = CManager_create();
CMlisten(cm);
The next step is to create a stone and associate a bridge action with it. The bridge action requires a CM-style contact list in attribute-list form, so we must unstringify the string_list we extracted from the argument.
stone = EValloc_stone(cm);
contact_list = attr_list_from_string(string_list);
EVassoc_bridge_action(cm, stone, contact_list, remote_stone);
Finally, we create an event submission handle, initialize a record to send and submit it.
source = EVcreate_submit_handle(cm, stone, simple_format_list);
data.integer_field = 318;
EVsubmit(source, &data, NULL);
return 0;
}
(For these trivial examples, we are using command-line arguments to communicate such things as contact lists and stone IDs between processes. In a real environment, these would probably be communicated via (perhaps CM-based) message passing, RPCs, carrier pigeons or some other mechanism, allowing dynamic stone creation, action registration, etc. Everything these example programs do at initialization time while parsing arguments can also be done dynamically during the course of a long execution.)

To run this program pair, first run net_recv in one window. After it prints out the contact information, run net_send in another window, specifying that contact information as the only argument to net_send. When net_send runs, net_recv should print out "I got 318". The logical architecture of these two programs is shown below:

[Figure: logical architecture of net_send and net_recv]

Sending to multiple receivers

Net_send and net_recv above constitute a basic mechanism for sending data across the network, but what if we want to expand it into something more like a publish-subscribe communication system? The first obvious limitation is that net_send only supports sending to one receiver. Our next example is a modification of the sending side to support replication of events to multiple receivers. Again, we use the same data definitions as before, but we modify the program body to introduce a split action:
/* this file is evpath/examples/multi_send.c */
int main(int argc, char **argv)
{
CManager cm;
simple_rec data;
EVstone split_stone;
EVaction split_action;
EVsource source;
int i;
cm = CManager_create();
CMlisten(cm);
split_stone = EValloc_stone(cm);
split_action = EVassoc_split_action(cm, split_stone, NULL);
The EVassoc_split_action() call above adds a split action to the specified stone with a NULL target list. Split actions essentially replicate the data stream that arrives on a stone and submit it to multiple "target stones". The number and identity of the target stones can be changed on the fly. The third argument to EVassoc_split_action() specifies a zero-terminated initial target list. In this case, we'll add targets to the split stone as we parse the input arguments, so the initial target list is NULL. For this program, we'll specify the multiple remote receivers by listing them separately on the program's command line, so we next parse the command line, create a bridge stone for each receiver and add each bridge stone as a target of the split stone.
for (i = 1; i < argc; i++) {
char string_list[2048];
attr_list contact_list;
EVstone remote_stone, output_stone;
if (sscanf(argv[i], "%d:%2047s", &remote_stone, &string_list[0]) != 2) {
printf("Bad argument \"%s\"\n", argv[i]);
exit(0);
}
output_stone = EValloc_stone(cm);
contact_list = attr_list_from_string(string_list);
EVassoc_bridge_action(cm, output_stone, contact_list, remote_stone);
EVaction_add_split_target(cm, split_stone, split_action, output_stone);
}
The call to EVaction_add_split_target() adds the newly-created bridge stone to the list of stones to which the split stone will replicate data. Note that the EVaction_add_split_target() call requires specifying both the stone to which the split action was registered and the EVaction value that was returned. The last bit of the program is largely unchanged: we create a submit handle, initialize a record and submit it to EVpath.
source = EVcreate_submit_handle(cm, split_stone, simple_format_list);
data.integer_field = 318;
EVsubmit(source, &data, NULL);
return 0;
}
If you run N versions of net_recv, then run multi_send with all N contact strings on the command line, each of the net_recv programs should print out "I got 318". The logical architecture of these programs is shown below (with 3 copies of net_recv):

[Figure: logical architecture of multi_send with three copies of net_recv]

Some further notes. The text above uses the term "split stone" as shorthand for "a stone with a split action associated with it"; "bridge stone" is used similarly. Actions which are not associated with a specific data format are installed as the default action on a stone and are performed when no other action more specifically matches the data format of the triggering event. There is only one default action per stone, so registering a split action on top of a bridge action will overwrite the bridge action (and likely leak some memory). Also, it's technically possible to register, for example, a terminal action and a bridge action on the same stone. In this case, the terminal action would be applied to incoming data of compatible formats, but the bridge action would be applied to other data (more about data compatibility in Section Types and FFS). At this point, I'm not sure if there are circumstances where this sort of flexibility is more useful than confusing. But the implementation makes it easy, so for the moment the EVPath API will remain capable of these things. Don't shoot yourself in the foot.

Specializing the event stream

The receiving side

With bridge stones and split stones we can build the sorts of overlay networks described at the beginning of this description, so we'll focus next on how clients can specialize the event stream that they receive. To keep the example realistic, we'll assume that only the receiver starts out with the knowledge of how it wants to specialize its data stream. To this end, we'll modify the receiver in a simple way to encode its desired specialization in the contact information that it prints out. This is done by using create_filter_action_spec() to create a char* value that encodes both the data type that the filter function expects and the body of the function, in this case a simple statement that returns the value of the input's integer_field modulo 2. If the function returns false (0), the event is discarded; for non-zero returns, the event is passed unchanged through the filter. The resulting filter specification string is base64 encoded and appended to the printed contact list. (The filter spec string contains newlines and shell-special characters such as quotes, so it is not easily specified on the command line. Base64 encoding (as is also used for attributes) maps the string to shell-safe characters. The encoding function used here is exported from the Attribute List (ATL) package and is declared in atl.h.)
/* this file is evpath/examples/derived_recv.c */
int main(int argc, char **argv)
{
CManager cm;
EVstone stone;
char *string_list, *filter_spec, *encoded_filter_spec;
cm = CManager_create();
CMlisten(cm);
stone = EValloc_stone(cm);
EVassoc_terminal_action(cm, stone, simple_format_list, simple_handler, NULL);
string_list = attr_list_to_string(CMget_contact_list(cm));
filter_spec = create_filter_action_spec(simple_format_list,
"{ return input.integer_field % 2;}");
encoded_filter_spec = atl_base64_encode(filter_spec, strlen(filter_spec) + 1);
printf("Contact list \"%d:%s:%s\"\n", stone, string_list, encoded_filter_spec);
free(filter_spec);
free(encoded_filter_spec);
CMsleep(cm, 600);
return 0;
}
Just to make life somewhat easier, we've also changed the CMrun_network() at the last line of the receiver to CMsleep(). This causes the receiver to service the network for 600 seconds and then exit, rather than hanging around forever.

The sending side

So the architecture of the receiver is unchanged; we just have it export information about its filtering needs. The actual specialization occurs on the sending side. We'll start with multi_send and change it so that it supports both the original net_recv clients and our new derived_recv clients. The program preamble is the same as in multi_send, so the interesting changes are in the body of the for loop. In this modified code, we parse out the base64-encoded filter specification by looking for a ':' separator, if present. (Base64 encoding uses only the upper- and lower-case alphabet, the decimal digits, '+', '/' and the '=' padding character, so ':' never appears in encoded text.)

/* this file is evpath/examples/derived_send.c */
int main(int argc, char **argv)
{
CManager cm;
simple_rec data;
EVstone split_stone;
EVaction split_action;
EVsource source;
int i;
cm = CManager_create();
CMlisten(cm);
split_stone = EValloc_stone(cm);
split_action = EVassoc_split_action(cm, split_stone, NULL);
for (i = 1; i < argc; i++) {
char string_list[2048];
attr_list contact_list;
char *filter_spec;
EVstone remote_stone, output_stone;
if (sscanf(argv[i], "%d:%2047s", &remote_stone, &string_list[0]) != 2) {
printf("Bad argument \"%s\"\n", argv[i]);
exit(0);
}
filter_spec = strchr(string_list, ':');
if (filter_spec != NULL) { /* if there is a filter spec */
*filter_spec = 0; /* terminate the contact list */
filter_spec++; /* advance pointer to string start */
atl_base64_decode((unsigned char *)filter_spec, NULL); /* decode in place */
printf("String list is %s\n", string_list);
}
At the end of this section, the variable filter_spec points to the decoded filter specification, or is NULL if no filter specification was included (i.e., if the argument came from a net_recv client).

Both cases require the creation of an appropriately targeted bridge stone:

/* regardless of filtering or not, we'll need an output stone */
output_stone = EValloc_stone(cm);
contact_list = attr_list_from_string(string_list);
printf("This is the contact list --------\n");
dump_attr_list(contact_list);
EVassoc_bridge_action(cm, output_stone, contact_list, remote_stone);
If we're dealing with an original non-filtered client, all we have to do is to add its bridge stone as a target of the split stone:
if (filter_spec == NULL) {
EVaction_add_split_target(cm, split_stone, split_action, output_stone);
} else {
For the filtered case it's a little more complicated. To accomplish filtering, we create a new stone and use EVassoc_immediate_action() to attach the filtering action to it. EVaction_set_output() is used to set the output of the filter stone to be the bridge stone. Then the new filter stone is added as a target of the split stone:
EVstone filter_stone = EValloc_stone(cm);
EVaction filter_action = EVassoc_immediate_action(cm, filter_stone, filter_spec, NULL);
EVaction_set_output(cm, filter_stone, filter_action, 0, output_stone);
EVaction_add_split_target(cm, split_stone, split_action, filter_stone);
}

EVassoc_immediate_action() can be used to install handlers which take only a single event as input and can therefore run and "consume" their data immediately. In particular, they are distinct from actions which may leave their input data enqueued for some time (typically handlers which might require more than one event to act). The current EVPath implementation supports only immediate actions with one input and one output, but multiple-output actions will be implemented soon.

}
source = EVcreate_submit_handle(cm, split_stone, simple_format_list);
data.integer_field = 318;
for (i=0; i < 10; i++) {
EVsubmit(source, &data, NULL);
data.integer_field++;
}
return 0;
}

Finally, we modify the submission code of the program to submit multiple events (so that some will be passed by the filters and others discarded). If you run a mix of net_recv and derived_recv, then run derived_send with all the contact strings on the command line, each of the net_recv programs should receive all 10 events submitted by derived_send, but the derived_recv programs will receive only the odd-valued events. The logical architecture of these programs is shown below:

[Figure: logical architecture of derived_send feeding both net_recv and derived_recv receivers]

Further details

EVPath action specifications of the sort created by create_filter_action_spec() are designed to encompass exactly those details of the system's data processing that are not captured by diagrams like the one above, namely the details about data types and the nature of the processing that occurs. The idea is that the more structural aspects of the system are best managed separately. This also allows us to extend the functionality of EVpath without disrupting the structural aspects of the API.

So far, we have only introduced the create_filter_action_spec() call as a mechanism for creating action specifications. The semantics associated with the filter action are relatively simple. The data format list passed to create_filter_action_spec() specifies the data type for a parameter named "input" through which the incoming event is made available to the filter function. There is a fair amount of "magic" surrounding EVpath type handling in any circumstance, and to some extent this magic is customized for the class of processing that is occurring. In the handling of actions created by create_filter_action_spec(), EVpath performs very flexible type matching and applies the filter function to any incoming event which contains a superset of the data fields required by the filter. In particular, EVpath ensures that all matching events that satisfy the predicate are passed through unchanged. This is a fairly obvious and natural semantic for a pure filter, and it is possible because of the type flexibility of the dynamic code generation package that underlies EVpath.

Transforming streaming data

EVpath actions are also capable of processing that modifies or transforms the data, potentially changing its data type in the process. This is slightly more complex than the simple filter because we have to specify an output data type, so in comparison to create_filter_action_spec(), create_transform_action_spec() has an additional format-list parameter called out_format_list. Semantically, out_format_list specifies the data type of an "output" parameter to the specified function. The output record is allocated and zeroed before it is provided to the function, and the function should initialize it properly before it exits. The following example shows a stream transformation that adds an "average" data value to the event. We start with derived_recv and add a declaration of the transform's output data, adding the new "average" field but also keeping the basic data field for good measure:

/* this file is evpath/examples/transform_recv.c */
typedef struct _output_rec {
int integer_field;
double average;
} output_rec, *output_rec_ptr;
static FMField output_field_list[] =
{
{"integer_field", "integer", sizeof(int), FMOffset(output_rec_ptr, integer_field)},
{"average", "double", sizeof(double), FMOffset(output_rec_ptr, average)},
{NULL, NULL, 0, 0}
};
static FMStructDescRec output_format_list[] =
{
{"simple2", output_field_list, sizeof(output_rec), NULL},
{NULL, NULL}
};

We also define a new handler function that prints out the new field as well:

static int
output_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
{
output_rec_ptr event = vevent;
printf("I got %d, average is now %g\n", event->integer_field, event->average);
return 1;
}
Finally, main() is modified a bit to associate output_handler() as the new terminal with output_format_list defining its input type, and to use create_transform_action_spec() instead of create_filter_action_spec().
int main(int argc, char **argv)
{
CManager cm;
EVstone stone;
char *string_list, *trans_spec, *encoded_trans_spec;
char *trans_func = "\
{\
static double sum = 0.0;\
static int count = 0;\
sum = sum + input.integer_field;\
count++;\
output.integer_field = input.integer_field;\
output.average = sum / count;\
return (count % 5) == 0; /* pass filter every fifth*/ \
}";
cm = CManager_create();
CMlisten(cm);
stone = EValloc_stone(cm);
EVassoc_terminal_action(cm, stone, output_format_list, output_handler, NULL);
string_list = attr_list_to_string(CMget_contact_list(cm));
trans_spec = create_transform_action_spec(simple_format_list, output_format_list,
trans_func);
printf("trns spec is %s\n", trans_spec);
encoded_trans_spec = atl_base64_encode(trans_spec, strlen(trans_spec) + 1);
printf("Contact list \"%d:%s:%s\"\n", stone, string_list, encoded_trans_spec);
free(trans_spec);
free(encoded_trans_spec);
CMsleep(cm, 600);
return 0;
}
The data transformation function, stored here in trans_func before being passed to create_transform_action_spec(), is more complex than the previous simple filter function. It necessarily fills in all the fields in the output data record, in this case using ECL's static data feature to track event count and data sum between invocations. It also passes only every fifth event.

Because both filter and transform actions are immediate, and because the architecture in both examples is identical, we can use derived_send to install the transform action. When we run transform_recv with derived_send, the output should be:

I got 322, average is now 320
I got 327, average is now 322.5

Multiple actions on a stone

To this point, only a single EVpath action has been associated with any stone. However, because actions are type-specialized and multiple data types can flow along a single path, a stone can hold multiple actions. To demonstrate this, we'll modify the transform_recv program to create transform_recv2. Transform_recv2 will receive the same data as before, but we'll introduce a second data type that we'll use in a second transformation action:

/* this file is evpath/examples/transform_recv2.c */
typedef struct _second_rec {
double data_field;
char data_type;
} second_rec, *second_rec_ptr;
static FMField second_field_list[] =
{
{"data_field", "float", sizeof(double), FMOffset(second_rec_ptr, data_field)},
{"data_type", "char", sizeof(char), FMOffset(second_rec_ptr, data_type)},
{NULL, NULL, 0, 0}
};
static FMStructDescRec second_format_list[] =
{
{"second", second_field_list, sizeof(second_rec), NULL},
{NULL, NULL}
};
The output_rec type and output_handler() are unchanged, but we'll change the main program to encode the second transformation and tack it onto the end of our ever-growing contact list. The second transformation function is similar to the first, except it takes the new datatype as an input parameter and maps the double to an integer to create the original output data type.
int main(int argc, char **argv)
{
CManager cm;
EVstone stone;
char *string_list, *trans_spec, *encoded_trans_spec, *trans_spec2, *encoded_trans_spec2;
char *trans_func = "\
{\
static double sum = 0.0;\
static int count = 0;\
sum = sum + input.integer_field;\
count++;\
output.integer_field = input.integer_field;\
output.average = sum / count;\
return (count % 5) == 0; /* pass filter every fifth*/ \
}";
char *trans_func2 = "\
{\
static double sum = 0.0;\
static int count = 0;\
sum = sum + input.data_field;\
count++;\
output.integer_field = input.data_field;\
output.average = sum / count;\
return (count % 5) == 0; /* pass filter every fifth*/ \
}";
cm = CManager_create();
CMlisten(cm);
stone = EValloc_stone(cm);
EVassoc_terminal_action(cm, stone, output_format_list, output_handler, NULL);
string_list = attr_list_to_string(CMget_contact_list(cm));
trans_spec = create_transform_action_spec(simple_format_list, output_format_list,
trans_func);
trans_spec2 = create_transform_action_spec(second_format_list, output_format_list,
trans_func2);
encoded_trans_spec = atl_base64_encode(trans_spec, strlen(trans_spec) + 1);
encoded_trans_spec2 = atl_base64_encode(trans_spec2, strlen(trans_spec2) + 1);
printf("Contact list \"%d:%s:%s:%s\"\n", stone, string_list, encoded_trans_spec,
encoded_trans_spec2);
free(trans_spec);
free(trans_spec2);
free(encoded_trans_spec);
free(encoded_trans_spec2);
CMsleep(cm, 600);
return 0;
}
(Our contact list is now growing quite large, and it may exceed some shells' limits on the length of individual arguments. If you get an error like "Word too long", try a different shell, and remember that in the real world you won't be using the shell as a communication mechanism.)

On the sending side, we have to modify our derived_send program to create derived_send2 which submits two data types and can associate multiple actions with filter stones. We'll use the second_rec data type that we specified above and modify the body of the main program to potentially parse multiple filter specs and use them to associate multiple immediate actions to the created filter stones.

/* this file is evpath/examples/derived_send2.c */
int main(int argc, char **argv)
{
CManager cm;
simple_rec data;
second_rec data2;
EVstone split_stone;
EVaction split_action;
EVsource source, source2;
int i;
cm = CManager_create();
CMlisten(cm);
split_stone = EValloc_stone(cm);
split_action = EVassoc_split_action(cm, split_stone, NULL);
for (i = 1; i < argc; i++) {
char string_list[20480];
attr_list contact_list;
char **filter_specs = NULL, *filter_spec, *next;
EVstone remote_stone, output_stone;
if (sscanf(argv[i], "%d:%20479s", &remote_stone, &string_list[0]) != 2) {
printf("Bad argument \"%s\"\n", argv[i]);
exit(0);
}
filter_spec = strchr(string_list, ':');
if (filter_spec != NULL) { /* if there is a filter spec */
int filter_count = 0;
*filter_spec = 0; /* terminate the contact list */
filter_spec++; /* advance pointer to string start */
filter_specs = malloc(sizeof(filter_specs[0]) * 2);
while (filter_spec != NULL) {
next = strchr(filter_spec, ':');
if (next != NULL) {
*next = 0;
next++;
}
atl_base64_decode((unsigned char *)filter_spec, NULL); /* decode in place */
filter_specs = realloc(filter_specs, sizeof(filter_specs[0]) * (filter_count + 2));
filter_specs[filter_count++] = filter_spec;
filter_spec = next;
}
filter_specs[filter_count] = NULL;
}
/* regardless of filtering or not, we'll need an output stone */
output_stone = EValloc_stone(cm);
contact_list = attr_list_from_string(string_list);
EVassoc_bridge_action(cm, output_stone, contact_list, remote_stone);
if (filter_specs == NULL) {
EVaction_add_split_target(cm, split_stone, split_action, output_stone);
} else {
int i = 0;
EVstone filter_stone = EValloc_stone(cm);
while (filter_specs[i] != NULL) {
EVaction filter_action = EVassoc_immediate_action(cm, filter_stone, filter_specs[i], NULL);
EVaction_set_output(cm, filter_stone, filter_action, 0, output_stone);
i++;
}
EVaction_add_split_target(cm, split_stone, split_action, filter_stone);
}
}

Once the output and filter stones are established, all that remains is to submit events. In order to submit two different data types to our path, we need two EVsource handles, one associated with each data type:

source = EVcreate_submit_handle(cm, split_stone, simple_format_list);
source2 = EVcreate_submit_handle(cm, split_stone, second_format_list);
Then we can use the submit handles to submit the two data types alternately:
data.integer_field = 318;
data2.data_field = 18.8;
data2.data_type = 'A';
for (i=0; i < 20; i++) {
if ((i % 2) == 0) {
EVsubmit(source, &data, NULL);
data.integer_field++;
} else {
EVsubmit(source2, &data2, NULL);
data2.data_field += 1.0;
data2.data_type++;
}
}
return 0;
}

When transform_recv2 and derived_send2 are run, the following output should appear:

I got 322, average is now 320
I got 22, average is now 20.8
I got 327, average is now 322.5
I got 27, average is now 23.3

In the output above, notice that the average values don't really look "right". Rather than consistent values near 170 (i.e., around the average of our starting values, 318 and 18.8), we have two lines that are unchanged from the output of the previous example and two lines that seem to represent the average of the floating point values. This is because the "static" declarations in the two filter functions are disjoint. That is, the "static double sum" in trans_func does not reference the same data value that the "static double sum" in trans_func2 references. This may or may not be intuitive to you. What is probably not intuitive is that even if there were just one filter function involved, there could be multiple, disjoint values for "sum" and "count". This can occur because a single action can match more than one incoming data type. For example, the first function is designed to match incoming data that looks like:

typedef struct _simple_rec {
    int integer_field;
} simple_rec, *simple_rec_ptr;

but it will also match any record that has a top-level field named "integer_field", such as:

typedef struct _simple_rec {
    double new_stuff;   /* extra field */
    int integer_field;
} simple_rec, *simple_rec_ptr;

Or even

typedef struct _simple_rec {
    double integer_field;       /* different type for integer_field */
    int  more_stuff[5];         /* more fields */
} simple_rec, *simple_rec_ptr;

The details of type matching will be described more fully in Section Types and FFS, but for our current purposes the important point is that EVpath generates a new instance of the filter function for each distinct incoming type. This helps to minimize recurring overhead (eliminating the need to map each new type into some single common representation allows us to minimize data transformation), but at the cost of making "static" variables a somewhat trickier tool to use.
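To make the effect of per-type instances concrete, the following plain-C sketch models the two filter instances from the previous example as two separate functions, each with its own static sum and count. It is not EVpath code, and the names int_instance(), double_instance() and replay_disjoint() are hypothetical. Replaying the same alternating sequence that the submission loop above produces should yield the four averages shown earlier (320, 20.8, 322.5 and 23.3), which shows that neither set of statics ever sees the other's values.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical stand-in for the filter instance generated for
 * simple_rec input. Its static totals belong to this function only. */
static int int_instance(int value, double *avg_out)
{
    static double sum = 0.0;
    static int count = 0;
    sum += value;
    count++;
    *avg_out = sum / count;
    return (count % 5) == 0;            /* pass every fifth event */
}

/* Hypothetical stand-in for the instance generated for the second type:
 * identical logic, but completely separate static totals. */
static int double_instance(double value, double *avg_out)
{
    static double sum = 0.0;
    static int count = 0;
    sum += value;
    count++;
    *avg_out = sum / count;
    return (count % 5) == 0;
}

/* Replays the 20 alternating submissions once, printing each event that
 * passes and storing up to max of their averages. Returns how many passed.
 * Because of the statics, totals carry over if this is called again. */
static int replay_disjoint(double averages[], int max)
{
    int ival = 318;
    double dval = 18.8;
    int passed = 0;

    assert(max >= 0);
    for (int i = 0; i < 20; i++) {
        double avg;
        int got, pass;
        if (i % 2 == 0) {
            got = ival;
            pass = int_instance(ival++, &avg);
        } else {
            got = (int)dval;            /* mirrors output.integer_field = input.data_field */
            pass = double_instance(dval, &avg);
            dval += 1.0;
        }
        if (pass) {
            printf("I got %d, average is now %g\n", got, avg);
            if (passed < max)
                averages[passed] = avg;
            passed++;
        }
    }
    return passed;
}
```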

Communicating between actions

EVpath does afford a more reliable mechanism for communicating data between different incarnations of an action: attribute lists. In particular, EVpath defines a variable, stone_attrs, that is accessible from within each action and which represents a set of attributes (name/value pairs) associated with the stone. All actions on a particular stone share the same attribute list. This attribute list can be operated on with a set of standard CoD functions to set and retrieve values. We can modify the transformation functions in transform_recv2.c to utilize this attribute list to get results that represent the real average of data flowing through the stone:

/* this file is evpath/examples/transform_recv3.c */
char *trans_func = "\
{\
double sum = 0.0;\
int count = 0;\
if (attr_set(stone_attrs, \"sum\")) {\n\
sum = attr_dvalue(stone_attrs, \"sum\");\n\
count = attr_ivalue(stone_attrs, \"count\");\n\
}\n\
sum = sum + input.integer_field;\
count++;\
output.integer_field = input.integer_field;\
output.average = sum / count;\
set_double_attr(stone_attrs, \"sum\", sum);\n\
set_int_attr(stone_attrs, \"count\", count);\n\
return (count % 5) == 0; /* pass filter every fifth*/ \
}";
char *trans_func2 = "\
{\
double sum = 0.0;\
int count = 0;\
if (attr_set(stone_attrs, \"sum\")) {\n\
sum = attr_dvalue(stone_attrs, \"sum\");\n\
count = attr_ivalue(stone_attrs, \"count\");\n\
}\n\
sum = sum + input.data_field;\
count++;\
output.integer_field = input.data_field;\
output.average = sum / count;\
set_double_attr(stone_attrs, \"sum\", sum);\n\
set_int_attr(stone_attrs, \"count\", count);\n\
return (count % 5) == 0; /* pass filter every fifth*/ \
}";
When these modified transformation functions are used with derived_send2, the output is:

I got 320, average is now 199.12
I got 22, average is now 170.4
I got 325, average is now 181.64
I got 27, average is now 172.9
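For comparison, here is the same kind of plain-C model with the statics replaced by a single shared record standing in for stone_attrs. It is only an illustration under stated assumptions: struct stone_attrs_model, shared_average() and replay_shared() are hypothetical names, the attr_set() test is modeled by a flag, and one function body serves for both trans_func and trans_func2 because they differ only in which input field they read. Because every instance reads and writes the same totals, the averages match the output above.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical model of the two values kept in stone_attrs. There is one
 * per stone, shared by every action instance on that stone. */
struct stone_attrs_model {
    int    sum_set;   /* plays the role of attr_set(stone_attrs, "sum") */
    double sum;
    int    count;
};

/* Common body of trans_func and trans_func2: fetch the shared totals,
 * update them, store them back and pass every fifth event. */
static int shared_average(struct stone_attrs_model *attrs, double value,
                          double *avg_out)
{
    double sum = 0.0;
    int count = 0;

    if (attrs->sum_set) {
        sum = attrs->sum;
        count = attrs->count;
    }
    sum += value;
    count++;
    *avg_out = sum / count;
    attrs->sum = sum;        /* like set_double_attr(stone_attrs, "sum", sum) */
    attrs->count = count;    /* like set_int_attr(stone_attrs, "count", count) */
    attrs->sum_set = 1;
    return (count % 5) == 0;
}

/* Replays the alternating submissions against one shared attribute record. */
static int replay_shared(double averages[], int max)
{
    struct stone_attrs_model attrs = { 0, 0.0, 0 };
    int ival = 318;
    double dval = 18.8;
    int passed = 0;

    assert(max >= 0);
    for (int i = 0; i < 20; i++) {
        double value, avg;
        if (i % 2 == 0) {
            value = ival++;
        } else {
            value = dval;
            dval += 1.0;
        }
        if (shared_average(&attrs, value, &avg)) {
            printf("I got %d, average is now %g\n", (int)value, avg);
            if (passed < max)
                averages[passed] = avg;
            passed++;
        }
    }
    return passed;
}
```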

Router stones

We have previously created 'split' actions to replicate events on multiple output paths, but sometimes what one wants to do is to select a single output path down which an event should propagate. EVPath allows this to be done with a 'router' stone and the previously used EVaction_set_output() routine. EVPath stones can have multiple numbered output 'ports'. In split stones, the output ports are identical and largely anonymous. In filter and transform stones, we only use output '0' as they can only support a single output. However, in router stones we can have many outputs, each linked with a specific target stone via EVaction_set_output(). In particular, the code associated with a router action is somewhat like that of a filter stone, except that where the filter would return 0 or 1 to indicate that the event should be dropped or passed forward, the router function should return a non-negative integer indicating the output port to which the event should be forwarded. That is, if one has associated target stones with three output ports, something like:

EVaction_set_output(cm, router_stone, router_action, 0, first_output_stone);
EVaction_set_output(cm, router_stone, router_action, 1, middle_output_stone);
EVaction_set_output(cm, router_stone, router_action, 2, last_output_stone);

then the router function should return a value between 0 and 2 inclusive, and the event will be forwarded to the corresponding output stone. (No sample programs included here. Hopefully you've gotten to the point where you can write your own.)
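Although router programs are left as an exercise, the dispatch rule itself is easy to model. The sketch below is plain C rather than EVpath code: simple_event, route_by_range(), router_dispatch() and the three counting outputs are hypothetical stand-ins for the event type, the router function and the stones attached with EVaction_set_output(). The router returns a port number from 0 to 2, and exactly one output receives the event. What EVpath does with a return value that has no attached target is not described here, so the sketch simply drops such events and reports -1.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical event type and output-stone callback for the model. */
typedef struct { int integer_field; } simple_event;
typedef void (*output_stone_fn)(const simple_event *ev);

/* A hypothetical router function: chooses port 0, 1 or 2 by value range. */
static int route_by_range(const simple_event *ev)
{
    if (ev->integer_field < 100)
        return 0;
    if (ev->integer_field < 1000)
        return 1;
    return 2;
}

/* Sends ev to exactly one output, chosen by the router. A port with no
 * attached target is treated as "drop" (an assumption of this sketch).
 * Returns the port used, or -1 if the event was dropped. */
static int router_dispatch(const simple_event *ev,
                           int (*router)(const simple_event *),
                           output_stone_fn outputs[], int n_outputs)
{
    int port;

    assert(router != NULL);
    port = router(ev);
    if (port < 0 || port >= n_outputs || outputs[port] == NULL)
        return -1;
    outputs[port](ev);
    return port;
}

/* Counting "stones" standing in for first/middle/last_output_stone. */
static int deliveries[3];
static void first_output(const simple_event *ev)  { (void)ev; deliveries[0]++; }
static void middle_output(const simple_event *ev) { (void)ev; deliveries[1]++; }
static void last_output(const simple_event *ev)   { (void)ev; deliveries[2]++; }
```

The router only picks a port; the mapping from port to target stone lives entirely in the outputs table, just as EVaction_set_output() keeps it outside the router function.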

Multi-stones

Multi-stones (sometimes called multi-queue or multi-type stones) are the most complex stones in EVPath. All of the previously-described stone actions are 'immediate' actions, in which the action is applied to a single event and EVPath generally tries to perform it immediately upon submission. In particular, immediate processing of the events, with the result of either further submission or dropping the event, is automatic. Situations which require aggregation of information from multiple events, possibly of different data types, are more directly supported by multi-stones.

Two main challenges in creating an event-aggregation functionality in the context of a system like EVPath are 1) specifying when a multi-event action is to run, and 2) making the event data visible in a type-safe way when events of multiple different data types might be in the queue at once. With respect to 1), while some systems take the approach of creating a language in which action trigger preconditions can be specified, EVPath takes a somewhat simpler approach. In EVPath multi-stones, the specified action is run every time a new event is placed in the stone's event queue. However, there is no automatic dequeue of events, so the action has an opportunity to examine the contents of the event queue and decide whether or not its pre-conditions have been met. If they have not, the action simply exits without consuming anything from its event queue. This essentially assigns to the action the responsibility for evaluating its own preconditions; however, because the action code is dynamically generated native code, the costs are minimized.
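The run-on-every-enqueue, no-automatic-dequeue rule can be modeled in a few lines of plain C. In this sketch (not EVpath code; multi_stone, enqueue() and combine_action() are hypothetical) the action is invoked after each event is queued, returns without touching the queue until at least one A and one B event are present, and then reads both values before discarding the two events it used, mirroring the rule that event data must not be used after the event is discarded.

```c
#include <assert.h>

enum ev_type { A_REC, B_REC };

typedef struct {
    enum ev_type type;
    int value;
} queued_event;

#define QUEUE_MAX 16

/* Hypothetical model of a multi-stone: one unified queue plus a record
 * of what the action has submitted downstream. */
typedef struct {
    queued_event ev[QUEUE_MAX];
    int len;
    int submitted;        /* combined events produced so far */
    int last_submitted;   /* value of the most recent combined event */
} multi_stone;

static void stone_init(multi_stone *s)
{
    s->len = 0;
    s->submitted = 0;
    s->last_submitted = 0;
}

/* Absolute position of the index'th event of type t, or -1 if absent. */
static int position_of(const multi_stone *s, enum ev_type t, int index)
{
    for (int pos = 0; pos < s->len; pos++)
        if (s->ev[pos].type == t && index-- == 0)
            return pos;
    return -1;
}

static void discard_at(multi_stone *s, int pos)
{
    assert(pos >= 0 && pos < s->len);
    for (int i = pos; i + 1 < s->len; i++)
        s->ev[i] = s->ev[i + 1];
    s->len--;
}

/* The action checks its own precondition and consumes nothing unless
 * one A and one B are both present. */
static void combine_action(multi_stone *s)
{
    int pa = position_of(s, A_REC, 0);
    int pb = position_of(s, B_REC, 0);
    int combined;

    if (pa < 0 || pb < 0)
        return;                                   /* precondition not met */
    combined = s->ev[pa].value + s->ev[pb].value; /* read before discarding */
    discard_at(s, position_of(s, A_REC, 0));
    discard_at(s, position_of(s, B_REC, 0));      /* recomputed: positions shifted */
    s->submitted++;
    s->last_submitted = combined;
}

/* Queue an event; the action runs after every enqueue and nothing is
 * dequeued automatically. */
static void enqueue(multi_stone *s, enum ev_type t, int value)
{
    assert(s->len < QUEUE_MAX);
    s->ev[s->len].type = t;
    s->ev[s->len].value = value;
    s->len++;
    combine_action(s);
}
```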

In order to allow evaluation of pre-conditions, the action must have the ability to inspect the stone's event queue. The built-in CoD functions that allow this present multiple views of the event queue, allowing events to be referenced by their absolute locations in the queue (with zero being the oldest, first-entered event), or by relative locations in type-based queues (e.g., the 2nd event in the queue that matches type 'A'). The full set of functions provided is described in Section Multi-stone Functions, but the example below demonstrates a typical action function for a multi-stone. In this case, the action has a pre-condition that requires one event of each of its two data types ("a_rec" and "b_rec") to be present before it can operate. It tests for the presence of each type of data. If either type is missing, it exits without further operation. But if both are found, it accesses the data, generates a new record and submits it to output port 0. Note that events must be explicitly discarded or they are not removed from the stone's event queue. It is an error to use the 'data' segment of an event after the event is discarded.

static char *multi_func = "{\n\
int found = 0;\n\
a_rec *a;\n\
b_rec *b;\n\
/* if there is at least one \"a_rec\" event */\n\
if (EVcount_a_rec() > 0) {\n\
/* get a pointer to the a_rec event data */\n\
a = EVdata_a_rec(0);\n\
++found;\n\
}\n\
/* if there is at least one \"b_rec\" event */\n\
if (EVcount_b_rec() > 0) {\n\
/* get a pointer to the b_rec event data */\n\
b = EVdata_b_rec(0);\n\
++found;\n\
}\n\
if (found == 2) {\n\
c_rec c;\n\
/* combine the event data (a and b are pointers, so use ->) */\n\
c.c_field = a->a_field + b->b_field;\n\
/* discard the used events only after their data has been read */\n\
EVdiscard_a_rec(0);\n\
EVdiscard_b_rec(0);\n\
/* submit the new, combined event */\n\
EVsubmit(0, c);\n\
}\n\
}\0\0";

For completeness, we include below the types of the records referenced in the multi-stone action above and the code lines used to create and install the action:

typedef struct _rec_a {
int a_field;
} rec_a, *rec_a_ptr;
typedef struct _rec_b {
int b_field;
} rec_b, *rec_b_ptr;
typedef struct _rec_c {
int c_field;
} rec_c, *rec_c_ptr;
static FMField a_field_list[] =
{
{"a_field", "integer",
sizeof(int), FMOffset(rec_a_ptr, a_field)},
{NULL, NULL, 0, 0}
};
static FMField b_field_list[] =
{
{"b_field", "integer",
sizeof(int), FMOffset(rec_b_ptr, b_field)},
{NULL, NULL, 0, 0}
};
static FMField c_field_list[] =
{
{"c_field", "integer",
sizeof(int), FMOffset(rec_c_ptr, c_field)},
{NULL, NULL, 0, 0}
};
static FMStructDescRec a_format_list[] =
{
{"a_rec", a_field_list, sizeof(rec_a), NULL},
{NULL, NULL, 0, NULL}
};
static FMStructDescRec b_format_list[] =
{
{"b_rec", b_field_list, sizeof(rec_b), NULL},
{NULL, NULL, 0, NULL}
};
static FMStructDescRec c_format_list[] =
{
{"c_rec", c_field_list, sizeof(rec_c), NULL},
{NULL, NULL, 0, NULL}
};
static FMStructDescList queue_list[] = {a_format_list, b_format_list, c_format_list, NULL};
mq = create_multityped_action_spec(queue_list, multi_func);
maction = EVassoc_multi_action(cm, mstone, mq, NULL);

Note that the parameter to create_multityped_action_spec() is a null-terminated list of FMStructDescList. In this case, only "a_rec" and "b_rec" are expected incoming event types. "c_rec" is not expected as an incoming event, but including it in the queue_list declares the "c_rec" type so that it can be used as the output type. The complete list of functions available in multi-stone action routines is available below in Section Multi-stone Functions.

Types and FFS

The prior examples have employed FMField and FMStructDescRec declarations to describe to EVPath the data structures that are to be transmitted, as well as the structures into which EVPath is to place data for terminal handlers. The software package used for these specifications is FFS. FFS was designed and implemented for marshalling/unmarshalling and data filtering/transformation in high performance applications. A description of the motivating applications for FFS can be found in:
A Type System for High Performance Communication and Computation, Greg Eisenhauer, Matthew Wolf, Hasan Abbasi, Scott Klasky, Karsten Schwan. In Proceedings of the Workshop on D3Science, associated with e-Science11, Dec 5, 2011, Stockholm, Sweden.
Additional resources on FFS, including a manual, can be found on the FFS web page.

CoD

The language used in the filter and transformation functions above is CoD (C-on-Demand). CoD is part of the FFS package and is a subset of C that is compiled to native code at run time. In general, CoD code is a text string representing the body of a subroutine. The parameters of the subroutine are defined not by the user-provided text, but by the generating context. In the case of EVPath, the parameters are generally structures whose fields and data types are created from the specified input and output data types. The FFS manual will eventually be extended to describe the syntax and semantics of CoD, but if you write code that would work in C, generally it will work in CoD. In some cases, CoD has some magic that supports FFS in some way, and in other cases CoD leaves out some non-essential bits of C, mostly ones whose effect can be achieved in some other way. For example, CoD currently doesn't support switch, do-while, bit-wise complement (~) or the conditional expression (?:). The principal magic bit of CoD relates to the fact that FFS allows dynamically sized arrays whose size is given by an integer-typed field in the same structure. In order to maintain the relationship between the value of this "control field" and the array whose size it specifies, CoD will automatically reallocate the array whenever a value is assigned to its control field. This allows some form of dynamic memory allocation in CoD without the use of malloc/realloc.
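In plain C the programmer has to keep a control field and its array in step by hand; in CoD, assigning to the control field is enough. The hypothetical helper below sketches roughly the bookkeeping CoD performs on your behalf. It is an analogy rather than CoD's implementation: sample_rec and set_count() are invented for illustration, and zero-filling newly added elements is a choice made for this sketch, not documented CoD behavior.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical record with an FFS-style variable-length array whose
 * element count is held in a control field of the same structure. */
typedef struct {
    int     count;     /* control field */
    double *values;    /* exactly `count` elements (NULL when count is 0) */
} sample_rec;

/* Roughly what a CoD assignment "rec.count = n;" implies: resize values so
 * the array and its control field stay consistent. Newly added elements are
 * zeroed (a choice of this sketch). Returns 0 on success, or -1 on failure,
 * in which case the record is left unchanged. */
static int set_count(sample_rec *rec, int n)
{
    double *p;

    assert(n >= 0);
    if (n == 0) {
        free(rec->values);
        rec->values = NULL;
        rec->count = 0;
        return 0;
    }
    p = realloc(rec->values, (size_t)n * sizeof *p);
    if (p == NULL)
        return -1;
    for (int i = rec->count; i < n; i++)
        p[i] = 0.0;
    rec->values = p;
    rec->count = n;
    return 0;
}
```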

However, all of these are really properties of CoD and not specific to its use in EVPath. The unique aspects of CoD code in EVPath are related to the special functions and variables that EVPath makes visible to CoD code in filter and transformation functions (in addition to the input and output parameters that have been noted previously).

In particular, within an EVPath CoD function, the following global entries are visible and available for use:

Aside from the last two attr_list entries, all of these just make routines from the standard C library visible. (Note that other entries are not, by default, visible or available, but mechanisms exist to add things to the set of available items.) With respect to the attr_list entries, there are attribute lists associated with each event in EVPath and with each stone. These data values make those attributes available to CoD functions. The event_attrs is actually a parameter, while the stone_attrs is an external variable, but the difference is immaterial inside the function. The stone_attrs list may serve as a communication mechanism between CoD functions associated with the same stone, or it may simply be used to query other EVPath-supplied stone attributes. The event_attrs list may be used to communicate between CoD functions on different stones on the event path, or simply to carry event attributes from the source to the final destination.

In order to operate on these attribute lists, the following routines are also made available:

Multi-stone Functions

In addition to the CoD functions listed above that are available in all CoD-based EVPath actions, multi-stones have the following extra functions that allow for inspection and operation upon the stone's event queue.

Generally speaking, there are multiple forms of each basic action. For example, the function EVcount_full() returns the total count of events in the stone queue, but for each of the datatypes specified for the multi-stone, there are also EVcount_<name>() functions available (where <name> is the string name of each datatype in the queue list). These functions return the count of events in the queue that match that particular data type. EVcount(int queue) takes a parameter which is the integer index (zero-based) of a datatype. So in the example above, EVcount(0) is equivalent to EVcount_a_rec(). There is also an integer constant <name>_ID created for each of the queue types, so EVcount(0), EVcount(a_rec_ID) and EVcount_a_rec() are all equivalent forms. In addition, because some actions may wish to manipulate events whose types are unknown, the queue_list may include a special FMStructDescList with the top-level name "anonymous" and a NULL field list. If this FMStructDescList entry is present, EVcount_anonymous() will return the number of events in the stone's event queue that do not match any of the other data types in the queue list. (If the "anonymous" entry is not present in the queue list, events with unmatched types will be rejected by the multi-stone.)

In addition, functions which operate on entries in the event queue generally have an index parameter that specifies which queue entry they apply to. Index is zero-based and lower numbers are the oldest (first submitted) events. For routines like EVdiscard_full(int index), index is interpreted as indicating the Nth element in the queue, regardless of type. However, EVdiscard_a_rec(int index) applies to the Nth "a_rec" in the queue, ignoring events which are of other types. (While this essentially treats the "a_rec" elements as if they were in their own queue, there is only ever one, unified, event queue.) Note that all modifications to the event queue take effect immediately and affect the results of subsequent calls, so that, for example, the code segment

while(EVcount_full()) EVdiscard_full(0);

will discard the oldest extant event in the stone queue until there are none left.
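The difference between absolute positions and type-relative indices, and the fact that every discard shifts later positions immediately, can be illustrated with a small model of a single unified queue. The q_* functions below are hypothetical plain-C stand-ins loosely mirroring EVcount_full(), EVcount(), EVdiscard_full() and EVdiscard(); they only track each event's type index (0 for a_rec, 1 for b_rec) and are not EVpath code.

```c
#include <assert.h>

/* Hypothetical model of one unified multi-stone queue. Only each event's
 * type index is kept (0 = a_rec, 1 = b_rec), oldest event first. */
#define Q_MAX 32
static int q_type[Q_MAX];
static int q_len = 0;

static void q_push(int type)
{
    assert(q_len < Q_MAX);
    q_type[q_len++] = type;
}

/* Like EVcount_full(): all events, regardless of type. */
static int q_count_full(void) { return q_len; }

/* Like EVcount(queue): events of one type only. */
static int q_count(int type)
{
    int n = 0;
    for (int pos = 0; pos < q_len; pos++)
        if (q_type[pos] == type)
            n++;
    return n;
}

/* Maps "index'th event of this type" to an absolute position, or -1. */
static int q_position(int type, int index)
{
    for (int pos = 0; pos < q_len; pos++)
        if (q_type[pos] == type && index-- == 0)
            return pos;
    return -1;
}

/* Like EVdiscard_full(index): later events shift down immediately. */
static void q_discard_full(int pos)
{
    assert(pos >= 0 && pos < q_len);
    for (int i = pos; i + 1 < q_len; i++)
        q_type[i] = q_type[i + 1];
    q_len--;
}

/* Like EVdiscard(queue, index): a type-relative view of the same queue. */
static void q_discard(int type, int index)
{
    q_discard_full(q_position(type, index));
}
```

In a queue holding a_rec, b_rec, a_rec, b_rec, a_rec, discarding the first b_rec moves the second a_rec from absolute position 2 to 1 while its type-relative index stays 1, which is why code mixing the two views should recompute positions after each discard.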

The full list of available functions is:

int EVcount_full();
// return the number of events in the event queue.
int EVcount(int queue);
int EVcount_<type>();
// return the number of events in the event queue of type <type> (or corresponding to the queue'th entry in the queue_list)
void EVdiscard_full(int index);
// Discard (permanently remove from the queue) the event at absolute position index.
void EVdiscard(int queue, int index);
void EVdiscard_<type>(int index);
// discard the index'th event of type <type> (or corresponding to the queue'th entry in the queue_list)
void EVdiscard_and_submit_full(int target, int index);
// Remove the event at absolute position index and submit it to output port target
void EVdiscard_and_submit(int target, int queue, int index);
void EVdiscard_and_submit_<type>(int target, int index);
// Remove the index'th event of type <type> (or corresponding to the queue'th entry in the queue list) and submit it to output port target
void EVsubmit_full(int target, int index);
// submit the event at absolute position index to output port target, without removing it from the queue
void EVsubmit_<type>(int target, int index);
// submit the index'th event of type <type> to output port target, without removing it from the queue
void EVsubmit(int target, <any struct-type> data);
// submit as an event, data other than something from the event queue, to output port target
void EVsubmit_attr(int target, <any struct-type> data, attr_list l);
// submit as an event with attribute list l, data other than something from the event queue, to output port target
<type> *EVdata_<type>(int index);
void *EVdata(int queue, int index);
// Return the data associated with the index'th event of type <type> (or corresponding to the queue'th entry in the queue list)
<type> *EVdata_full_<type>(int index);
void *EVdata_full(int queue, int index);
// Return the data associated with the event in absolute position index (use with caution for type safety)
int EVconforms(int queue, int index);
// returns true if the event at absolute position index has a type which corresponds to the queue'th entry in the queue_list.
attr_list EVget_attrs_<type>(int index);
attr_list EVget_attrs(int queue, int index);
// Return the attribute list associated with the index'th event of type <type> (or corresponding to the queue'th entry in the queue list)
attr_list EVget_attrs_full(int queue, int index);
// Return the attribute list associated with the event in absolute position index

If anonymous event availability has been enabled, the type-specific routines above are also available for the "anonymous" meta-type, except that the corresponding EVdata() routines are not present.