Skip to content

Commit

Permalink
Add update timestamp to Shard model (#5332)
Browse files Browse the repository at this point in the history
* Add update timestamp to shard model

* Generate timestamp in metastore node
  • Loading branch information
rdettai authored Aug 27, 2024
1 parent 0d4d143 commit 948329b
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 20 deletions.
7 changes: 3 additions & 4 deletions docs/internals/backward-compatibility.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Backward compatibility in Quickwit.

If you are reading this, chances are you want to make a change to one of the resources
of quickwit's meta/config:
of Quickwit's meta/config:

User edited:
- QuickwitConfig
Expand All @@ -19,7 +19,7 @@ Quickwit currently manages backward compatibility of all of these resources but
This document describes how to handle a change, how to test such a change,
and how to spot eventual regressions.

# How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?
## How do I update `{IndexMetadata, SplitMetadata, FileBackedIndex, SourceConfig, IndexConfig}`?

There are two types of upgrades.

Expand All @@ -45,6 +45,7 @@ non-regression.

When introducing such a change:
- modify your model with the help of the attributes above.
- modify the example for the model by editing its `TestableForRegression` trait implementation.
- commit the 2 files that were updated by build.rs
- eyeball the diff on the `.expected.json` that failed, and send it with your PR.

Expand Down Expand Up @@ -121,5 +122,3 @@ most recent version.

The unit test will start making sense in future updates thanks to the update phase
described in the previous section.


4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
Expand Down Expand Up @@ -2054,6 +2055,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
}],
}],
};
Expand Down Expand Up @@ -2342,6 +2344,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
Expand Down Expand Up @@ -2495,6 +2498,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
}),
}],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ impl IngestController {
doc_mapping_uid: Some(doc_mapping_uid),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 0, // assigned later by the metastore
};
let init_shard_subrequest = InitShardSubrequest {
subrequest_id: subrequest_id as u32,
Expand Down Expand Up @@ -2136,6 +2137,7 @@ mod tests {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
};
let response = OpenShardsResponse {
subresponses: vec![OpenShardSubresponse {
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(10u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand All @@ -752,6 +753,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand All @@ -776,6 +778,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -787,6 +790,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(12u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1078,6 +1082,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -1089,6 +1094,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning.as_eof()),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1221,6 +1227,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
Shard {
leader_id: "test-ingester-0".to_string(),
Expand All @@ -1232,6 +1239,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::eof(22u64)),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
},
],
};
Expand Down Expand Up @@ -1575,6 +1583,7 @@ mod tests {
doc_mapping_uid: Some(DocMappingUid::default()),
publish_position_inclusive: Some(Position::Beginning),
publish_token: Some(publish_token.to_string()),
update_timestamp: 1724158996,
}],
};
Ok(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: sub_req.doc_mapping_uid,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}),
}
})
Expand Down Expand Up @@ -233,6 +234,7 @@ pub mod shared_state_for_tests {
doc_mapping_uid: None,
publish_position_inclusive: Some(position),
shard_state: ShardState::Open as i32,
update_timestamp: 1724158996,
}
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,7 @@ mod tests {
doc_mapping_uid: Some(doc_mapping_uid),
publish_position_inclusive: None,
publish_token: None,
update_timestamp: 1724158996,
};
let init_shards_request = InitShardsRequest {
subrequests: vec![InitShardSubrequest {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Reverts the migration that adds the `update_timestamp` column to `shards`.
-- PostgreSQL expects `DROP COLUMN IF EXISTS <name>`, and the column name must
-- match the one created by the up migration (`update_timestamp`, singular).
ALTER TABLE shards
DROP COLUMN IF EXISTS update_timestamp;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Adds the `update_timestamp` column to the `shards` table.
ALTER TABLE shards
-- We prefer a fixed default value here because it makes tests simpler.
-- Very few users use the shard API in versions <0.9 anyway.
ADD COLUMN IF NOT EXISTS update_timestamp TIMESTAMP NOT NULL DEFAULT '2024-01-01 00:00:00+00';
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex {
follower_id: Some("follower-ingester".to_string()),
doc_mapping_uid: Some(DocMappingUid::for_test(1)),
publish_position_inclusive: Some(Position::Beginning),
update_timestamp: 1724240908,
..Default::default()
};
let shards = Shards::from_shards_vec(index_uid.clone(), source_id.clone(), vec![shard]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use quickwit_proto::metastore::{
OpenShardSubrequest, OpenShardSubresponse,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use time::OffsetDateTime;
use tracing::{info, warn};

use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta};
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Shards {
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: subrequest.publish_token.clone(),
update_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
};
mutation_occurred = true;
entry.insert(shard.clone());
Expand Down Expand Up @@ -288,6 +290,7 @@ impl Shards {
shard.shard_state = ShardState::Closed as i32;
}
shard.publish_position_inclusive = Some(publish_position_inclusive);
shard.update_timestamp = OffsetDateTime::now_utc().unix_timestamp();
}
Ok(MutationOccurred::Yes(()))
}
Expand Down
44 changes: 36 additions & 8 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId,
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
use sea_query_binder::SqlxBinder;
use sqlx::{Acquire, Executor, Postgres, Transaction};
use time::OffsetDateTime;
use tracing::{debug, info, instrument, warn};

use super::error::convert_sqlx_err;
Expand Down Expand Up @@ -255,13 +256,15 @@ async fn try_apply_delta_v2(
shard_ids.push(shard_id.to_string());
new_positions.push(new_position.to_string());
}

sqlx::query(
r#"
UPDATE
shards
SET
publish_position_inclusive = new_positions.position,
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END
shard_state = CASE WHEN new_positions.position LIKE '~%' THEN 'closed' ELSE shards.shard_state END,
update_timestamp = $5
FROM
UNNEST($3, $4)
AS new_positions(shard_id, position)
Expand All @@ -275,6 +278,8 @@ async fn try_apply_delta_v2(
.bind(source_id)
.bind(shard_ids)
.bind(new_positions)
// Use a timestamp generated by the metastore node to avoid clock drift issues
.bind(OffsetDateTime::now_utc())
.execute(tx.as_mut())
.await?;
Ok(())
Expand Down Expand Up @@ -1638,6 +1643,8 @@ async fn open_or_fetch_shard<'e>(
.bind(&subrequest.follower_id)
.bind(subrequest.doc_mapping_uid)
.bind(&subrequest.publish_token)
// Use a timestamp generated by the metastore node to avoid clock drift issues
.bind(OffsetDateTime::now_utc())
.fetch_optional(executor.clone())
.await?;

Expand Down Expand Up @@ -1794,16 +1801,37 @@ mod tests {
const INSERT_SHARD_QUERY: &str = include_str!("queries/shards/insert.sql");

for shard in shards {
assert_eq!(&shard.source_id, source_id);
assert_eq!(shard.index_uid(), index_uid);
// explicit destructuring to ensure new fields are properly handled
let Shard {
doc_mapping_uid,
follower_id,
index_uid,
leader_id,
publish_position_inclusive,
publish_token,
shard_id,
shard_state,
source_id,
update_timestamp,
} = shard;
let shard_state_name = ShardState::from_i32(shard_state)
.unwrap()
.as_json_str_name();
let update_timestamp = OffsetDateTime::from_unix_timestamp(update_timestamp)
.expect("Bad timestamp format");
sqlx::query(INSERT_SHARD_QUERY)
.bind(index_uid)
.bind(source_id)
.bind(shard.shard_id())
.bind(shard.shard_state().as_json_str_name())
.bind(&shard.leader_id)
.bind(&shard.follower_id)
.bind(shard.doc_mapping_uid)
.bind(&shard.publish_position_inclusive().to_string())
.bind(&shard.publish_token)
.bind(shard_id.unwrap())
.bind(shard_state_name)
.bind(leader_id)
.bind(follower_id)
.bind(doc_mapping_uid)
.bind(publish_position_inclusive.unwrap().to_string())
.bind(publish_token)
.bind(update_timestamp)
.execute(&self.connection_pool)
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub(super) struct PgShard {
pub doc_mapping_uid: DocMappingUid,
pub publish_position_inclusive: String,
pub publish_token: Option<String>,
pub update_timestamp: sqlx::types::time::PrimitiveDateTime,
}

impl From<PgShard> for Shard {
Expand All @@ -277,6 +278,7 @@ impl From<PgShard> for Shard {
doc_mapping_uid: Some(pg_shard.doc_mapping_uid),
publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()),
publish_token: pg_shard.publish_token,
update_timestamp: pg_shard.update_timestamp.assume_utc().unix_timestamp(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9)
INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token, update_timestamp)
VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9, $10)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid, publish_token, update_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT
DO NOTHING
RETURNING
Expand Down
Loading

0 comments on commit 948329b

Please sign in to comment.