EVPath
dfg_master4.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "evpath.h"
#include "ev_dfg.h"
typedef struct _simple_rec {
int integer_field;
} simple_rec, *simple_rec_ptr;
static FMField simple_field_list[] =
{
{"integer_field", "integer", sizeof(int), FMOffset(simple_rec_ptr, integer_field)},
{NULL, NULL, 0, 0}
};
static FMStructDescRec simple_format_list[] =
{
{"simple", simple_field_list, sizeof(simple_rec), NULL},
{NULL, NULL}
};
EVclient test_client;
static int
simple_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
{
simple_rec_ptr event = vevent;
printf("I got %d\n", event->integer_field);
EVclient_shutdown(test_client, event->integer_field == 318);
return 1;
}
void
join_handler_func(EVmaster master, char *identifier, void*cur_unused1, void*cur_unused2)
{
EVdfg_stone src, mid, sink;
static int i = 0;
EVdfg dfg;
char *expected_nodes[] = {"a", "b", "c"};
/* Nth client joining is named for that element in expected_nodes[] */
EVmaster_assign_canonical_name(master, identifier, expected_nodes[i]);
i++;
if (i < sizeof(expected_nodes)/sizeof(expected_nodes[0])) {
/* not everyone is here yet, so return */
return;
}
/* actually create the DFG and assign stones to nodes */
dfg = EVdfg_create(master);
/* EVdfg stone creation and assignment occurs here */
src = EVdfg_create_source_stone(dfg, "event source");
EVdfg_assign_node(src, "a");
mid = EVdfg_create_stone(dfg, NULL);
EVdfg_assign_node(mid, "b");
EVdfg_link_port(src, 0, mid);
sink = EVdfg_create_sink_stone(dfg, "simple_handler");
EVdfg_assign_node(sink, "c");
EVdfg_link_port(mid, 0, sink);
}
/* this file is evpath/examples/dfg_master4.c */
int main(int argc, char **argv)
{
CManager cm;
char *str_contact;
EVsource source_handle;
EVmaster test_master;
EVclient_sinks sink_capabilities;
EVclient_sources source_capabilities;
(void)argc; (void)argv;
cm = CManager_create();
CMlisten(cm);
test_master = EVmaster_create(cm);
str_contact = EVmaster_get_contact_list(test_master);
EVmaster_node_join_handler (test_master, join_handler_func);
/*
** LOCAL DFG SUPPORT Sources and sinks that might or might not be utilized.
*/
source_handle = EVcreate_submit_handle(cm, -1, simple_format_list);
source_capabilities = EVclient_register_source("event source", source_handle);
sink_capabilities = EVclient_register_sink_handler(cm, "simple_handler", simple_format_list,
(EVSimpleHandlerFunc) simple_handler, NULL);
/*
** DFG CREATION
*/
/* We're node "a" in the DFG */
test_client = EVclient_assoc_local(cm, "a", test_master, source_capabilities, sink_capabilities);
printf("Contact list is \"%s\"\n", str_contact);
if (EVclient_ready_wait(test_client) != 1) {
/* dfg initialization failed! */
exit(1);
}
if (EVclient_source_active(source_handle)) {
simple_rec rec;
rec.integer_field = 318;
/* submit would be quietly ignored if source is not active */
EVsubmit(source_handle, &rec, NULL);
}
if (EVclient_active_sink_count(test_client) > 0) {
/* if there are active sinks, the handler will call EVclient_shutdown() */
} else {
if (EVclient_source_active(source_handle)) {
/* we had a source and have already submitted, indicate success */
EVclient_shutdown(test_client, 0 /* success */);
} else {
/* we had neither a source or sink, ready to shutdown, no opinion */
}
}
return(EVclient_wait_for_shutdown(test_client));
}