Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal interface experiments #137

Closed
wants to merge 14 commits into from
31 changes: 17 additions & 14 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
extern crate prost_build;
extern crate protobuf_src;

fn main() {
// we use the protobuf-src which provides the protoc compiler. This line makes it available
// to prost-build
std::env::set_var("PROTOC", protobuf_src::protoc());

// this line comes from the prost-build example and compiles items.proto into corresponding types.
// the generated code is under ./target/<goal, e.g. debug>/build/<project-name>-<some-hash>/out
prost_build::compile_protos(
&[
"src/simulation/wire_types/messages.proto",
"src/simulation/wire_types/events.proto",
"src/simulation/wire_types/ids.proto",
"src/simulation/wire_types/network.proto",
"src/simulation/wire_types/population.proto",
"src/simulation/wire_types/vehicles.proto",
],
&["src/"],
)
.unwrap();
let proto_files = [
"src/simulation/wire_types/messages.proto",
"src/simulation/wire_types/events.proto",
"src/simulation/wire_types/ids.proto",
"src/simulation/wire_types/network.proto",
"src/simulation/wire_types/population.proto",
"src/simulation/wire_types/vehicles.proto",
];

// tell cargo to rerun this build script if any of the proto files change
for proto in &proto_files {
println!("cargo:rerun-if-changed={}", proto);
}

// Compiling the protobuf files
prost_build::compile_protos(&proto_files, &["src/"]).unwrap();
}
170 changes: 170 additions & 0 deletions src/experiments/internal_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::cell::RefCell;
use std::rc::{Rc, Weak};

enum State {
ACTIVITY,
TELEPORTATION,
}

struct Agent {
id: String,
state: State,
}

impl Agent {
fn new(id: String, state: State) -> Self {
Agent { id, state }
}
}

struct InternalInterface {
activity_engine: Rc<RefCell<dyn Engine>>,
teleportation_engine: Rc<RefCell<dyn Engine>>,
}

impl InternalInterface {
fn arrange_next_agent_state(&self, agent: Agent) {
match agent.state {
State::ACTIVITY => self.activity_engine.borrow_mut().receive_agent(agent),
State::TELEPORTATION => self.teleportation_engine.borrow_mut().receive_agent(agent),
}
}
}

struct Simulation {
engines: Vec<Rc<RefCell<dyn Engine>>>,
internal_interface: Rc<RefCell<InternalInterface>>,
}

impl Simulation {
fn new(
engines: Vec<Rc<RefCell<dyn Engine>>>,
internal_interface: Rc<RefCell<InternalInterface>>,
) -> Self {
Simulation {
internal_interface,
engines,
}
}

fn run(&mut self) {
let mut now = 0;
while now < 20 {
for engine in &self.engines {
engine.borrow_mut().do_step(now);
}
now += 1;
}
}
}

trait Engine {
fn do_step(&mut self, now: u32);
fn set_internal_interface(&mut self, internal_interface: Weak<RefCell<InternalInterface>>);
fn receive_agent(&mut self, agent: Agent);
}

struct ActivityEngine {
agents: Vec<Agent>,
//to prevent memory leaks, we use Weak instead of Rc (https://doc.rust-lang.org/book/ch15-06-reference-cycles.html)
internal_interface: Weak<RefCell<InternalInterface>>,
}

impl Engine for ActivityEngine {
fn do_step(&mut self, now: u32) {
if now % 10 == 0 {
println!("Activity engine: Time step {}, stop activity", now);
self.agents.get_mut(0).unwrap().state = State::TELEPORTATION;
self.internal_interface
.upgrade()
.unwrap()
.borrow_mut()
.arrange_next_agent_state(self.agents.remove(0))
} else {
// println!("Activity engine: Time step {}, doing nothing", now)
}
}

fn set_internal_interface(&mut self, internal_interface: Weak<RefCell<InternalInterface>>) {
self.internal_interface = internal_interface;
}

fn receive_agent(&mut self, agent: Agent) {
println!("Activity engine: Received agent");
self.agents.push(agent);
}
}

struct TeleportationEngine {
agents: Vec<Agent>,
//to prevent memory leaks, we use Weak instead of Rc (https://doc.rust-lang.org/book/ch15-06-reference-cycles.html)
internal_interface: Weak<RefCell<InternalInterface>>,
}

impl Engine for TeleportationEngine {
fn do_step(&mut self, now: u32) {
if now % 10 == 5 {
println!(
"Teleportation engine: Time step {}, stop teleportation",
now
);
self.agents.get_mut(0).unwrap().state = State::ACTIVITY;
self.internal_interface
.upgrade()
.unwrap()
.borrow_mut()
.arrange_next_agent_state(self.agents.remove(0))
} else {
// println!("Teleportation engine: Time step {}, doing nothing", now)
}
}

fn set_internal_interface(&mut self, internal_interface: Weak<RefCell<InternalInterface>>) {
self.internal_interface = internal_interface;
}

fn receive_agent(&mut self, agent: Agent) {
println!("Teleportation engine: Received agent");
self.agents.push(agent);
}
}

#[cfg(test)]
mod tests {
use crate::experiments::internal_interface::{
ActivityEngine, Agent, Engine, InternalInterface, Simulation, State, TeleportationEngine,
};
use std::cell::RefCell;
use std::rc::{Rc, Weak};

#[test]
fn test_run() {
let activity_engine: Rc<RefCell<dyn Engine>> = Rc::new(RefCell::new(ActivityEngine {
agents: vec![Agent::new(String::from("agent"), State::ACTIVITY)],
internal_interface: Weak::new(),
}));
let teleportation_engine: Rc<RefCell<dyn Engine>> =
Rc::new(RefCell::new(TeleportationEngine {
agents: Vec::new(),
internal_interface: Weak::new(),
}));
let internal_interface = Rc::new(RefCell::new(InternalInterface {
activity_engine: Rc::clone(&activity_engine),
teleportation_engine: Rc::clone(&teleportation_engine),
}));

activity_engine
.borrow_mut()
.set_internal_interface(Rc::downgrade(&internal_interface));
teleportation_engine
.borrow_mut()
.set_internal_interface(Rc::downgrade(&internal_interface));

let mut sim = Simulation::new(
vec![activity_engine, teleportation_engine],
internal_interface,
);

sim.run();
}
}
1 change: 1 addition & 0 deletions src/experiments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod concurrent_dispatch;
mod concurrent_ping_pong;
mod internal_interface;
mod lifetimes_example;
mod link_automat_with_adjacency_list;
mod link_automat_with_references;
Expand Down
6 changes: 4 additions & 2 deletions src/simulation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Config {
let default = Simulation::default();
self.modules
.borrow_mut()
.insert("simulation".to_string(), Box::new(default));
.insert("simulation".to_string(), Box::new(default.clone()));
default
}
}
Expand Down Expand Up @@ -177,12 +177,13 @@ pub struct Routing {
pub mode: RoutingMode,
}

#[derive(Serialize, Deserialize, Clone, Copy)]
#[derive(Serialize, Deserialize, Clone)]
pub struct Simulation {
pub start_time: u32,
pub end_time: u32,
pub sample_size: f32,
pub stuck_threshold: u32,
pub passenger_modes: Vec<String>,
}

#[typetag::serde(tag = "type")]
Expand Down Expand Up @@ -232,6 +233,7 @@ impl Default for Simulation {
end_time: 86400,
sample_size: 1.0,
stuck_threshold: u32::MAX,
passenger_modes: vec![],
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/simulation/controller.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::thread::{sleep, JoinHandle};
Expand Down Expand Up @@ -123,15 +124,15 @@ fn execute_partition<C: SimCommunicator + 'static>(comm: C, args: &CommandLineAr
population.persons.len()
);

let mut events = EventsPublisher::new();
let events = Rc::new(RefCell::new(EventsPublisher::new()));

if config.output().write_events == WriteEvents::Proto {
let events_file = format!("events.{rank}.binpb");
let events_path = output_path.join(events_file);
events.add_subscriber(Box::new(ProtoEventsWriter::new(&events_path)));
events.borrow_mut().add_subscriber(Box::new(ProtoEventsWriter::new(&events_path)));
}
let travel_time_collector = Box::new(TravelTimeCollector::new());
events.add_subscriber(travel_time_collector);
events.borrow_mut().add_subscriber(travel_time_collector);

let rc = Rc::new(comm);

Expand Down
87 changes: 87 additions & 0 deletions src/simulation/engines/activity_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::simulation::engines::AgentStateTransitionLogic;
use crate::simulation::id::Id;
use crate::simulation::messaging::communication::communicators::SimCommunicator;
use crate::simulation::messaging::events::EventsPublisher;
use crate::simulation::time_queue::TimeQueue;
use crate::simulation::wire_types::events::Event;
use crate::simulation::wire_types::population::Person;
use std::cell::RefCell;
use std::rc::{Rc, Weak};

pub struct ActivityEngine<C: SimCommunicator> {
activity_q: TimeQueue<Person>,
events: Rc<RefCell<EventsPublisher>>,
agent_state_transition_logic: Weak<RefCell<AgentStateTransitionLogic<C>>>,
}

impl<C: SimCommunicator + 'static> ActivityEngine<C> {
pub fn new(activity_q: TimeQueue<Person>, events: Rc<RefCell<EventsPublisher>>) -> Self {
ActivityEngine {
activity_q,
events,
agent_state_transition_logic: Weak::new(),
}
}

pub(crate) fn do_step(&mut self, now: u32) {
let agents = self.wake_up(now);
for mut agent in agents {
agent.advance_plan();

self.agent_state_transition_logic
.upgrade()
.unwrap()
.borrow_mut()
.arrange_next_agent_state(now, agent);
}
}

pub(crate) fn receive_agent(&mut self, now: u32, agent: Person) {
self.events.borrow_mut().publish_event(
now,
&Event::new_arrival(
agent.id,
agent.curr_act().link_id,
agent.previous_leg().mode,
),
);

// emmit act start event
let act = agent.curr_act();
let act_type: Id<String> = Id::get(act.act_type);
self.events.borrow_mut().publish_event(
now,
&Event::new_act_start(agent.id, act.link_id, act_type.internal()),
);
self.activity_q.add(agent, now);
}

pub(crate) fn set_agent_state_transition_logic(
&mut self,
agent_state_transition_logic: Weak<RefCell<AgentStateTransitionLogic<C>>>,
) {
self.agent_state_transition_logic = agent_state_transition_logic
}

pub fn agents(&mut self) -> Vec<&mut Person> {
//TODO
vec![]
}

fn wake_up(&mut self, now: u32) -> Vec<Person> {
let mut agents = self.activity_q.pop(now);

for agent in agents.iter_mut() {
// self.update_agent(&mut agent, now);
//TODO (used for routing)

let act_type: Id<String> = Id::get(agent.curr_act().act_type);
self.events.borrow_mut().publish_event(
now,
&Event::new_act_end(agent.id, agent.curr_act().link_id, act_type.internal()),
);
}

agents
}
}
Loading
Loading