diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 34b9806a26d2..7b55e8809638 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -16,6 +16,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
+use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
@@ -2307,6 +2308,31 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}
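
+/// Force-switch the aux file policy of a timeline and schedule an
+/// `index_part.json` upload that records the new policy.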
+async fn force_aux_policy_switch_handler(
+    mut r: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+ check_permission(&r, None)?;
+ let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
+ let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
+ let policy: AuxFilePolicy = json_request(&mut r).await?;
+
+ let state = get_state(&r);
+
+ let tenant = state
+ .tenant_manager
+ .get_attached_tenant_shard(tenant_shard_id)?;
+ tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
+ let timeline =
+ active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+ .await?;
+ timeline
+ .do_switch_aux_policy(policy)
+ .map_err(ApiError::InternalServerError)?;
+
+ json_response(StatusCode::OK, ())
+}
+
async fn put_io_engine_handler(
    mut r: Request<Body>,
_cancel: CancellationToken,
@@ -2384,19 +2410,9 @@ async fn list_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
- let process = || async move {
- let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
- let files = timeline.list_aux_files(body.lsn, &ctx).await?;
- Ok::<_, anyhow::Error>(files)
- };
-
- match process().await {
- Ok(st) => json_response(StatusCode::OK, st),
- Err(err) => json_response(
- StatusCode::INTERNAL_SERVER_ERROR,
- ApiError::InternalServerError(err).to_string(),
- ),
- }
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+ let files = timeline.list_aux_files(body.lsn, &ctx).await?;
+ json_response(StatusCode::OK, files)
}
async fn ingest_aux_files(
@@ -2414,24 +2430,22 @@ async fn ingest_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
- let process = || async move {
- let mut modification = timeline.begin_modification(Lsn(
- timeline.get_last_record_lsn().0 + 8
- ) /* advance LSN by 8 */);
- let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
- for (fname, content) in body.aux_files {
- modification
- .put_file(&fname, content.as_bytes(), &ctx)
- .await?;
- }
- modification.commit(&ctx).await?;
- Ok::<_, anyhow::Error>(())
- };
-
- match process().await {
- Ok(st) => json_response(StatusCode::OK, st),
- Err(err) => Err(ApiError::InternalServerError(err)),
+ let mut modification = timeline.begin_modification(
+ Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
+ );
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+ for (fname, content) in body.aux_files {
+ modification
+ .put_file(&fname, content.as_bytes(), &ctx)
+ .await
+ .map_err(ApiError::InternalServerError)?;
}
+ modification
+ .commit(&ctx)
+ .await
+ .map_err(ApiError::InternalServerError)?;
+
+ json_response(StatusCode::OK, ())
}
/// Report on the largest tenants on this pageserver, for the storage controller to identify
@@ -2814,6 +2828,10 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
+ .put(
+ "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
+ |r| api_handler(r, force_aux_policy_switch_handler),
+ )
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 7dea687c469e..afba34c6d154 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -1489,14 +1489,7 @@ impl<'a> DatadirModification<'a> {
if aux_files_key_v1.is_empty() {
None
} else {
- self.tline
- .last_aux_file_policy
- .store(Some(AuxFilePolicy::V1));
- self.tline
- .remote_client
- .schedule_index_upload_for_aux_file_policy_update(Some(
- AuxFilePolicy::V1,
- ))?;
+ self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
@@ -1504,10 +1497,7 @@ impl<'a> DatadirModification<'a> {
};
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
- self.tline.last_aux_file_policy.store(Some(switch_policy));
- self.tline
- .remote_client
- .schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
+ self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index caf26e0a0b95..2bb199b228fc 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -5999,6 +5999,67 @@ mod tests {
);
}
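
+    // Verify that force-switching the aux file policy from v1 to v2 hides
+    // files written under v1, while writes after the switch are readable.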
+ #[tokio::test]
+ async fn aux_file_policy_force_switch() {
+ let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
+ harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
+ let (tenant, ctx) = harness.load().await;
+
+ let mut lsn = Lsn(0x08);
+
+        let tline: Arc<Timeline> = tenant
+ .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ tline.last_aux_file_policy.load(),
+ None,
+            "no aux file has been written yet, so the policy should be unset"
+ );
+
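+        // the first aux file write is stored under the configured v1 policy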
+ {
+ lsn += 8;
+ let mut modification = tline.begin_modification(lsn);
+ modification
+ .put_file("pg_logical/mappings/test1", b"first", &ctx)
+ .await
+ .unwrap();
+ modification.commit(&ctx).await.unwrap();
+ }
+
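+        // switch to v2 directly, bypassing the usual migration-path validation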
+ tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
+
+ assert_eq!(
+ tline.last_aux_file_policy.load(),
+ Some(AuxFilePolicy::V2),
+            "policy is switched in memory; the dirty index_part.json has yet to be uploaded"
+ );
+
+        // all data written under v1 is lost after the switch
+ let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
+ assert_eq!(files.get("pg_logical/mappings/test1"), None);
+
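+        // ingest another aux file now that the policy is v2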
+ {
+ lsn += 8;
+ let mut modification = tline.begin_modification(lsn);
+ modification
+ .put_file("pg_logical/mappings/test2", b"second", &ctx)
+ .await
+ .unwrap();
+ modification.commit(&ctx).await.unwrap();
+ }
+
+ // read data ingested in v2
+ let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
+ assert_eq!(
+ files.get("pg_logical/mappings/test2"),
+ Some(&bytes::Bytes::from_static(b"second"))
+ );
+        // data written under v1 remains lost
+ assert_eq!(files.get("pg_logical/mappings/test1"), None);
+ }
+
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 1f8ee9ffc4e7..63d03b3c68ae 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -4606,6 +4606,14 @@ impl Timeline {
    ) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}
+
+ /// Switch aux file policy and schedule upload to the index part.
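+    ///
+    /// Unlike the switch in `DatadirModification`, this does not check
+    /// `AuxFilePolicy::is_valid_migration_path`.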
+ pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
+ self.last_aux_file_policy.store(Some(policy));
+ self.remote_client
+ .schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
+ Ok(())
+ }
}
/// Top-level failure to compact.