Skip to content

Commit e9896c4

Browse files
Merge pull request #138 from matsim-vsp/streaming-population
Streaming population
2 parents cdbd70e + dbc5edc commit e9896c4

32 files changed

+338
-168
lines changed
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<vehicleDefinitions xmlns="http://www.matsim.org/files/dtd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://www.matsim.org/files/dtd http://www.matsim.org/files/dtd/vehicleDefinitions_v2.0.xsd">
4+
<vehicleType id="car">
5+
<attributes>
6+
</attributes>
7+
<description>abc</description>
8+
<length meter="9.5"/>
9+
<width meter="3.0"/>
10+
<maximumVelocity meterPerSecond="20.0"/>
11+
<passengerCarEquivalents pce="1.0"/>
12+
<networkMode networkMode="car"/>
13+
<flowEfficiencyFactor factor="1.5"/>
14+
</vehicleType>
15+
<vehicleType id="bike">
16+
<attributes>
17+
</attributes>
18+
<description>This is a bike</description>
19+
<length meter="2.0"/>
20+
<width meter="1.0"/>
21+
<maximumVelocity meterPerSecond="5.0"/>
22+
<passengerCarEquivalents pce="0.25"/>
23+
<networkMode networkMode="bike"/>
24+
<flowEfficiencyFactor factor="1.5"/>
25+
</vehicleType>
26+
<vehicleType id="walk">
27+
<attributes>
28+
<attribute name="lod" class="java.lang.String">teleported</attribute>
29+
</attributes>
30+
<description>This is a pair of shoes</description>
31+
<length meter="0.5"/>
32+
<width meter="1.0"/>
33+
<maximumVelocity meterPerSecond="1.2"/>
34+
<passengerCarEquivalents pce="0.1"/>
35+
<networkMode networkMode="walk"/>
36+
<flowEfficiencyFactor factor="10.0"/>
37+
</vehicleType>
38+
</vehicleDefinitions>

src/bin/convert_to_binary.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ fn main() {
2828
rust_q_sim::simulation::logging::init_std_out_logging();
2929
let args = InputArgs::parse();
3030

31-
let mut net = Network::from_file_path(&args.network, 1, PartitionMethod::None);
3231
let mut veh = Garage::from_file(&args.vehicles);
32+
let mut net = Network::from_file_path(&args.network, 1, PartitionMethod::None);
3333
let pop = Population::from_file(&args.population, &mut veh);
3434

3535
let cmp_weights = compute_computational_weights(&pop);
@@ -39,6 +39,7 @@ fn main() {
3939
net.to_file(&create_file_path(&args, "network"));
4040
veh.to_file(&create_file_path(&args, "vehicles"));
4141
pop.to_file(&create_file_path(&args, "plans"));
42+
info!("Finished conversion. Exiting.")
4243
}
4344

4445
fn create_file_path(args: &InputArgs, extension: &str) -> PathBuf {

src/simulation/controller.rs

+6-43
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ use nohash_hasher::IntMap;
1010
use tracing::info;
1111

1212
use crate::simulation::config::{CommandLineArgs, Config, PartitionMethod, RoutingMode};
13-
use crate::simulation::id::Id;
1413
use crate::simulation::io::proto_events::ProtoEventsWriter;
1514
use crate::simulation::messaging::communication::communicators::{
1615
ChannelSimCommunicator, MpiSimCommunicator, SimCommunicator,
1716
};
1817
use crate::simulation::messaging::communication::message_broker::NetMessageBroker;
1918
use crate::simulation::messaging::events::EventsPublisher;
20-
use crate::simulation::network::global_network::{Link, Network};
19+
use crate::simulation::network::global_network::Network;
2120
use crate::simulation::network::sim_network::SimNetworkPartition;
2221
use crate::simulation::population::population::Population;
2322
use crate::simulation::replanning::replanner::{DummyReplanner, ReRouteTripReplanner, Replanner};
@@ -110,13 +109,11 @@ fn execute_partition<C: SimCommunicator + 'static>(comm: C, args: &CommandLineAr
110109
));
111110
let mut garage = Garage::from_file(&PathBuf::from(config.proto_files().vehicles));
112111

113-
let population: Population = Population::from_file(
114-
&get_numbered_output_filename(
115-
&output_path,
116-
&PathBuf::from(config.proto_files().population),
117-
rank,
118-
),
112+
let population = Population::from_file_filtered_part(
113+
&PathBuf::from(config.proto_files().population),
114+
&network,
119115
&mut garage,
116+
comm.rank(),
120117
);
121118

122119
let network_partition = SimNetworkPartition::from_network(&network, rank, config.simulation());
@@ -183,15 +180,13 @@ fn try_join(mut handles: IntMap<u32, JoinHandle<()>>) {
183180

184181
pub fn partition_input(config: &Config) {
185182
id::load_from_file(&PathBuf::from(config.proto_files().ids));
186-
let net = if let PartitionMethod::Metis(_) = config.partitioning().method {
183+
let _net = if let PartitionMethod::Metis(_) = config.partitioning().method {
187184
info!("Config param Partition method was set to metis. Loading input network, running metis conversion and then store it into output folder");
188185
partition_network(config)
189186
} else {
190187
info!("Config param Partition method was set to none. Loading network from input, assuming it has partitioning information");
191188
copy_network_into_output(config)
192189
};
193-
194-
partition_population(config, &net);
195190
}
196191

197192
fn partition_network(config: &Config) -> Network {
@@ -217,38 +212,6 @@ fn copy_network_into_output(config: &Config) -> Network {
217212
network
218213
}
219214

220-
fn partition_population(config: &Config, network: &Network) {
221-
info!(
222-
"Partition population into {} parts",
223-
config.partitioning().num_parts
224-
);
225-
let pop_in_path = PathBuf::from(config.proto_files().population);
226-
let num_parts = config.partitioning().num_parts;
227-
let pop = Population::from_file(&pop_in_path, &mut Garage::new());
228-
let mut part_pops = vec![];
229-
for _i in 0..num_parts {
230-
part_pops.push(Population::new())
231-
}
232-
233-
let pop_out_path =
234-
create_output_filename(&PathBuf::from(config.output().output_dir), &pop_in_path);
235-
236-
for (id, person) in pop.persons.into_iter() {
237-
let link_id: Id<Link> = Id::get(person.curr_act().link_id);
238-
let partition = network.get_link(&link_id).partition;
239-
part_pops
240-
.get_mut(partition as usize)
241-
.unwrap()
242-
.persons
243-
.insert(id, person);
244-
}
245-
246-
for (i, population) in part_pops.iter().enumerate() {
247-
let part_out_path = insert_number_in_proto_filename(&pop_out_path, i as u32);
248-
population.to_file(&part_out_path);
249-
}
250-
}
251-
252215
pub fn get_numbered_output_filename(output_dir: &Path, input_file: &Path, part: u32) -> PathBuf {
253216
let out = create_output_filename(output_dir, input_file);
254217
insert_number_in_proto_filename(&out, part)

src/simulation/io/proto.rs

+34-30
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,39 @@ pub fn write_to_file<T: Message>(message: T, path: &Path) {
3737
info!("Finished writing message to file: {path:?}");
3838
}
3939

40+
pub fn read_delimiter<R>(reader: &mut BufReader<R>) -> Option<usize>
41+
where
42+
R: Read + Seek,
43+
{
44+
// read the delimiter of the message. Prost says delimiter is between 1 and 10 bytes
45+
// so, read the first 10 bytes of the buffer
46+
let mut delim_buffer: [u8; 10] = [0; 10];
47+
// this could crash
48+
match reader.read_exact(&mut delim_buffer) {
49+
Ok(_) => {} // go on.
50+
Err(e) => match e.kind() {
51+
ErrorKind::UnexpectedEof => return None,
52+
_ => {
53+
panic!("Error while reading file: {}", e);
54+
}
55+
},
56+
};
57+
58+
let delimiter =
59+
prost::decode_length_delimiter(delim_buffer.as_slice()).expect("error reading delimiter");
60+
61+
// since the delimiter is a varint figure out how many bytes the delimiter was actually taking
62+
// up in the buffer. Set the buffers position to the first byte after the delimiter, which
63+
// should be the start of the TimeStep message
64+
let delim_encoded_len = prost::encoding::encoded_len_varint(delimiter as u64) as i64;
65+
let offset = delim_encoded_len - (delim_buffer.len() as i64);
66+
reader
67+
.seek_relative(offset)
68+
.expect("Seeking relative failed");
69+
70+
Some(delimiter)
71+
}
72+
4073
pub struct MessageIter<T, R>
4174
where
4275
T: Message + Default,
@@ -54,7 +87,7 @@ where
5487
type Item = T;
5588

5689
fn next(&mut self) -> Option<Self::Item> {
57-
if let Some(delimiter) = self.read_delimiter() {
90+
if let Some(delimiter) = read_delimiter(&mut self.internal_reader) {
5891
let mut bytes: Vec<u8> = vec![0; delimiter];
5992
self.internal_reader
6093
.read_exact(&mut bytes)
@@ -78,33 +111,4 @@ where
78111
internal_reader: BufReader::new(reader),
79112
}
80113
}
81-
fn read_delimiter(&mut self) -> Option<usize> {
82-
// read the delimiter of the message. Prost says delimiter is between 1 and 10 bytes
83-
// so, read the first 10 bytes of the buffer
84-
let mut delim_buffer: [u8; 10] = [0; 10];
85-
// this could crash
86-
match self.internal_reader.read_exact(&mut delim_buffer) {
87-
Ok(_) => {} // go on.
88-
Err(e) => match e.kind() {
89-
ErrorKind::UnexpectedEof => return None,
90-
_ => {
91-
panic!("Error while reading file: {}", e);
92-
}
93-
},
94-
};
95-
96-
let delimiter = prost::decode_length_delimiter(delim_buffer.as_slice())
97-
.expect("error reading delimiter");
98-
99-
// since the delimiter is a varint figure out how many bytes the delimiter was actually taking
100-
// up in the buffer. Set the buffers position to the first byte after the delimiter, which
101-
// should be the start of the TimeStep message
102-
let delim_encoded_len = prost::encoding::encoded_len_varint(delimiter as u64) as i64;
103-
let offset = delim_encoded_len - (delim_buffer.len() as i64);
104-
self.internal_reader
105-
.seek_relative(offset)
106-
.expect("Seeking relative failed");
107-
108-
Some(delimiter)
109-
}
110114
}

0 commit comments

Comments
 (0)