diff --git a/build.rs b/build.rs index 2e91a65..36c9a43 100644 --- a/build.rs +++ b/build.rs @@ -1,22 +1,25 @@ extern crate prost_build; +extern crate protobuf_src; fn main() { // we use the protobuf-src which provides the protoc compiler. This line makes it available // to prost-build std::env::set_var("PROTOC", protobuf_src::protoc()); - // this line comes from the prost-build example and compiles items.proto into corresponding types. - // the generated code is under ./target//build/-/out - prost_build::compile_protos( - &[ - "src/simulation/wire_types/messages.proto", - "src/simulation/wire_types/events.proto", - "src/simulation/wire_types/ids.proto", - "src/simulation/wire_types/network.proto", - "src/simulation/wire_types/population.proto", - "src/simulation/wire_types/vehicles.proto", - ], - &["src/"], - ) - .unwrap(); + let proto_files = [ + "src/simulation/wire_types/messages.proto", + "src/simulation/wire_types/events.proto", + "src/simulation/wire_types/ids.proto", + "src/simulation/wire_types/network.proto", + "src/simulation/wire_types/population.proto", + "src/simulation/wire_types/vehicles.proto", + ]; + + // tell cargo to rerun this build script if any of the proto files change + for proto in &proto_files { + println!("cargo:rerun-if-changed={}", proto); + } + + // Compiling the protobuf files + prost_build::compile_protos(&proto_files, &["src/"]).unwrap(); } diff --git a/src/experiments/internal_interface.rs b/src/experiments/internal_interface.rs new file mode 100644 index 0000000..969debf --- /dev/null +++ b/src/experiments/internal_interface.rs @@ -0,0 +1,170 @@ +use std::cell::RefCell; +use std::rc::{Rc, Weak}; + +enum State { + ACTIVITY, + TELEPORTATION, +} + +struct Agent { + id: String, + state: State, +} + +impl Agent { + fn new(id: String, state: State) -> Self { + Agent { id, state } + } +} + +struct InternalInterface { + activity_engine: Rc>, + teleportation_engine: Rc>, +} + +impl InternalInterface { + fn arrange_next_agent_state(&self, agent: Agent) { + match agent.state { + State::ACTIVITY => self.activity_engine.borrow_mut().receive_agent(agent), + State::TELEPORTATION => self.teleportation_engine.borrow_mut().receive_agent(agent), + } + } +} + +struct Simulation { + engines: Vec>>, + internal_interface: Rc>, +} + +impl Simulation { + fn new( + engines: Vec>>, + internal_interface: Rc>, + ) -> Self { + Simulation { + internal_interface, + engines, + } + } + + fn run(&mut self) { + let mut now = 0; + while now < 20 { + for engine in &self.engines { + engine.borrow_mut().do_step(now); + } + now += 1; + } + } +} + +trait Engine { + fn do_step(&mut self, now: u32); + fn set_internal_interface(&mut self, internal_interface: Weak>); + fn receive_agent(&mut self, agent: Agent); +} + +struct ActivityEngine { + agents: Vec, + //to prevent memory leaks, we use Weak instead of Rc (https://doc.rust-lang.org/book/ch15-06-reference-cycles.html) + internal_interface: Weak>, +} + +impl Engine for ActivityEngine { + fn do_step(&mut self, now: u32) { + if now % 10 == 0 { + println!("Activity engine: Time step {}, stop activity", now); + self.agents.get_mut(0).unwrap().state = State::TELEPORTATION; + self.internal_interface + .upgrade() + .unwrap() + .borrow_mut() + .arrange_next_agent_state(self.agents.remove(0)) + } else { + // println!("Activity engine: Time step {}, doing nothing", now) + } + } + + fn set_internal_interface(&mut self, internal_interface: Weak>) { + self.internal_interface = internal_interface; + } + + fn receive_agent(&mut self, agent: Agent) { + println!("Activity engine: Received agent"); + self.agents.push(agent); + } +} + +struct TeleportationEngine { + agents: Vec, + //to prevent memory leaks, we use Weak instead of Rc (https://doc.rust-lang.org/book/ch15-06-reference-cycles.html) + internal_interface: Weak>, +} + +impl Engine for TeleportationEngine { + fn do_step(&mut self, now: u32) { + if now % 10 == 5 { + println!( + "Teleportation engine: Time step {}, stop teleportation", + now + ); + self.agents.get_mut(0).unwrap().state = State::ACTIVITY; + self.internal_interface + .upgrade() + .unwrap() + .borrow_mut() + .arrange_next_agent_state(self.agents.remove(0)) + } else { + // println!("Teleportation engine: Time step {}, doing nothing", now) + } + } + + fn set_internal_interface(&mut self, internal_interface: Weak>) { + self.internal_interface = internal_interface; + } + + fn receive_agent(&mut self, agent: Agent) { + println!("Teleportation engine: Received agent"); + self.agents.push(agent); + } +} + +#[cfg(test)] +mod tests { + use crate::experiments::internal_interface::{ + ActivityEngine, Agent, Engine, InternalInterface, Simulation, State, TeleportationEngine, + }; + use std::cell::RefCell; + use std::rc::{Rc, Weak}; + + #[test] + fn test_run() { + let activity_engine: Rc> = Rc::new(RefCell::new(ActivityEngine { + agents: vec![Agent::new(String::from("agent"), State::ACTIVITY)], + internal_interface: Weak::new(), + })); + let teleportation_engine: Rc> = + Rc::new(RefCell::new(TeleportationEngine { + agents: Vec::new(), + internal_interface: Weak::new(), + })); + let internal_interface = Rc::new(RefCell::new(InternalInterface { + activity_engine: Rc::clone(&activity_engine), + teleportation_engine: Rc::clone(&teleportation_engine), + })); + + activity_engine + .borrow_mut() + .set_internal_interface(Rc::downgrade(&internal_interface)); + teleportation_engine + .borrow_mut() + .set_internal_interface(Rc::downgrade(&internal_interface)); + + let mut sim = Simulation::new( + vec![activity_engine, teleportation_engine], + internal_interface, + ); + + sim.run(); + } +} diff --git a/src/experiments/mod.rs b/src/experiments/mod.rs index 717e9cc..21976fb 100644 --- a/src/experiments/mod.rs +++ b/src/experiments/mod.rs @@ -1,5 +1,6 @@ mod concurrent_dispatch; mod concurrent_ping_pong; +mod internal_interface; mod lifetimes_example; mod link_automat_with_adjacency_list; mod link_automat_with_references; diff --git a/src/simulation/config.rs b/src/simulation/config.rs index 24f357a..cd5b3c6 100644 --- a/src/simulation/config.rs +++ b/src/simulation/config.rs @@ -120,7 +120,7 @@ impl Config { let default = Simulation::default(); self.modules .borrow_mut() - .insert("simulation".to_string(), Box::new(default)); + .insert("simulation".to_string(), Box::new(default.clone())); default } } @@ -177,12 +177,13 @@ pub struct Routing { pub mode: RoutingMode, } -#[derive(Serialize, Deserialize, Clone, Copy)] +#[derive(Serialize, Deserialize, Clone)] pub struct Simulation { pub start_time: u32, pub end_time: u32, pub sample_size: f32, pub stuck_threshold: u32, + pub passenger_modes: Vec, } #[typetag::serde(tag = "type")] @@ -232,6 +233,7 @@ impl Default for Simulation { end_time: 86400, sample_size: 1.0, stuck_threshold: u32::MAX, + passenger_modes: vec![], } } } diff --git a/src/simulation/controller.rs b/src/simulation/controller.rs index 0cbd2c1..899ef81 100644 --- a/src/simulation/controller.rs +++ b/src/simulation/controller.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::thread::{sleep, JoinHandle}; @@ -123,15 +124,15 @@ fn execute_partition(comm: C, args: &CommandLineAr population.persons.len() ); - let mut events = EventsPublisher::new(); + let events = Rc::new(RefCell::new(EventsPublisher::new())); if config.output().write_events == WriteEvents::Proto { let events_file = format!("events.{rank}.binpb"); let events_path = output_path.join(events_file); - events.add_subscriber(Box::new(ProtoEventsWriter::new(&events_path))); + events.borrow_mut().add_subscriber(Box::new(ProtoEventsWriter::new(&events_path))); } let travel_time_collector = Box::new(TravelTimeCollector::new()); - events.add_subscriber(travel_time_collector); + events.borrow_mut().add_subscriber(travel_time_collector); let rc = Rc::new(comm); diff --git a/src/simulation/engines/activity_engine.rs b/src/simulation/engines/activity_engine.rs new file mode 100644 index 0000000..e800ba2 --- /dev/null +++ b/src/simulation/engines/activity_engine.rs @@ -0,0 +1,87 @@ +use crate::simulation::engines::AgentStateTransitionLogic; +use crate::simulation::id::Id; +use crate::simulation::messaging::communication::communicators::SimCommunicator; +use crate::simulation::messaging::events::EventsPublisher; +use crate::simulation::time_queue::TimeQueue; +use crate::simulation::wire_types::events::Event; +use crate::simulation::wire_types::population::Person; +use std::cell::RefCell; +use std::rc::{Rc, Weak}; + +pub struct ActivityEngine { + activity_q: TimeQueue, + events: Rc>, + agent_state_transition_logic: Weak>>, +} + +impl ActivityEngine { + pub fn new(activity_q: TimeQueue, events: Rc>) -> Self { + ActivityEngine { + activity_q, + events, + agent_state_transition_logic: Weak::new(), + } + } + + pub(crate) fn do_step(&mut self, now: u32) { + let agents = self.wake_up(now); + for mut agent in agents { + agent.advance_plan(); + + self.agent_state_transition_logic + .upgrade() + .unwrap() + .borrow_mut() + .arrange_next_agent_state(now, agent); + } + } + + pub(crate) fn receive_agent(&mut self, now: u32, agent: Person) { + self.events.borrow_mut().publish_event( + now, + &Event::new_arrival( + agent.id, + agent.curr_act().link_id, + agent.previous_leg().mode, + ), + ); + + // emmit act start event + let act = agent.curr_act(); + let act_type: Id = Id::get(act.act_type); + self.events.borrow_mut().publish_event( + now, + &Event::new_act_start(agent.id, act.link_id, act_type.internal()), + ); + self.activity_q.add(agent, now); + } + + pub(crate) fn set_agent_state_transition_logic( + &mut self, + agent_state_transition_logic: Weak>>, + ) { + self.agent_state_transition_logic = agent_state_transition_logic + } + + pub fn agents(&mut self) -> Vec<&mut Person> { + //TODO + vec![] + } + + fn wake_up(&mut self, now: u32) -> Vec { + let mut agents = self.activity_q.pop(now); + + for agent in agents.iter_mut() { + // self.update_agent(&mut agent, now); + //TODO (used for routing) + + let act_type: Id = Id::get(agent.curr_act().act_type); + self.events.borrow_mut().publish_event( + now, + &Event::new_act_end(agent.id, agent.curr_act().link_id, act_type.internal()), + ); + } + + agents + } +} diff --git a/src/simulation/engines/leg_engine.rs b/src/simulation/engines/leg_engine.rs new file mode 100644 index 0000000..7507659 --- /dev/null +++ b/src/simulation/engines/leg_engine.rs @@ -0,0 +1,340 @@ +use crate::simulation::engines::network_engine::NetworkEngine; +use crate::simulation::engines::teleportation_engine::TeleportationEngine; +use crate::simulation::engines::AgentStateTransitionLogic; +use crate::simulation::id::Id; +use crate::simulation::messaging::communication::communicators::SimCommunicator; +use crate::simulation::messaging::communication::message_broker::NetMessageBroker; +use crate::simulation::messaging::events::EventsPublisher; +use crate::simulation::network::sim_network::SimNetworkPartition; +use crate::simulation::vehicles::garage::Garage; +use crate::simulation::wire_types::events::Event; +use crate::simulation::wire_types::messages::Vehicle; +use crate::simulation::wire_types::population::attribute_value::Type; +use crate::simulation::wire_types::population::{Leg, Person}; +use crate::simulation::wire_types::vehicles::LevelOfDetail; +use nohash_hasher::{IntMap, IntSet}; +use std::cell::RefCell; +use std::rc::{Rc, Weak}; + +pub struct LegEngine { + teleportation_engine: TeleportationEngine, + network_engine: NetworkEngine, + garage: Garage, + net_message_broker: NetMessageBroker, + events: Rc>, + agent_state_transition_logic: Weak>>, + departure_handler: Vec>, + waiting_passengers: IntMap, +} + +impl LegEngine { + pub fn new( + network: SimNetworkPartition, + garage: Garage, + net_message_broker: NetMessageBroker, + events: Rc>, + passenger_modes: IntSet, + ) -> Self { + let departure_handler: Vec> = vec![ + Box::new(VehicularDepartureHandler { + events: events.clone(), + passenger_modes: passenger_modes.clone(), + }), + Box::new(PassengerDepartureHandler { + events: events.clone(), + passenger_modes: passenger_modes.clone(), + }), + Box::new(DrtDriverDepartureHandler { + events: events.clone(), + driver_agents: IntSet::default(), //TODO + }), + ]; + + LegEngine { + teleportation_engine: TeleportationEngine::new(events.clone()), + network_engine: NetworkEngine::new(network, events.clone()), + agent_state_transition_logic: Weak::new(), + garage, + net_message_broker, + events, + departure_handler, + waiting_passengers: IntMap::default(), + } + } + + pub(crate) fn do_step(&mut self, now: u32) { + let teleported_vehicles = self.teleportation_engine.do_step(now); + let network_vehicles = self.network_engine.move_nodes(now); + + let mut agents = vec![]; + agents.extend(self.publish_end_events(now, network_vehicles, true)); + agents.extend(self.publish_end_events(now, teleported_vehicles, false)); + + for mut agent in agents { + agent.advance_plan(); + + self.agent_state_transition_logic + .upgrade() + .unwrap() + .borrow_mut() + .arrange_next_agent_state(now, agent); + } + + self.network_engine + .move_links(now, &mut self.net_message_broker); + let sync_messages = self.net_message_broker.send_recv(now); + + for msg in sync_messages { + self.network_engine + .network + .apply_storage_cap_updates(msg.storage_capacities); + + for veh in msg.vehicles { + self.pass_vehicle_to_engine(now, veh, false); + } + } + } + + fn publish_end_events( + &mut self, + now: u32, + vehicles: Vec, + publish_leave_vehicle: bool, + ) -> Vec { + let mut agents = vec![]; + for veh in vehicles { + if publish_leave_vehicle { + self.events + .borrow_mut() + .publish_event(now, &Event::new_person_leaves_veh(veh.driver().id, veh.id)); + } + + for passenger in veh.passengers() { + self.events.borrow_mut().publish_event( + now, + &Event::new_passenger_dropped_off( + passenger.id, + passenger.curr_leg().mode, + 0, //TODO + veh.id, + ), + ); + if publish_leave_vehicle { + self.events + .borrow_mut() + .publish_event(now, &Event::new_person_leaves_veh(passenger.id, veh.id)); + } + } + agents.extend(self.garage.park_veh(veh)); + } + agents + } + + pub(crate) fn receive_agent(&mut self, now: u32, agent: Person) { + let mut departure_handler = None; + for dh in &mut self.departure_handler { + if dh.is_responsible(&agent) { + departure_handler = Some(dh); + break; + } + } + + let vehicle = departure_handler + .expect("No departure handler found") + .handle_departure(now, agent, &mut self.garage, &mut self.waiting_passengers); + + if let Some(vehicle) = vehicle { + self.pass_vehicle_to_engine(now, vehicle, true); + } + } + + pub(crate) fn set_agent_state_transition_logic( + &mut self, + agent_state_transition_logic: Weak>>, + ) { + self.agent_state_transition_logic = agent_state_transition_logic + } + + pub fn agents(&mut self) -> Vec<&mut Person> { + let mut agents = self.network_engine.network.active_agents(); + agents.extend(self.teleportation_engine.agents()); + agents + } + + fn pass_vehicle_to_engine(&mut self, now: u32, vehicle: Vehicle, route_begin: bool) { + let veh_type_id = Id::get(vehicle.r#type); + let veh_type = self.garage.vehicle_types.get(&veh_type_id).unwrap(); + + match veh_type.lod() { + LevelOfDetail::Network => { + self.network_engine + .receive_vehicle(now, vehicle, route_begin); + } + LevelOfDetail::Teleported => { + self.teleportation_engine.receive_vehicle( + now, + vehicle, + &mut self.net_message_broker, + ); + } + } + } + + pub fn net_message_broker(&self) -> &NetMessageBroker { + &self.net_message_broker + } + + pub fn network(&self) -> &SimNetworkPartition { + &self.network_engine.network + } +} + +trait DepartureHandler { + fn is_responsible(&self, agent: &Person) -> bool; + + fn handle_departure( + &mut self, + now: u32, + agent: Person, + garage: &mut Garage, + waiting_passengers: &mut IntMap, + ) -> Option; +} + +struct VehicularDepartureHandler { + events: Rc>, + passenger_modes: IntSet, +} + +impl DepartureHandler for VehicularDepartureHandler { + fn is_responsible(&self, agent: &Person) -> bool { + !self.passenger_modes.contains(&agent.curr_leg().mode) + } + + fn handle_departure( + &mut self, + now: u32, + agent: Person, + garage: &mut Garage, + _: &mut IntMap, + ) -> Option { + assert_ne!(agent.curr_plan_elem % 2, 0); + + let leg = agent.curr_leg(); + let route = leg.route.as_ref().unwrap(); + let leg_mode: Id = Id::get(leg.mode); + let veh_id = Id::get(route.veh_id); + + self.events.borrow_mut().publish_event( + now, + &Event::new_departure(agent.id, route.start_link(), leg_mode.internal()), + ); + + let veh_type_id = garage.vehicles.get(&veh_id).unwrap(); + match LevelOfDetail::try_from(garage.vehicle_types.get(veh_type_id).unwrap().lod).unwrap() { + LevelOfDetail::Network => { + self.events.borrow_mut().publish_event( + now, + &Event::new_person_enters_veh(agent.id, veh_id.internal()), + ); + } + _ => {} + } + + Some(garage.unpark_veh(agent, &veh_id)) + } +} + +struct PassengerDepartureHandler { + events: Rc>, + passenger_modes: IntSet, +} + +impl DepartureHandler for PassengerDepartureHandler { + fn is_responsible(&self, agent: &Person) -> bool { + self.passenger_modes.contains(&agent.curr_leg().mode) + } + + fn handle_departure( + &mut self, + now: u32, + agent: Person, + _: &mut Garage, + waiting_passengers: &mut IntMap, + ) -> Option { + let act_before = agent.previous_act(); + let leg = agent.curr_leg(); + let leg_mode: Id = Id::get(leg.mode); + self.events.borrow_mut().publish_event( + now, + &Event::new_departure(agent.id, act_before.link_id, leg_mode.internal()), + ); + + waiting_passengers.insert(agent.id, agent); + None + } +} + +struct DrtDriverDepartureHandler { + events: Rc>, + driver_agents: IntSet, +} + +impl DepartureHandler for DrtDriverDepartureHandler { + fn is_responsible(&self, agent: &Person) -> bool { + self.driver_agents.contains(&agent.id()) + } + + fn handle_departure( + &mut self, + now: u32, + agent: Person, + garage: &mut Garage, + waiting_passengers: &mut IntMap, + ) -> Option { + // remove passenger from waiting queue, place driver and passenger in vehicle and hand it over to leg engine + let passenger_id = match agent + .curr_leg() + .attributes + .get(Leg::PASSENGER_ID_ATTRIBUTE) + .expect("No passenger id found") + .r#type + .as_ref() + .unwrap() + { + Type::IntValue(id) => id, + _ => { + unreachable!() + } + }; + + let passengers: Vec = vec![waiting_passengers + .remove(&passenger_id) + .expect("No such passenger is waiting.")]; + + let leg = agent.curr_leg(); + let route = leg.route.as_ref().unwrap(); + let leg_mode: Id = Id::get(leg.mode); + let veh_id = agent.curr_leg().route.as_ref().unwrap().veh_id; + + // emit events for passengers + for passenger in &passengers { + let mode = passenger.curr_leg().mode; + self.events + .borrow_mut() + .publish_event(now, &Event::new_person_enters_veh(passenger.id, veh_id)); + self.events.borrow_mut().publish_event( + now, + &Event::new_passenger_picked_up(passenger.id, mode, 0, veh_id), + ); + } + + // emit event for driver + self.events.borrow_mut().publish_event( + now, + &Event::new_departure(agent.id, route.start_link(), leg_mode.internal()), + ); + + Some(garage.unpark_veh_with_passengers(agent, passengers, &Id::get(veh_id))) + } +} diff --git a/src/simulation/engines/mod.rs b/src/simulation/engines/mod.rs new file mode 100644 index 0000000..ce5dff2 --- /dev/null +++ b/src/simulation/engines/mod.rs @@ -0,0 +1,40 @@ +use crate::simulation::engines::activity_engine::ActivityEngine; +use crate::simulation::engines::leg_engine::LegEngine; +use crate::simulation::messaging::communication::communicators::SimCommunicator; +use crate::simulation::population::population_data::State; +use crate::simulation::wire_types::population::Person; +use std::cell::RefCell; +use std::rc::Rc; + +pub mod activity_engine; +pub mod leg_engine; +pub mod network_engine; +pub mod teleportation_engine; + +pub trait ReplanEngine { + fn do_sim_step(&mut self, now: u32, agents: &Vec<&mut Person>); +} + +pub struct AgentStateTransitionLogic { + activity_engine: Rc>>, + pub leg_engine: Rc>>, +} + +impl AgentStateTransitionLogic { + fn arrange_next_agent_state(&self, now: u32, agent: Person) { + match agent.state() { + State::ACTIVITY => self.activity_engine.borrow_mut().receive_agent(now, agent), + State::LEG => self.leg_engine.borrow_mut().receive_agent(now, agent), + } + } + + pub(crate) fn new( + activity_engine: Rc>>, + teleportation_engine: Rc>>, + ) -> Self { + AgentStateTransitionLogic { + activity_engine, + leg_engine: teleportation_engine, + } + } +} diff --git a/src/simulation/engines/network_engine.rs b/src/simulation/engines/network_engine.rs new file mode 100644 index 0000000..e588866 --- /dev/null +++ b/src/simulation/engines/network_engine.rs @@ -0,0 +1,54 @@ +use crate::simulation::messaging::communication::communicators::SimCommunicator; +use crate::simulation::messaging::communication::message_broker::NetMessageBroker; +use crate::simulation::messaging::events::EventsPublisher; +use crate::simulation::network::sim_network::SimNetworkPartition; +use crate::simulation::wire_types::messages::Vehicle; +use std::cell::RefCell; +use std::ops::DerefMut; +use std::rc::Rc; + +pub struct NetworkEngine { + pub(crate) network: SimNetworkPartition, + pub events: Rc>, +} + +impl NetworkEngine { + pub fn new(network: SimNetworkPartition, events: Rc>) -> Self { + NetworkEngine { network, events } + } + + pub fn receive_vehicle(&mut self, now: u32, vehicle: Vehicle, route_begin: bool) { + let events = if route_begin { + //if route has just begun, no link enter event should be published + None + } else { + //if route is already in progress, this method gets vehicles from another partition and should publish link enter event + //this is because the receiving partition is the owner of this link and should publish the event + Some(self.events.clone()) + }; + self.network.send_veh_en_route(vehicle, events, now) + } + + pub(crate) fn move_nodes(&mut self, now: u32) -> Vec { + let exited_vehicles = self + .network + .move_nodes(self.events.borrow_mut().deref_mut(), now); + exited_vehicles + } + + pub(crate) fn move_links( + &mut self, + now: u32, + net_message_broker: &mut NetMessageBroker, + ) { + let (vehicles, storage_cap_updates) = self.network.move_links(now); + + for veh in vehicles { + net_message_broker.add_veh(veh, now); + } + + for cap in storage_cap_updates { + net_message_broker.add_cap_update(cap, now); + } + } +} diff --git a/src/simulation/engines/teleportation_engine.rs b/src/simulation/engines/teleportation_engine.rs new file mode 100644 index 0000000..4e4e056 --- /dev/null +++ b/src/simulation/engines/teleportation_engine.rs @@ -0,0 +1,64 @@ +use crate::simulation::id::Id; +use crate::simulation::messaging::communication::communicators::SimCommunicator; +use crate::simulation::messaging::communication::message_broker::NetMessageBroker; +use crate::simulation::messaging::events::EventsPublisher; +use crate::simulation::simulation::Simulation; +use crate::simulation::time_queue::TimeQueue; +use crate::simulation::wire_types::events::Event; +use crate::simulation::wire_types::messages::Vehicle; +use crate::simulation::wire_types::population::Person; +use std::cell::RefCell; +use std::rc::Rc; + +pub struct TeleportationEngine { + queue: TimeQueue, + pub events: Rc>, +} + +impl TeleportationEngine { + pub fn new(events: Rc>) -> Self { + TeleportationEngine { + queue: TimeQueue::new(), + events, + } + } + + pub fn receive_vehicle( + &mut self, + now: u32, + mut vehicle: Vehicle, + net_message_broker: &mut NetMessageBroker, + ) { + if Simulation::is_local_route(&vehicle, net_message_broker) { + self.queue.add(vehicle, now); + } else { + // set the pointer of the route to the last element, so that the current link + // is the destination of this leg. Setting this to the last element makes this + // logic independent of whether the agent has a Generic-Route with only start + // and end link or a full Network-Route, which is often the case for ride modes. + vehicle.route_index_to_last(); + net_message_broker.add_veh(vehicle, now); + } + } + + pub fn do_step(&mut self, now: u32) -> Vec { + let teleportation_vehicles = self.queue.pop(now); + for vehicle in &teleportation_vehicles { + let agent = vehicle.driver.as_ref().unwrap(); + + // emmit travelled + let leg = agent.curr_leg(); + let route = leg.route.as_ref().unwrap(); + let mode: Id = Id::get(leg.mode); + self.events.borrow_mut().publish_event( + now, + &Event::new_travelled(agent.id, route.distance, mode.internal()), + ); + } + teleportation_vehicles + } + + pub fn agents(&self) -> Vec<&mut Person> { + todo!() + } +} diff --git a/src/simulation/io/xml_events.rs b/src/simulation/io/xml_events.rs index 4b51528..f7e8a64 100644 --- a/src/simulation/io/xml_events.rs +++ b/src/simulation/io/xml_events.rs @@ -92,6 +92,20 @@ impl XmlEventsWriter { e.distance, Id::::get(e.mode).external()) } + Type::PassengerPickedUp(e) => { + format!("\n", + Id::::get(e.person).external(), + Id::::get(e.mode).external(), + Id::::get(e.request).external(), + Id::::get(e.vehicle).external()) + } + Type::PassengerDroppedOff(e) => { + format!("\n", + Id::::get(e.person).external(), + Id::::get(e.mode).external(), + Id::::get(e.request).external(), + Id::::get(e.vehicle).external()) + } } } diff --git a/src/simulation/messaging/communication/message_broker.rs b/src/simulation/messaging/communication/message_broker.rs index 1dba0ba..593278e 100644 --- a/src/simulation/messaging/communication/message_broker.rs +++ b/src/simulation/messaging/communication/message_broker.rs @@ -359,6 +359,7 @@ mod tests { end_time: 0, sample_size: 0.0, stuck_threshold: 0, + passenger_modes: vec![], }; let broker = NetMessageBroker::new( Rc::new(communicator), diff --git a/src/simulation/messaging/events.rs b/src/simulation/messaging/events.rs index 239f6f7..e5572c4 100644 --- a/src/simulation/messaging/events.rs +++ b/src/simulation/messaging/events.rs @@ -5,13 +5,13 @@ use std::fmt::Debug; use tracing::{info, instrument}; use crate::simulation::wire_types::events::event::Type::{ - ActEnd, ActStart, Arrival, Departure, Generic, LinkEnter, LinkLeave, PersonEntersVeh, - PersonLeavesVeh, Travelled, + ActEnd, ActStart, Arrival, Departure, Generic, LinkEnter, LinkLeave, PassengerDroppedOff, + PassengerPickedUp, PersonEntersVeh, PersonLeavesVeh, Travelled, }; use crate::simulation::wire_types::events::{ ActivityEndEvent, ActivityStartEvent, ArrivalEvent, DepartureEvent, Event, GenericEvent, - LinkEnterEvent, LinkLeaveEvent, PersonEntersVehicleEvent, PersonLeavesVehicleEvent, - TravelledEvent, + LinkEnterEvent, LinkLeaveEvent, PassengerDroppedOffEvent, PassengerPickedUpEvent, + PersonEntersVehicleEvent, PersonLeavesVehicleEvent, TravelledEvent, }; pub trait EventsSubscriber { @@ -203,4 +203,26 @@ impl Event { })), } } + + pub fn new_passenger_picked_up(person: u64, mode: u64, request: u64, vehicle: u64) -> Event { + Event { + r#type: Some(PassengerPickedUp(PassengerPickedUpEvent { + person, + mode, + request, + vehicle, + })), + } + } + + pub fn new_passenger_dropped_off(person: u64, mode: u64, request: u64, vehicle: u64) -> Event { + Event { + r#type: Some(PassengerDroppedOff(PassengerDroppedOffEvent { + person, + mode, + request, + vehicle, + })), + } + } } diff --git a/src/simulation/messaging/messages.rs b/src/simulation/messaging/messages.rs index b5bea72..ecf3f41 100644 --- a/src/simulation/messaging/messages.rs +++ b/src/simulation/messaging/messages.rs @@ -126,6 +126,10 @@ impl Vehicle { self.driver.as_ref().unwrap() } + pub fn passengers(&self) -> &Vec { + &self.passengers + } + pub fn id(&self) -> usize { self.id as usize } diff --git a/src/simulation/mod.rs b/src/simulation/mod.rs index 15ea9c3..6059eaa 100644 --- a/src/simulation/mod.rs +++ b/src/simulation/mod.rs @@ -1,5 +1,6 @@ pub mod config; pub mod controller; +pub mod engines; pub mod id; pub mod io; pub mod logging; diff --git a/src/simulation/network/link.rs b/src/simulation/network/link.rs index cc5912a..e881560 100644 --- a/src/simulation/network/link.rs +++ b/src/simulation/network/link.rs @@ -165,7 +165,7 @@ struct VehicleQEntry { } impl LocalLink { - pub fn from_link(link: &Link, effective_cell_size: f32, config: config::Simulation) -> Self { + pub fn from_link(link: &Link, effective_cell_size: f32, config: &config::Simulation) -> Self { LocalLink::new( link.id.clone(), link.capacity, @@ -200,7 +200,7 @@ impl LocalLink { perm_lanes: f32, length: f64, effective_cell_size: f32, - config: config::Simulation, + config: &config::Simulation, from: Id, to: Id, ) -> Self { @@ -392,7 +392,7 @@ mod sim_link_tests { 3., 100., 7.5, - test_utils::config(), + &test_utils::config(), Id::new_internal(1), Id::new_internal(2), )); @@ -414,7 +414,7 @@ mod sim_link_tests { 3., 100., 7.5, - test_utils::config(), + &test_utils::config(), Id::new_internal(1), Id::new_internal(2), )); @@ -445,7 +445,7 @@ mod sim_link_tests { 3., 100., 7.5, - test_utils::config(), + &test_utils::config(), Id::new_internal(1), Id::new_internal(2), )); @@ -485,7 +485,7 @@ mod sim_link_tests { 3., 100., 7.5, - test_utils::config(), + &test_utils::config(), Id::new_internal(1), Id::new_internal(2), )); @@ -515,7 +515,7 @@ mod sim_link_tests { 1., 15.0, 10.0, - test_utils::config(), + &test_utils::config(), Id::new_internal(0), Id::new_internal(0), )); @@ -549,6 +549,7 @@ mod sim_link_tests { end_time: 0, sample_size: 1.0, stuck_threshold, + passenger_modes: vec![], }; let mut link = SimLink::Local(LocalLink::new( Id::create("stuck-link"), @@ -557,7 +558,7 @@ mod sim_link_tests { 1.0, 10.0, 7.5, - config, + &config, Id::create("from-node"), Id::create("to-node"), )); @@ -587,6 +588,7 @@ mod sim_link_tests { end_time: 0, sample_size: 1.0, stuck_threshold, + passenger_modes: vec![], }; let mut link = SimLink::Local(LocalLink::new( Id::create("stuck-link"), @@ -595,7 +597,7 @@ mod sim_link_tests { 1.0, earliest_exit as f64, 7.5, - config, + &config, Id::create("from-node"), Id::create("to-node"), )); diff --git a/src/simulation/network/sim_network.rs b/src/simulation/network/sim_network.rs index b1dd6cd..92902c1 100644 --- a/src/simulation/network/sim_network.rs +++ b/src/simulation/network/sim_network.rs @@ -1,4 +1,6 @@ +use std::cell::RefCell; use std::collections::HashSet; +use std::rc::Rc; use nohash_hasher::{IntMap, IntSet}; use rand::rngs::ThreadRng; @@ -68,7 +70,7 @@ impl SimNetworkPartition { link, partition, global_network.effective_cell_size, - config, + &config, global_network, ), ) @@ -96,7 +98,7 @@ impl SimNetworkPartition { link: &Link, partition: u32, effective_cell_size: f32, - config: config::Simulation, + config: &config::Simulation, global_network: &Network, ) -> SimLink { let from_part = global_network.get_node(&link.from).partition; //all_nodes.get(link.from.internal()).unwrap().partition; @@ -151,6 +153,26 @@ impl SimNetworkPartition { self.active_links.len() } + pub fn active_agents(&mut self) -> Vec<&mut Person> { + // one has to iterate here over all links and filter then, because otherwise the borrow checker will complain + // something like self.active_links.map(|id| self.links.get(id)) borrows self mutably in FnMut closure + + // self.links + // .iter_mut() + // .filter(|(id, link)| self.active_links.contains(id)) + // .map(|(_, link)| link) + // .map(|link| match link { + // SimLink::Local(ll) => ll, + // SimLink::In(il) => &mut il.local_link, + // SimLink::Out(ol) => todo!(), + // }) + // .flat_map(|link| link.q.iter_mut()) + // .map(|v| &mut v.vehicle) + // .flat_map(|v| v.passengers.iter_mut().chain(v.driver.iter_mut())) + // .collect::>() + vec![] + } + pub fn veh_on_net(&self) -> usize { self.veh_counter } @@ -175,7 +197,7 @@ impl SimNetworkPartition { pub fn send_veh_en_route( &mut self, vehicle: Vehicle, - events_publisher: Option<&mut EventsPublisher>, + events_publisher: Option>>, now: u32, ) { let link_id = vehicle.curr_link_id().unwrap_or_else(|| { @@ -194,7 +216,7 @@ impl SimNetworkPartition { }); if let Some(publisher) = events_publisher { - publisher.publish_event( + publisher.borrow_mut().publish_event( now, &Event::new_link_enter(link.id().internal(), vehicle.id), ); diff --git a/src/simulation/population/population_data.rs b/src/simulation/population/population_data.rs index 510427d..2d44838 100644 --- a/src/simulation/population/population_data.rs +++ b/src/simulation/population/population_data.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use tracing::debug; use crate::simulation::id::Id; @@ -12,6 +13,11 @@ use crate::simulation::wire_types::messages::Vehicle; use crate::simulation::wire_types::population::{Activity, Leg, Person, Plan, Route}; use crate::simulation::wire_types::vehicles::VehicleType; +pub enum State { + ACTIVITY, + LEG, +} + impl Person { pub fn from_io(io_person: &IOPerson) -> Person { let person_id = Id::get_from_ext(&io_person.id); @@ -41,6 +47,14 @@ impl Person { self.id } + pub fn state(&self) -> State { + if self.curr_plan_elem % 2 == 0 { + State::ACTIVITY + } else { + State::LEG + } + } + pub fn add_act_after_curr(&mut self, to_add: Vec) { let next_act_index = self.next_act_index() as usize; self.plan @@ -71,6 +85,11 @@ impl Person { self.get_act_at_index(act_index) } + pub fn previous_act(&self) -> &Activity { + let act_index = self.next_act_index() - 1; + self.get_act_at_index(act_index) + } + pub fn curr_act_mut(&mut self) -> &mut Activity { if self.curr_plan_elem % 2 != 0 { panic!("Current element is not an activity"); @@ -118,6 +137,11 @@ impl Person { .unwrap() } + pub fn previous_leg(&self) -> &Leg { + let leg_index = self.next_leg_index() - 1; + self.get_leg_at_index(leg_index) + } + pub fn next_leg(&self) -> &Leg { let next_leg_index = self.next_leg_index(); self.get_leg_at_index(next_leg_index) @@ -337,6 +361,8 @@ impl Activity { } impl Leg { + pub const PASSENGER_ID_ATTRIBUTE: &'static str = "passenger_id"; + fn from_io(io_leg: &IOLeg, person_id: &Id) -> Self { let routing_mode_ext = Attrs::find_or_else_opt(&io_leg.attributes, "routingMode", || "car"); @@ -350,6 +376,7 @@ impl Leg { trav_time: Self::parse_trav_time(&io_leg.trav_time, &io_leg.route.trav_time), dep_time: parse_time_opt(&io_leg.dep_time), routing_mode: routing_mode.internal(), + attributes: HashMap::new(), } } @@ -360,6 +387,7 @@ impl Leg { trav_time, dep_time, routing_mode: 0, + attributes: HashMap::new(), } } @@ -374,6 +402,7 @@ impl Leg { distance: 0.0, route: Vec::new(), }), + attributes: HashMap::new(), } } diff --git a/src/simulation/simulation.rs b/src/simulation/simulation.rs index fa0f7fa..8eea81b 100644 --- a/src/simulation/simulation.rs +++ b/src/simulation/simulation.rs @@ -1,9 +1,14 @@ +use std::cell::RefCell; use std::fmt::Debug; use std::fmt::Formatter; +use std::rc::Rc; -use tracing::{info, instrument}; +use tracing::info; use crate::simulation::config::Config; +use crate::simulation::engines::activity_engine::ActivityEngine; +use crate::simulation::engines::leg_engine::LegEngine; +use crate::simulation::engines::{AgentStateTransitionLogic, ReplanEngine}; use crate::simulation::id::Id; use crate::simulation::messaging::communication::communicators::SimCommunicator; use crate::simulation::messaging::communication::message_broker::NetMessageBroker; @@ -13,22 +18,14 @@ use crate::simulation::population::population::Population; use crate::simulation::replanning::replanner::Replanner; use crate::simulation::time_queue::TimeQueue; use crate::simulation::vehicles::garage::Garage; -use crate::simulation::wire_types::events::Event; use crate::simulation::wire_types::messages::Vehicle; -use crate::simulation::wire_types::population::Person; -use crate::simulation::wire_types::vehicles::LevelOfDetail; -pub struct Simulation -where - C: SimCommunicator, -{ - activity_q: TimeQueue, - teleportation_q: TimeQueue, - network: SimNetworkPartition, - garage: Garage, - net_message_broker: NetMessageBroker, - events: EventsPublisher, - replanner: Box, +pub struct Simulation { + activity_engine: Rc>>, + leg_engine: Rc>>, + internal_interface: Rc>>, + events: Rc>, + replan_engines: Vec>, start_time: u32, end_time: u32, } @@ -43,7 +40,7 @@ where garage: Garage, mut population: Population, net_message_broker: NetMessageBroker, - events: EventsPublisher, + events: Rc>, replanner: Box, ) -> Self { let mut activity_q = TimeQueue::new(); @@ -56,27 +53,56 @@ where activity_q.add(agent, config.simulation().start_time); } - Simulation { + let activity_engine: Rc>> = Rc::new(RefCell::new( + ActivityEngine::new(activity_q, events.clone()), + )); + + let passenger_modes = config + .simulation() + .passenger_modes + .iter() + .map(|mode| Id::::get_from_ext(mode).internal()) + .collect(); + + let leg_engine = Rc::new(RefCell::new(LegEngine::new( network, garage, - teleportation_q: TimeQueue::new(), - activity_q, net_message_broker, + events.clone(), + passenger_modes, + ))); + + let internal_interface = Rc::new(RefCell::new(AgentStateTransitionLogic::new( + activity_engine.clone(), + leg_engine.clone(), + ))); + + activity_engine + .borrow_mut() + .set_agent_state_transition_logic(Rc::downgrade(&internal_interface)); + leg_engine + .borrow_mut() + .set_agent_state_transition_logic(Rc::downgrade(&internal_interface)); + + Simulation { + activity_engine, + leg_engine, + internal_interface, events, - replanner, + replan_engines: vec![], start_time: config.simulation().start_time, end_time: config.simulation().end_time, } } - #[tracing::instrument(level = "info", skip(self), fields(rank = self.net_message_broker.rank()))] + #[tracing::instrument(level = "info", skip(self), fields(rank = self.leg_engine.borrow().net_message_broker().rank()))] pub fn run(&mut self) { // use fixed start and end times let mut now = self.start_time; info!( "Starting #{}. Network neighbors: {:?}, Start time {}, End time {}", - self.net_message_broker.rank(), - self.network.neighbors(), + self.leg_engine.borrow().net_message_broker().rank(), + self.leg_engine.borrow().network().neighbors(), self.start_time, self.end_time, ); @@ -87,185 +113,37 @@ where let _min = (now % 3600) / 60; info!( "#{} of Qsim at {_hour:02}:{_min:02}; Active Nodes: {}, Active Links: {}, Vehicles on Network Partition: {}", - self.net_message_broker.rank(), - self.network.active_nodes(), - self.network.active_links(), - self.network.veh_on_net() + self.leg_engine.borrow().net_message_broker().rank(), + self.leg_engine.borrow().network().active_nodes(), + self.leg_engine.borrow().network().active_links(), + self.leg_engine.borrow().network().veh_on_net() ); } - self.wakeup(now); - self.terminate_teleportation(now); - self.move_nodes(now); - self.move_links(now); - - self.replanner.update_time(now, &mut self.events); - - now += 1; - } - - // maybe this belongs into the controller? Then this would have to be a &mut instead of owned. - self.events.finish(); - } - - #[tracing::instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] - fn wakeup(&mut self, now: u32) { - let agents = self.activity_q.pop(now); - - for mut agent in agents { - self.update_agent(&mut agent, now); - - let act_type: Id = Id::get(agent.curr_act().act_type); - self.events.publish_event( - now, - &Event::new_act_end(agent.id, agent.curr_act().link_id, act_type.internal()), - ); - - let mut vehicle = self.departure(agent, now); - let veh_type_id = Id::get(vehicle.r#type); - let veh_type = self.garage.vehicle_types.get(&veh_type_id).unwrap(); - - match veh_type.lod() { - LevelOfDetail::Network => { - self.events.publish_event( - now, - &Event::new_person_enters_veh(vehicle.driver().id, vehicle.id), - ); - //we don't pass the event publisher because a link enter event should not be published - self.network.send_veh_en_route(vehicle, None, now); - } - LevelOfDetail::Teleported => { - if Simulation::is_local_route(&vehicle, &self.net_message_broker) { - self.teleportation_q.add(vehicle, now); - } else { - // set the pointer of the route to the last element, so that the current link - // is the destination of this leg. Setting this to the last element makes this - // logic independent of whether the agent has a Generic-Route with only start - // and end link or a full Network-Route, which is often the case for ride modes. - vehicle.route_index_to_last(); - self.net_message_broker.add_veh(vehicle, now); - } - } - } - } - } - - fn departure(&mut self, mut agent: Person, now: u32) -> Vehicle { - //here, current element counter is going to be increased - agent.advance_plan(); - - assert_ne!(agent.curr_plan_elem % 2, 0); - let leg = agent.curr_leg(); - let route = leg.route.as_ref().unwrap(); - let leg_mode: Id = Id::get(leg.mode); - self.events.publish_event( - now, - &Event::new_departure(agent.id, route.start_link(), leg_mode.internal()), - ); - - let veh_id = Id::get(route.veh_id); - self.garage.unpark_veh(agent, &veh_id) - } - - fn update_agent(&mut self, agent: &mut Person, now: u32) { - self.replanner.replan(now, agent, &self.garage) - } - - #[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] - fn terminate_teleportation(&mut self, now: u32) { - let teleportation_vehicles = self.teleportation_q.pop(now); - for vehicle in teleportation_vehicles { - // park the vehice - get the agent out of the vehicle - let mut agent = self.garage.park_veh(vehicle); - - // emmit travelled - let leg = agent.curr_leg(); - let route = leg.route.as_ref().unwrap(); - let mode: Id = Id::get(leg.mode); - self.events.publish_event( - now, - &Event::new_travelled(agent.id, route.distance, mode.internal()), - ); - - // emmit arrival - self.events.publish_event( - now, - &Event::new_arrival(agent.id, route.end_link(), mode.internal()), - ); - - // advance plan to activity and put agent into activity q. - agent.advance_plan(); - - // emmit act start event - let act = agent.curr_act(); - let act_type: Id = Id::get(act.act_type); - self.events.publish_event( - now, - &Event::new_act_start(agent.id, act.link_id, act_type.internal()), - ); - self.activity_q.add(agent, now); - } - } - - //#[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] - fn move_nodes(&mut self, now: u32) { - let exited_vehicles = self.network.move_nodes(&mut self.events, now); - - for veh in exited_vehicles { - self.events - .publish_event(now, &Event::new_person_leaves_veh(veh.driver().id, veh.id)); - let veh_type_id = Id::get(veh.r#type); - let veh_type = self.garage.vehicle_types.get(&veh_type_id).unwrap(); - let mode = veh_type.net_mode; - let mut agent = self.garage.park_veh(veh); + // let mut act_ref = self.activity_engine.borrow_mut(); + // let mut leg_ref = self.leg_engine.borrow_mut(); - // move to next activity - agent.advance_plan(); - let act = agent.curr_act(); - self.events - .publish_event(now, &Event::new_arrival(agent.id, act.link_id, mode)); - let act_type: Id = Id::get(act.act_type); - self.events.publish_event( - now, - &Event::new_act_start(agent.id, act.link_id, act_type.internal()), - ); - self.activity_q.add(agent, now); - } - } + // let mut agents = act_ref.agents(); + // agents.extend(leg_ref.agents()); + // + // for engine in &mut self.replan_engines { + // engine.do_sim_step(now, &agents); + // } - #[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] - fn move_links(&mut self, now: u32) { - let (vehicles, storage_cap_updates) = self.network.move_links(now); + self.activity_engine.borrow_mut().do_step(now); + self.leg_engine.borrow_mut().do_step(now); - for veh in vehicles { - self.net_message_broker.add_veh(veh, now); - } + //TODO + // self.replanner.update_time(now, &mut self.events); - for cap in storage_cap_updates { - self.net_message_broker.add_cap_update(cap, now); + now += 1; } - let sync_messages = self.net_message_broker.send_recv(now); - - for msg in sync_messages { - self.network - .apply_storage_cap_updates(msg.storage_capacities); - - for veh in msg.vehicles { - let veh_type_id = Id::get(veh.r#type); - let veh_type = self.garage.vehicle_types.get(&veh_type_id).unwrap(); - match veh_type.lod() { - LevelOfDetail::Network => { - self.network - .send_veh_en_route(veh, Some(&mut self.events), now) - } - LevelOfDetail::Teleported => self.teleportation_q.add(veh, now), - } - } - } + // maybe this belongs into the controller? Then this would have to be a &mut instead of owned. + self.events.borrow_mut().finish(); } - fn is_local_route(veh: &Vehicle, message_broker: &NetMessageBroker) -> bool { + pub(crate) fn is_local_route(veh: &Vehicle, message_broker: &NetMessageBroker) -> bool { let leg = veh.driver.as_ref().unwrap().curr_leg(); let route = leg.route.as_ref().unwrap(); let to = message_broker.rank_for_link(route.end_link()); @@ -273,12 +151,12 @@ where } } -impl Debug for Simulation { +impl Debug for Simulation { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, "Simulation with Rank #{}", - self.net_message_broker.rank() + self.leg_engine.borrow().net_message_broker().rank() ) } } diff --git a/src/simulation/vehicles/garage.rs b/src/simulation/vehicles/garage.rs index 8263b6d..885aa4f 100644 --- a/src/simulation/vehicles/garage.rs +++ b/src/simulation/vehicles/garage.rs @@ -87,7 +87,7 @@ impl Garage { Id::get_from_ext(&external) } - pub(crate) fn park_veh(&mut self, vehicle: Vehicle) -> Person { + pub(crate) fn park_veh(&mut self, vehicle: Vehicle) -> Vec { /*let id = self.vehicle_ids.get(vehicle.id); let veh_type = self.vehicle_type_ids.get(vehicle.r#type); let garage_veh = GarageVehicle { id, veh_type }; @@ -98,11 +98,18 @@ impl Garage { // the above logic would park a vehicle within a garage. This only works if we have mass // conservation enabled. The scenario we're testing with doesn't. Therfore, we just take - // the agent out of the vehicle and pretend we have parked the car. - vehicle.driver.unwrap() + // the agents out of the vehicle and pretend we have parked the car. + let mut agents = vehicle.passengers; + agents.push(vehicle.driver.unwrap()); + agents } - pub fn unpark_veh(&mut self, person: Person, id: &Id) -> Vehicle { + pub fn unpark_veh_with_passengers( + &mut self, + person: Person, + passengers: Vec, + id: &Id, + ) -> Vehicle { let veh_type_id = self .vehicles .get(id) @@ -134,9 +141,13 @@ impl Garage { max_v: veh_type.max_v, pce: veh_type.pce, driver: Some(person), - passengers: vec![], + passengers, } } + + pub fn unpark_veh(&mut self, person: Person, id: &Id) -> Vehicle { + self.unpark_veh_with_passengers(person, vec![], id) + } } #[cfg(test)] diff --git a/src/simulation/wire_types/events.proto b/src/simulation/wire_types/events.proto index 512c776..6f3b99b 100644 --- a/src/simulation/wire_types/events.proto +++ b/src/simulation/wire_types/events.proto @@ -18,6 +18,8 @@ message Event { DepartureEvent departure = 8; ArrivalEvent arrival = 9; TravelledEvent travelled = 10; + PassengerPickedUpEvent passengerPickedUp = 11; + PassengerDroppedOffEvent passengerDroppedOff = 12; } } @@ -76,4 +78,18 @@ message TravelledEvent { uint64 person = 1; uint64 mode = 3; double distance = 2; +} + +message PassengerPickedUpEvent{ + uint64 person = 1; + uint64 mode = 2; + uint64 request = 3; + uint64 vehicle = 4; +} + +message PassengerDroppedOffEvent{ + uint64 person = 1; + uint64 mode = 2; + uint64 request = 3; + uint64 vehicle = 4; } \ No newline at end of file diff --git a/src/simulation/wire_types/population.proto b/src/simulation/wire_types/population.proto index e520aeb..962dfdb 100644 --- a/src/simulation/wire_types/population.proto +++ b/src/simulation/wire_types/population.proto @@ -33,10 +33,21 @@ message Leg { optional uint32 dep_time = 3; uint32 trav_time = 4; Route route = 5; + map attributes = 6; } message Route { uint64 veh_id = 1; double distance = 2; repeated uint64 route = 3; +} + +message AttributeValue { + oneof type { + uint64 int_value = 1; + string string_value = 2; + double double_value = 3; + bool bool_value = 4; + // to be expanded + } } \ No newline at end of file diff --git a/src/test_utils.rs b/src/test_utils.rs index 7c40db9..25584ff 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -48,5 +48,6 @@ pub fn config() -> config::Simulation { end_time: 0, sample_size: 1.0, stuck_threshold: u32::MAX, + passenger_modes: vec![], } } diff --git a/tests/test_simulation.rs b/tests/test_simulation.rs index dbe1d3b..331b18b 100644 --- a/tests/test_simulation.rs +++ b/tests/test_simulation.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::cell::RefCell; use std::fs::File; use std::io::{BufRead, BufReader}; use std::path::PathBuf; @@ -100,9 +101,11 @@ pub fn execute_sim( ); let sim_net = SimNetworkPartition::from_network(&network, rank, config.simulation()); - let mut events = EventsPublisher::new(); - events.add_subscriber(test_subscriber); - events.add_subscriber(Box::new(TravelTimeCollector::new())); + let events = Rc::new(RefCell::new(EventsPublisher::new())); + events.borrow_mut().add_subscriber(test_subscriber); + events + .borrow_mut() + .add_subscriber(Box::new(TravelTimeCollector::new())); let rc = Rc::new(comm); let broker = NetMessageBroker::new(rc.clone(), &network, &sim_net); @@ -146,18 +149,6 @@ fn try_join(mut handles: IntMap>) { } } -struct EmptySubscriber {} - -impl EventsSubscriber for EmptySubscriber { - fn receive_event(&mut self, _time: u32, _event: &Event) { - // nothing. - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } -} - pub struct TestSubscriber { next_index: usize, expected_events: Vec,