Skip to content

Commit 2066211

Browse files
committed
Track sent components for each entity
We'll need this for per-component visibility, but this change is useful on its own because it fixes replication for rules with multiple components that were inserted on different ticks. For example, you have a replication rule for `(A, B)`. You spawn an entity with `A`, and on the next tick you insert `B`. Without this change, `B` is replicated, but `A` is wrongly considered as sent. I added a test for it.
1 parent 394b438 commit 2066211

File tree

7 files changed

+298
-139
lines changed

7 files changed

+298
-139
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020
### Fixed
2121

2222
- Avoid sending despawn for entities that weren't sent.
23+
- Replication for rules with multiple components when their insertions were split across multiple ticks.
2324

2425
### Removed
2526

src/server.rs

Lines changed: 99 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ use core::{mem, ops::Range, time::Duration};
1111
use bevy::{
1212
ecs::{
1313
archetype::Archetypes,
14-
component::CheckChangeTicks,
14+
component::{CheckChangeTicks, ComponentId},
1515
entity::{Entities, EntityHashMap},
1616
intern::Interned,
1717
schedule::ScheduleLabel,
1818
system::SystemChangeTick,
1919
},
20+
platform::collections::{HashSet, hash_map::Entry},
2021
prelude::*,
2122
ptr::Ptr,
2223
time::common_conditions::on_timer,
@@ -35,9 +36,9 @@ use crate::{
3536
backend::channels::ClientChannel,
3637
message::server_message::message_buffer::MessageBuffer,
3738
replication::{
38-
client_ticks::ClientTicks,
39+
client_ticks::{ClientTicks, EntityTicks},
3940
registry::{
40-
ReplicationRegistry, component_fns::ComponentFns, ctx::SerializeCtx,
41+
FnsId, ReplicationRegistry, component_fns::ComponentFns, ctx::SerializeCtx,
4142
rule_fns::UntypedRuleFns,
4243
},
4344
rules::{ReplicationRules, component::ComponentRule},
@@ -241,7 +242,9 @@ fn cleanup_acks(
241242
let min_timestamp = time.elapsed().saturating_sub(mutations_timeout);
242243
for mut ticks in &mut clients {
243244
ticks.cleanup_older_mutations(min_timestamp, |mutate_info| {
244-
mutate_info.entities.clear();
245+
for (_, components) in mutate_info.entities.drain(..) {
246+
pools.components.push(components);
247+
}
245248
pools.entities.push(mem::take(&mut mutate_info.entities));
246249
});
247250
}
@@ -263,7 +266,9 @@ fn receive_acks(
263266
)
264267
});
265268
if let Some(mut entities) = ticks.ack_mutate_message(client, mutate_index) {
266-
entities.clear();
269+
for (_, components) in entities.drain(..) {
270+
pools.components.push(components);
271+
}
267272
pools.entities.push(entities);
268273
}
269274
}
@@ -309,7 +314,9 @@ fn check_mutation_ticks(check: On<CheckChangeTicks>, mut clients: Query<&mut Cli
309314
check.present_tick()
310315
);
311316
for mut ticks in &mut clients {
312-
ticks.check_mutation_ticks(*check);
317+
for entity_ticks in ticks.entities.values_mut() {
318+
entity_ticks.system_tick.check_tick(*check);
319+
}
313320
}
314321
}
315322

@@ -349,8 +356,13 @@ fn send_replication(
349356
}
350357

351358
collect_mappings(&mut serialized, &mut clients, &despawn_buffer, &entities)?;
352-
collect_despawns(&mut serialized, &mut clients, &mut despawn_buffer)?;
353-
collect_removals(&mut serialized, &mut clients, &removal_buffer)?;
359+
collect_despawns(
360+
&mut serialized,
361+
&mut pools,
362+
&mut clients,
363+
&mut despawn_buffer,
364+
)?;
365+
collect_removals(&mut serialized, &mut pools, &mut clients, &removal_buffer)?;
354366
collect_changes(
355367
&mut serialized,
356368
&mut pools,
@@ -503,6 +515,7 @@ fn collect_mappings(
503515
/// Collect entity despawns from this tick into update messages.
504516
fn collect_despawns(
505517
serialized: &mut SerializedData,
518+
pools: &mut ClientPools,
506519
clients: &mut Query<(
507520
Entity,
508521
&mut Updates,
@@ -519,11 +532,12 @@ fn collect_despawns(
519532
for (client_entity, mut message, .., mut ticks, mut priority, mut visibility) in
520533
&mut *clients
521534
{
522-
if ticks.remove_entity(entity) {
535+
if let Some(entity_ticks) = ticks.entities.remove(&entity) {
523536
// Write despawn only if the entity was previously sent because
524537
// spawn and despawn could happen during the same tick.
525538
trace!("writing despawn for `{entity}` for client `{client_entity}`");
526539
message.add_despawn(entity_range.clone());
540+
pools.component_sets.push(entity_ticks.components);
527541
}
528542
visibility.remove_despawned(entity);
529543
priority.remove(&entity);
@@ -532,10 +546,11 @@ fn collect_despawns(
532546

533547
for (client_entity, mut message, .., mut ticks, mut priority, mut visibility) in clients {
534548
for entity in visibility.drain_lost() {
535-
if ticks.remove_entity(entity) {
549+
if let Some(entity_ticks) = ticks.entities.remove(&entity) {
536550
trace!("writing visibility lost for `{entity}` for client `{client_entity}`");
537551
let entity_range = serialized.write_entity(entity)?;
538552
message.add_despawn(entity_range);
553+
pools.component_sets.push(entity_ticks.components);
539554
}
540555
priority.remove(&entity);
541556
}
@@ -547,6 +562,7 @@ fn collect_despawns(
547562
/// Collects component removals from this tick into update messages.
548563
fn collect_removals(
549564
serialized: &mut SerializedData,
565+
pools: &mut ClientPools,
550566
clients: &mut Query<(
551567
Entity,
552568
&mut Updates,
@@ -559,15 +575,29 @@ fn collect_removals(
559575
removal_buffer: &RemovalBuffer,
560576
) -> Result<()> {
561577
for (&entity, remove_ids) in removal_buffer.iter() {
562-
let entity_range = serialized.write_entity(entity)?;
563-
let ids_len = remove_ids.len();
564-
let fn_ids = serialized.write_fn_ids(remove_ids.iter().map(|&(_, fns_id)| fns_id))?;
565-
for (client_entity, mut message, .., visibility) in &mut *clients {
566-
if !visibility.is_hidden(entity) {
567-
trace!(
568-
"writing removals for `{entity}` with `{remove_ids:?}` for client `{client_entity}`"
569-
);
570-
message.add_removals(entity_range.clone(), ids_len, fn_ids.clone());
578+
let mut entity_range = None;
579+
for (_, mut message, ..) in &mut *clients {
580+
message.start_entity_removals();
581+
}
582+
583+
for &(component_id, fns_id) in remove_ids {
584+
let mut fns_id_range = None;
585+
for (client_entity, mut message, .., mut ticks, _, _) in &mut *clients {
586+
// Only send removals for components that were previously sent.
587+
let Some(entity_ticks) = ticks.entities.get_mut(&entity) else {
588+
continue;
589+
};
590+
if !entity_ticks.components.remove(&component_id) {
591+
continue;
592+
}
593+
594+
trace!("writing `{fns_id:?}` removal for `{entity}` for client `{client_entity}`");
595+
if !message.removals_entity_added() {
596+
let entity_range = write_entity_cached(&mut entity_range, serialized, entity)?;
597+
message.add_removals_entity(pools, entity_range);
598+
}
599+
let fns_id_range = write_fns_id_cached(&mut fns_id_range, serialized, fns_id)?;
600+
message.add_removal(fns_id_range);
571601
}
572602
}
573603
}
@@ -636,17 +666,15 @@ fn collect_changes(
636666
if visibility.is_hidden(entity.id()) {
637667
continue;
638668
}
639-
640-
if let Some((last_system_tick, last_server_tick)) =
641-
client_ticks.mutation_tick(entity.id())
642-
&& !ticks.is_added(change_tick.last_run(), change_tick.this_run())
669+
if let Some(entity_ticks) = client_ticks.entities.get(&entity.id())
670+
&& entity_ticks.components.contains(&component_id)
643671
{
644672
let base_priority = priority.get(&entity.id()).copied().unwrap_or(1.0);
645673

646-
let tick_diff = server_tick - last_server_tick;
674+
let tick_diff = server_tick - entity_ticks.server_tick;
647675
if rule.mode != ReplicationMode::Once
648676
&& base_priority * tick_diff as f32 >= 1.0
649-
&& ticks.is_changed(last_system_tick, change_tick.this_run())
677+
&& ticks.is_changed(entity_ticks.system_tick, change_tick.this_run())
650678
{
651679
trace!(
652680
"writing `{:?}` mutation for `{}` for client `{client_entity}`",
@@ -695,7 +723,7 @@ fn collect_changes(
695723
rule,
696724
component,
697725
)?;
698-
updates.add_inserted_component(component_range);
726+
updates.add_inserted_component(component_range, component_id);
699727
}
700728
}
701729
}
@@ -706,8 +734,9 @@ fn collect_changes(
706734
if visibility.is_hidden(entity.id()) {
707735
continue;
708736
}
709-
let new_for_client = ticks.is_new_for_client(entity.id());
710737

738+
let entity_entry = ticks.entities.entry(entity.id());
739+
let new_for_client = matches!(entity_entry, Entry::Vacant(_));
711740
if new_for_client
712741
|| updates.changed_entity_added()
713742
|| removal_buffer.contains_key(&entity.id())
@@ -721,7 +750,26 @@ fn collect_changes(
721750
);
722751
updates.take_added_entity(pools, &mut mutations);
723752
}
724-
ticks.set_mutation_tick(entity.id(), change_tick.this_run(), server_tick);
753+
754+
match entity_entry {
755+
Entry::Occupied(entry) => {
756+
let entity_ticks = entry.into_mut();
757+
entity_ticks.system_tick = change_tick.this_run();
758+
entity_ticks.server_tick = server_tick;
759+
entity_ticks
760+
.components
761+
.extend(updates.drain_changed_entity_ids());
762+
}
763+
Entry::Vacant(entry) => {
764+
let mut components = pools.component_sets.pop().unwrap_or_default();
765+
components.extend(updates.drain_changed_entity_ids());
766+
entry.insert(EntityTicks {
767+
server_tick,
768+
system_tick: change_tick.this_run(),
769+
components,
770+
});
771+
}
772+
}
725773
}
726774

727775
if new_for_client && !updates.changed_entity_added() {
@@ -755,7 +803,7 @@ fn should_send_mapping(
755803
return false;
756804
}
757805

758-
signature.is_added() || ticks.is_new_for_client(entity)
806+
signature.is_added() || !ticks.entities.contains_key(&entity)
759807
}
760808

761809
/// Writes a mapping or re-uses previously written range if exists.
@@ -791,6 +839,22 @@ fn write_entity_cached(
791839
Ok(range)
792840
}
793841

842+
/// Writes an ID or re-uses previously written range if exists.
843+
fn write_fns_id_cached(
844+
fns_id_range: &mut Option<Range<usize>>,
845+
serialized: &mut SerializedData,
846+
fns_id: FnsId,
847+
) -> Result<Range<usize>> {
848+
if let Some(range) = fns_id_range.clone() {
849+
return Ok(range);
850+
}
851+
852+
let range = serialized.write_fns_id(fns_id)?;
853+
*fns_id_range = Some(range.clone());
854+
855+
Ok(range)
856+
}
857+
794858
/// Writes a component or re-uses previously written range if exists.
795859
fn write_component_cached(
796860
component_range: &mut Option<Range<usize>>,
@@ -910,8 +974,12 @@ pub struct PriorityMap(EntityHashMap<f32>);
910974
/// All data is cleared before the insertion.
911975
#[derive(Resource, Default)]
912976
struct ClientPools {
913-
/// Entities from [`MutateInfo`](crate::shared::replication::client_ticks::MutateInfo)s.
914-
entities: Vec<Vec<Entity>>,
977+
/// Entities with components from [`MutateInfo`](crate::shared::replication::client_ticks::MutateInfo)s.
978+
entities: Vec<Vec<(Entity, Vec<ComponentId>)>>,
979+
/// Components from [`Self::entities`].
980+
components: Vec<Vec<ComponentId>>,
981+
/// Components from [`ClientTicks`].
982+
component_sets: Vec<HashSet<ComponentId>>,
915983
/// Ranges from [`Updates`] and [`Mutations`].
916984
ranges: Vec<Vec<Range<usize>>>,
917985
/// Entities from [`Mutations`].

0 commit comments

Comments
 (0)