Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: Implement clock sync reactors #114

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size
}

DEFINE_REACTION_STRUCT(Receiver, 0, 1)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)
DEFINE_INPUT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Expand Down
8 changes: 4 additions & 4 deletions examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigne
DEFINE_TIMER_STRUCT(Timer1, 1)
DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1))
DEFINE_REACTION_STRUCT(Sender, 0, 1)
DEFINE_OUTPUT_PORT_STRUCT(Out, 1, 1)
DEFINE_OUTPUT_PORT_CTOR(Out, 1)
DEFINE_OUTPUT_STRUCT(Out, 1, 1)
DEFINE_OUTPUT_CTOR(Out, 1)

typedef struct {
Reactor super;
Expand Down Expand Up @@ -63,8 +63,8 @@ void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **c
}

DEFINE_REACTION_STRUCT(Receiver, 0, 1)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)
DEFINE_INPUT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ typedef struct {
} msg_t;

DEFINE_REACTION_STRUCT(Receiver, 0, 0)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)
DEFINE_INPUT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ typedef struct {
} msg_t;

DEFINE_REACTION_STRUCT(Receiver, 0, 0)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)
DEFINE_INPUT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Expand Down
4 changes: 2 additions & 2 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ static const struct gpio_dt_spec led = GPIO_DT_SPEC_GET(LED0_NODE, gpios);
DEFINE_ACTION_STRUCT(Action1, PHYSICAL_ACTION, 1, 0, bool, 2)
DEFINE_ACTION_CTOR_FIXED(Action1, PHYSICAL_ACTION, 1, 0, bool, 2, MSEC(0))
DEFINE_REACTION_STRUCT(Sender, 0, 1)
DEFINE_OUTPUT_PORT_STRUCT(Out, 1, 2)
DEFINE_OUTPUT_PORT_CTOR(Out, 1)
DEFINE_OUTPUT_STRUCT(Out, 1, 2)
DEFINE_OUTPUT_CTOR(Out, 1)
Action1 *action_ptr = NULL;

void button_pressed(const struct device *dev, struct gpio_callback *cb, uint32_t pins) {
Expand Down
119 changes: 88 additions & 31 deletions include/reactor-uc/macros.h

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions include/reactor-uc/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ struct Port {

struct Input {
Port super;
TriggerEffects effects; // The reactions triggered by this Input port.
void *value_ptr; // Pointer to the `buffer` field in the user Input port struct.
size_t value_size; // Size of the data stored in this Input Port.
void *value_ptr; // Pointer to the `buffer` field in the user Input port struct.
size_t value_size; // Size of the data stored in this Input Port.
};

struct Output {
Port super;
TriggerSources sources; // The reactions that can write to this Output port.
};

void Input_ctor(Input *self, Reactor *parent, Reaction **effects, size_t effects_size, Connection **conns_out,
Expand All @@ -38,6 +36,7 @@ void Output_ctor(Output *self, Reactor *parent, Reaction **sources, size_t sourc
size_t conns_out_size);

void Port_ctor(Port *self, TriggerType type, Reactor *parent, Connection **conns_out, size_t conns_out_size,
Reaction **sources, size_t sources_size, Reaction **effects, size_t effects_size,
void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *));

#endif
7 changes: 5 additions & 2 deletions include/reactor-uc/trigger.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ typedef struct {
struct Trigger {
TriggerType type;
Reactor *parent;
TriggerSources sources;
TriggerEffects effects;
bool is_present;
bool is_registered_for_cleanup; // Field used by Scheduler to avoid adding the same trigger multiple times to the
// linked list of triggers registered for cleanup
Expand All @@ -64,7 +66,8 @@ struct Trigger {
void (*cleanup)(Trigger *);
} __attribute__((aligned(MEM_ALIGNMENT)));

void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool,
void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *));
void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool, Reaction **sources,
size_t sources_size, Reaction **effects, size_t effects_size, void (*prepare)(Trigger *, Event *),
void (*cleanup)(Trigger *));

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class UcPortGenerator(private val reactor: Reactor, private val connectionGenera
fun getEffects(port: Input) = reactor.reactions.filter { it.triggers.filter { it.name == port.name }.isNotEmpty() }
fun getSources(port: Output) = reactor.reactions.filter { it.effects.filter { it.name == port.name }.isNotEmpty() }

fun generateSelfStruct(input: Input) = "DEFINE_INPUT_PORT_STRUCT(${input.codeType}, ${getEffects(input).size}, ${input.type.toText()})"
fun generateInputCtor(input: Input) = "DEFINE_INPUT_PORT_CTOR(${input.codeType}, ${getEffects(input).size}, ${input.type.toText()})"
fun generateSelfStruct(output: Output) = "DEFINE_OUTPUT_PORT_STRUCT(${output.codeType}, ${getSources(output).size})"
fun generateOutputCtor(output: Output) = "DEFINE_OUTPUT_PORT_CTOR(${output.codeType}, ${getSources(output).size})"
fun generateSelfStruct(input: Input) = "DEFINE_INPUT_STRUCT(${input.codeType}, ${getEffects(input).size}, ${input.type.toText()})"
fun generateInputCtor(input: Input) = "DEFINE_INPUT_CTOR(${input.codeType}, ${getEffects(input).size}, ${input.type.toText()})"
fun generateSelfStruct(output: Output) = "DEFINE_OUTPUT_STRUCT(${output.codeType}, ${getSources(output).size})"
fun generateOutputCtor(output: Output) = "DEFINE_OUTPUT_CTOR(${output.codeType}, ${getSources(output).size})"

fun generateSelfStructs() = reactor.inputs.plus(reactor.outputs).joinToString(prefix = "// Port structs\n", separator = "\n", postfix = "\n") {
when (it) {
Expand Down
12 changes: 12 additions & 0 deletions lib/clock-sync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Clock Synchronization library
These library reactors are inspired by the PTP protocol. Currently the sync reactors only have
one input and output, meaning that we need one per connection to remote federate.
Connections between the slave and master should be physical.

Three messages are exchanged between Master and Slave:
1. First the master sends a SYNC message to the slave. The SYNC message contains the timestamp t1
2. The slave records the time of reception of SYNC as t2 (this will be the logical tag at which the reaction triggered by the SYNC message is executed.)
3. The sync responds with a DELAY_REQ message. The slave stores t3, the time of transmission.
4. The master records the time of receiving the DELAY_REQ as t4. Again this will be the logical tag
of the reaction triggered by the receiving event. The master sends this timestamp in a DELAY_RESP message sent back.

106 changes: 106 additions & 0 deletions lib/clock-sync/clock-master.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include "clock-master.h"

DEFINE_ACTION_CTOR_FIXED(ClockMaster, a_periodic, LOGICAL_ACTION, 1, 1, ClockSyncMessageType, 1, MSEC(0));
DEFINE_ACTION_CTOR_FIXED(ClockMaster, a_send, LOGICAL_ACTION, 1, 1, ClockSyncMessageType, 1, MSEC(0));
DEFINE_OUTPUT_CTOR(ClockMaster, p_out, 1);
DEFINE_INPUT_CTOR(ClockMaster, p_in, 1, ClockSyncMessage, 1);

DEFINE_REACTION_CTOR(ClockMaster, r_startup, 0);
DEFINE_REACTION_CTOR(ClockMaster, r_action, 1);
DEFINE_REACTION_CTOR(ClockMaster, r_input, 2);
DEFINE_REACTION_CTOR(ClockMaster, r_send, 3);

// Startup reaction
DEFINE_REACTION_BODY(ClockMaster, r_startup) {
SCOPE_SELF(ClockMaster);
SCOPE_ACTION(ClockMaster, a_periodic);
SCOPE_ACTION(ClockMaster, a_send);
SCOPE_ENV();

// Schedule next round
lf_schedule(a_periodic, CLOCK_SYNC, self->period);

// Schedule the sending
lf_schedule(a_send, CLOCK_SYNC, MSEC(0));
}

// Reaction triggered by action. Should start a new round of clock sync.
DEFINE_REACTION_BODY(ClockMaster, r_action) {
SCOPE_SELF(ClockMaster);
SCOPE_ACTION(ClockMaster, a_send);
SCOPE_ACTION(ClockMaster, a_periodic);
assert(a_periodic->value == CLOCK_SYNC);

// Schedule the sending
lf_schedule(a_send, CLOCK_SYNC, MSEC(0));
}

// Reaction triggered by input
DEFINE_REACTION_BODY(ClockMaster, r_input) {
SCOPE_SELF(ClockMaster);
SCOPE_ENV();
SCOPE_ACTION(ClockMaster, a_send);
SCOPE_PORT(ClockMaster, p_in);

assert(p_in->value.type == CLOCK_DELAY_REQ);
lf_schedule(a_send, CLOCK_DELAY_RESP, MSEC(0));
}

// Reaction triggered by send action
DEFINE_REACTION_BODY(ClockMaster, r_send) {
SCOPE_SELF(ClockMaster);
SCOPE_ENV();
SCOPE_ACTION(ClockMaster, a_send);
SCOPE_PORT(ClockMaster, p_out);

ClockSyncMessage msg;
msg.type = a_send->value;

switch (a_send->value) {
case CLOCK_SYNC:
msg.time = env->get_physical_time(env);
break;
case CLOCK_DELAY_RESP:
msg.time = env->get_logical_time(env);
break;
default:
assert(0);
}

lf_set(p_out, msg);
}

void ClockMaster_ctor(ClockMaster *self, Reactor *parent, Environment *env, Connection **conn_out, interval_t period) {
int _reactions_idx = 0;
int _triggers_idx = 0;
int _child_idx = 0;

INIT_REACTION(ClockMaster, r_startup);
INIT_REACTION(ClockMaster, r_action);
INIT_REACTION(ClockMaster, r_input);
INIT_REACTION(ClockMaster, r_send);

INIT_STARTUP(ClockMaster);
INIT_ACTION(ClockMaster, a_send);
INIT_ACTION(ClockMaster, a_periodic);
INIT_OUTPUT(ClockMaster, p_out, conn_out, 1);
INIT_INPUT(ClockMaster, p_in);

self->period = period;

Reactor_ctor(&self->super, "ClockSync", env, parent, self->_children, _child_idx, self->_reactions, _reactions_idx,
self->_triggers, _triggers_idx);

REACTION_TRIGGER(r_action, a_periodic);
REACTION_EFFECT(r_action, r_send);

REACTION_TRIGGER(r_input, p_in);
REACTION_EFFECT(r_input, r_send);

REACTION_TRIGGER(r_startup, startup);
REACTION_EFFECT(r_startup, a_periodic);
REACTION_EFFECT(r_startup, a_send);

REACTION_TRIGGER(r_send, a_send);
REACTION_EFFECT(r_send, p_out);
}
43 changes: 43 additions & 0 deletions lib/clock-sync/clock-master.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef REACTOR_UC_CLOCK_MASTER_H
#define REACTOR_UC_CLOCK_MASTER_H

#include "reactor-uc/reactor-uc.h"
#include "clock-sync.h"

DEFINE_ACTION_STRUCT(ClockMaster, a_send, LOGICAL_ACTION, 1, 1, ClockSyncMessageType, 1);
DEFINE_ACTION_STRUCT(ClockMaster, a_periodic, LOGICAL_ACTION, 1, 1, ClockSyncMessageType, 1);
DEFINE_STARTUP_STRUCT(ClockMaster, 1);
DEFINE_OUTPUT_STRUCT(ClockMaster, p_out, 2, 1);
DEFINE_INPUT_STRUCT(ClockMaster, p_in, 1, ClockSyncMessage, 1);

DEFINE_REACTION_STRUCT(ClockMaster, r_startup, 1);
DEFINE_REACTION_STRUCT(ClockMaster, r_action, 2);
DEFINE_REACTION_STRUCT(ClockMaster, r_send, 1);
DEFINE_REACTION_STRUCT(ClockMaster, r_input, 1);

#define CLOCK_MASTER_NUM_REACTIONS 4
#define CLOCK_MASTER_NUM_TRIGGERS 4
#define CLOCK_MASTER_NUM_CHILDREN 0

typedef struct {
Reactor super;

INSTANTIATE_REACTION(ClockMaster, r_startup);
INSTANTIATE_REACTION(ClockMaster, r_action);
INSTANTIATE_REACTION(ClockMaster, r_send);
INSTANTIATE_REACTION(ClockMaster, r_input);
INSTANTIATE_ACTION(ClockMaster, a_send);
INSTANTIATE_ACTION(ClockMaster, a_periodic);
INSTANTIATE_STARTUP(ClockMaster);
INSTANTIATE_PORT(ClockMaster, p_out);
INSTANTIATE_PORT(ClockMaster, p_in);

Reaction *_reactions[CLOCK_MASTER_NUM_REACTIONS];
Trigger *_triggers[CLOCK_MASTER_NUM_TRIGGERS];
Reactor *_children[CLOCK_MASTER_NUM_CHILDREN];
interval_t period;
} ClockMaster;

void ClockMaster_ctor(ClockMaster *self, Environment *env, Connection **conn_out, interval_t period);

#endif
83 changes: 83 additions & 0 deletions lib/clock-sync/clock-slave.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

#include "clock-slave.h"

DEFINE_OUTPUT_CTOR(Out, 2);
DEFINE_ACTION_CTOR_FIXED(Action0, LOGICAL_ACTION, 1, 1, ClockSyncMessageType, 1, MSEC(0));
DEFINE_OUTPUT_CTOR(Out, 1);
DEFINE_INPUT_CTOR(In, 1, ClockSyncMessage, 1);
// TODO: Make sure that we dont overwrite timestamps before we have calculated the offset.
// maybe the slave should request this whole thing, and not the master?

// Startup reaction
DEFINE_REACTION_BODY(ClockSlave, 0) {
ClockSlave *self = (ClockSlave *)_self->parent;
Action0 *send_action = &self->send_action;
Environment *env = self->super.env;
}
DEFINE_REACTION_CTOR(ClockSlave, 0);

// Send reaction
DEFINE_REACTION_BODY(ClockSlave, 1) {
ClockSlave *self = (ClockSlave *)_self->parent;
Environment *env = self->super.env;

Action0 *action = &self->send_action;
Out *out = &self->out;
assert(action->value == CLOCK_DELAY_REQ);

ClockSyncMessage msg;
msg.type = action->value;
msg.time = 0;
self->t3 = env->get_physical_time(env);
lf_set(out, msg);
}
DEFINE_REACTION_CTOR(ClockSlave, 1);

// Reaction triggered by input
DEFINE_REACTION_BODY(ClockSlave, 2) {
ClockSlave *self = (ClockSlave *)_self->parent;
Environment *env = self->super.env;
In *in = &self->in;
Action0 *send_action = &self->send_action;

if (in->value.type == CLOCK_SYNC) {
self->t1 = in->value.time;
self->t2 = env->get_logical_time(env);
lf_schedule(send_action, CLOCK_DELAY_REQ, MSEC(0));
} else if (in->value.type == CLOCK_DELAY_RESP) {
self->t4 = in->value.time;
// FIXME: Calculate offset and adjust clock.
} else {
assert(0);
}
}
DEFINE_REACTION_CTOR(ClockSlave, 2);

void ClockSlave_ctor(ClockSlave *self, Environment *env, Connection **conn_out) {
self->_reactions[0] = (Reaction *)&self->reaction0;
self->_reactions[1] = (Reaction *)&self->reaction1;
self->_reactions[2] = (Reaction *)&self->reaction2;
self->_triggers[0] = (Trigger *)&self->startup;
self->_triggers[1] = (Trigger *)&self->send_action;
self->_triggers[2] = (Trigger *)&self->out;
self->_triggers[3] = (Trigger *)&self->in;

Reactor_ctor(&self->super, "ClockSlave", env, NULL, NULL, 0, self->_reactions, CLOCK_SLAVE_NUM_REACTIONS,
self->_triggers, CLOCK_SLAVE_NUM_TRIGGERS);

ClockSlave_Reaction0_ctor(&self->reaction0, &self->super);
ClockSlave_Reaction1_ctor(&self->reaction1, &self->super);
ClockSlave_Reaction2_ctor(&self->reaction2, &self->super);

Action0_ctor(&self->send_action, &self->super);
Startup0_ctor(&self->startup, &self->super);
In_ctor(&self->in, &self->super);
Out_ctor(&self->out, &self->super, conn_out, 1);

BUILTIN_REGISTER_EFFECT(self->startup, self->reaction0);
ACTION_REGISTER_EFFECT(self->send_action, self->reaction1);
INPUT_REGISTER_EFFECT(self->in, self->reaction2);
OUTPUT_REGISTER_SOURCE(self->out, self->reaction1);
REACTION_REGISTER_EFFECT(self->reaction2, self->send_action);
REACTION_REGISTER_EFFECT(self->reaction1, self->out);
}
Loading
Loading