#include #include #include #include #include #include #include #include #include #include #include #include #include #include char *MyWhereProc (long , void *, long ); void InitObjects(void); /************************************************************************** * This is an example illustrating use of BRTI in a time stepped * distributed simulation. It demonstrates use of TimeAdvanceRequest/Grant * and Update/ReflectAttributeValues as well as publishing and subscribing * to object classes. It is based on the tm_ping program in the * FDK distribution. * Modifications to tm_ping were developed by Jeonghwa Yang and Richard * Fujimoto, 10.03.2002 */ /************************************************************************** * Local state of the federate * TM_Time is the data type for logical time values used by the RTI * MB_BufferPool is a pointer to a pool of memory buffers. The * FDK Buffer Pool library provides a memory allocation service * The prefix MB denotes things defined in the memory buffer library * The prefix RTI denotes things defined in the RTI */ struct FedState { BOOLEAN WaitingOnTAR; /* true if waiting for TAR to complete */ int Count; /* count number of messages */ TM_Time Now; /* current time */ TM_Time LastTS; /* time stamp of last TSO message received */ TM_Time TargetTime; /* print message when passing this time */ TWSeed Seed; /* random number generator seeds */ MB_BufferPool FreePool; /* pool of usused event buffers */ int TagCount; /* number of time advance grants */ /* class for attribute owned by fed */ RTI_ObjClassDesignator MyClass; /* class for subscribed attribute */ RTI_ObjClassDesignator YourClass; /* object instance owned by fed */ RTI_ObjInstanceDesignator MyInstance; } FED; /************************************************************************** * The data structure MyMessage defines the format of messages * sent via the RTI. The first field MUST be the struct MsgS. This is * defined within BRTI. It is a message header used by the RTI * (e.g., it includes time stamp information). * The remainder of the message defines the data portion of the message, * and is defined by the federate (i.e., the application). */ /* Application defined part of message. Three attributes are published */ typedef struct _MobileUnit { short id; TM_Time x; short y; }MU; /* Message including RTI header and application defined part */ typedef struct _MyMessage { struct MsgS msgS; MU mu; } MyMessage; /* Buffer used for constructing outgoing messages */ MyMessage MBuf; /* * Note: TimeAdvanceRequest and NextEventRequest do not allow zero * lookahewad messages; set lookahead to zero to allow arbitrarily * small time stamp increments */ /* Lookahead value */ static double LOOKAHEAD = 0; /* end of simulation time */ static TM_Time ENDTIME = 5.0; /* print message every UPDATE_INTERVAL time units */ static double UPDATE_INTERVAL = 1.0; int main (int argc, char *argv[]) { //int appl_argc = 0; //char **appl_argv = 0; /* init FDK's random number generator seeds */ TWRandInit(&(FED.Seed), 10); RTI_Init(argc, argv); RTI_SetLookAhead (LOOKAHEAD); printf( "--- Simulating upto %f simulation time units.\n", ENDTIME ); fflush( stdout ); /* initialize federate state */ FED.WaitingOnTAR = FALSE; FED.Count = 0; FED.Now = 0.0; FED.TargetTime = UPDATE_INTERVAL; FED.LastTS = 0.0; FED.TagCount = 0; /* register object instances; subscribe to object class */ InitObjects(); /* * Create a pool of 1000 buffers, each to hold one incoming message. * MSGS_SIZE is passed a payload size, and returns the size of a message, * including both payload and header */ FED.FreePool = MB_MakePool (1000, MSGS_SIZE(sizeof(MU))); if (! FED.FreePool) {printf("Couldn't create buffer pool\n"); exit(1);} /* wait until everyone subscribed before sending messages */ RTIKIT_Barrier(); printf( "Entering Simulation Loop...\n" ); fflush(stdout); /* main event processing loop */ /* TM_LE is a less-than-or-equal-to predicate comparing time values */ while (TM_LE(FED.Now, ENDTIME)) { /* The message must have non-zero lookahead using TAR/NER */ MBuf.msgS.TimeStamp = FED.Now + 0.0001; /* fill in payload for message */ /* RTIKIT_nodeid gives ID of this federate (0, 1, 2, ...) */ if ((int)RTIKIT_nodeid == 0) { MBuf.mu.id = 0; MBuf.mu.x = FED.Now; MBuf.mu.y = 12; } if ((int)RTIKIT_nodeid == 1) { MBuf.mu.id = 1; MBuf.mu.x = FED.Now; MBuf.mu.y = 34; } /* send message */ RTI_UpdateAttributeValues(FED.MyInstance, (struct MsgS *)&MBuf,MSGS_SIZE(sizeof(MU)), 0); FED.WaitingOnTAR = TRUE; RTI_TimeAdvanceRequest(TM_Add(FED.Now,1.0)); /* TM_Add adds time values */ if( FED.Now>=ENDTIME ) {break;} /* wait for time advance grant */ while (FED.WaitingOnTAR) { BRTI_Tick(); } } printf( "Exited the simulation loop at now=%f ***\n", FED.Now ); fflush( stdout ); return (0); } /************************************************************************* * SET UP PUBLICATIONS AND SUBSCRIPTIONS *************************************************************************/ void InitObjects(void) { char MyClassName[100]; char YourClassName[100]; /* publish my object */ if ((int)RTIKIT_nodeid == 0){ sprintf (MyClassName, "RedMU"); } else if ((int)RTIKIT_nodeid == 1){ sprintf (MyClassName, "BlueMU"); } FED.MyClass = RTI_GetObjClassHandle (MyClassName); if (FED.MyClass == NULL) {printf ("Couldn't get class handle for %s\n", MyClassName); exit(1);} /* declare intent to publish to class, instantiate one object instance */ RTI_PublishObjClass (FED.MyClass); FED.MyInstance = RTI_RegisterObjInstance(FED.MyClass); /* subscribe to a object */ if ((int)RTIKIT_nodeid == 0) { sprintf (YourClassName,"BlueMU"); } else if ((int)RTIKIT_nodeid == 1) { sprintf (YourClassName,"RedMU"); } FED.YourClass = RTI_GetObjClassHandle (YourClassName); if (FED.YourClass == NULL) {printf ("Couldn't get class handle for %s\n", YourClassName); exit(1);} if (! RTI_IsClassSubscriptionInitialized (FED.YourClass)) RTI_InitObjClassSubscription (FED.YourClass, MyWhereProc, NULL); RTI_SubscribeObjClassAttributes(FED.YourClass); RTIKIT_Barrier(); } /************************************************************************* ************************************************************************* * FEDERATE DEFINED MESSAGE HANDLERS ************************************************************************* *************************************************************************/ /************************************************************************** * TimeAdvanceGrant is called by the RTI when it advances the federate's * logical time, in response to a request such as TimeAdvanceRequest or * NextEventRequest. * ReflectAttributeValues is called when the RTI is delivering a message * to the RTI that was send via UpdateAttributeValues. */ void TimeAdvanceGrant (TM_Time GrantTime) { if (TM_LT(GrantTime, FED.Now)) { printf ("Error: logical time decreases\n"); exit(1); } if (TM_LT(GrantTime, FED.LastTS)) { printf ("Error: time advance < prior update\n"); exit(1); } if (TM_GT(GrantTime, FED.TargetTime)) { printf ("\n%d:%f\n", (int)RTIKIT_nodeid, GrantTime); fflush(stdout); FED.TargetTime += UPDATE_INTERVAL; } FED.TagCount++; FED.Now = GrantTime; FED.WaitingOnTAR = FALSE; } void ReflectAttributeValues (TM_Time TimeStamp, struct MsgS *Msg, long MsgSize, long MsgType) { MyMessage *msg = (MyMessage *)Msg; if (TM_LT(TimeStamp, FED.Now)) { printf ("Error: attribute update occurs in past\n"); exit(1); } if (TM_LT(TimeStamp, FED.LastTS)) { printf ("Error: attribute updates out of order\n"); exit(1); } FED.LastTS = TimeStamp; printf ("\n[FedID:%d] ReflectAttr: %d %f %d \n", (int)RTIKIT_nodeid, msg->mu.id, msg->mu.x, msg->mu.y); /* release memory allocated by whereproc procedure */ MB_FreeBuffer (FED.FreePool, Msg); } /************************************************************************** * WhereProc procedure is called by the RTI prior to receive a message. * It must return a pointer to where the incoming message is to be stored */ char *MyWhereProc (long MsgSize, void *Context, long MsgType) { char * buf; /* consistency checks */ if (MsgType != 0) {printf ("WhereProc: unexpected msg type\n"); fflush(stdout); exit (1);} /* allocate a message buffer and return a pointer to it */ if ((buf=MB_GetBuffer(FED.FreePool)) == NULL) { printf ("Error: queue overflow, quiting\n"); exit(1);} return ((char *) buf); } /************************************************************************** * The following procedures are used by the message retraction services. * RequestRetraction is called by the RTI when a previously delivered * message has been retracted by the sender. * MyHereProc is called when the RTI has annihilated a message because * it was retracted, and is returning the message buffer that was * allocated via WhereProc */ void RequestRetraction(EventRetractionHandle erh) { printf ("Error: unexpected retraction requested, quiting\n"); exit(1); } void MyHereProc (long MsgSize, struct MsgS *Msg) { MB_FreeBuffer (FED.FreePool, Msg); printf ("Error: unexpected HereProc call, quiting\n"); exit(1); }