From 4a278cce7ce5b7f32360e85fd41219df95cc9a86 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 22 May 2024 15:05:26 -0400 Subject: [PATCH] chore(pageserver): add force aux file policy switch handler (#7842) For existing users, we want to allow doing a force switch for their aux file policy. Part of #7462 --------- Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 78 ++++++++++++++++++----------- pageserver/src/pgdatadir_mapping.rs | 14 +----- pageserver/src/tenant.rs | 61 ++++++++++++++++++++++ pageserver/src/tenant/timeline.rs | 8 +++ 4 files changed, 119 insertions(+), 42 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 34b9806a26d2..7b55e8809638 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -16,6 +16,7 @@ use hyper::header; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::IngestAuxFilesRequest; use pageserver_api::models::ListAuxFilesRequest; use pageserver_api::models::LocationConfig; @@ -2307,6 +2308,31 @@ async fn post_tracing_event_handler( json_response(StatusCode::OK, ()) } +async fn force_aux_policy_switch_handler( + mut r: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + check_permission(&r, None)?; + let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?; + let policy: AuxFilePolicy = json_request(&mut r).await?; + + let state = get_state(&r); + + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + let timeline = + active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) + .await?; + timeline + .do_switch_aux_policy(policy) + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) +} + async fn put_io_engine_handler( mut r: Request, _cancel: CancellationToken, @@ -2384,19 +2410,9 @@ async fn list_aux_files( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let process = || async move { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let files = timeline.list_aux_files(body.lsn, &ctx).await?; - Ok::<_, anyhow::Error>(files) - }; - - match process().await { - Ok(st) => json_response(StatusCode::OK, st), - Err(err) => json_response( - StatusCode::INTERNAL_SERVER_ERROR, - ApiError::InternalServerError(err).to_string(), - ), - } + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let files = timeline.list_aux_files(body.lsn, &ctx).await?; + json_response(StatusCode::OK, files) } async fn ingest_aux_files( @@ -2414,24 +2430,22 @@ async fn ingest_aux_files( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let process = || async move { - let mut modification = timeline.begin_modification(Lsn( - timeline.get_last_record_lsn().0 + 8 - ) /* advance LSN by 8 */); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - for (fname, content) in body.aux_files { - modification - .put_file(&fname, content.as_bytes(), &ctx) - .await?; - } - modification.commit(&ctx).await?; - Ok::<_, anyhow::Error>(()) - }; - - match process().await { - Ok(st) => json_response(StatusCode::OK, st), - Err(err) => Err(ApiError::InternalServerError(err)), + let mut modification = timeline.begin_modification( + Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */ + ); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + for (fname, content) in body.aux_files { + modification + .put_file(&fname, content.as_bytes(), &ctx) + .await + .map_err(ApiError::InternalServerError)?; } + modification + .commit(&ctx) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) } /// Report on the largest tenants on this pageserver, for the storage controller to identify @@ -2814,6 +2828,10 @@ pub fn make_router( |r| api_handler(r, timeline_collect_keyspace), ) .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler)) + .put( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch", + |r| api_handler(r, force_aux_policy_switch_handler), + ) .get("/v1/utilization", |r| api_handler(r, get_utilization)) .post( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files", diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 7dea687c469e..afba34c6d154 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1489,14 +1489,7 @@ impl<'a> DatadirModification<'a> { if aux_files_key_v1.is_empty() { None } else { - self.tline - .last_aux_file_policy - .store(Some(AuxFilePolicy::V1)); - self.tline - .remote_client - .schedule_index_upload_for_aux_file_policy_update(Some( - AuxFilePolicy::V1, - ))?; + self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?; Some(AuxFilePolicy::V1) } } else { @@ -1504,10 +1497,7 @@ impl<'a> DatadirModification<'a> { }; if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) { - self.tline.last_aux_file_policy.store(Some(switch_policy)); - self.tline - .remote_client - .schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?; + self.tline.do_switch_aux_policy(switch_policy)?; info!(current=?current_policy, next=?switch_policy, "switching aux file policy"); switch_policy } else { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index caf26e0a0b95..2bb199b228fc 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5999,6 +5999,67 @@ mod tests { ); } + #[tokio::test] + async fn aux_file_policy_force_switch() { + let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap(); + harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1; + let (tenant, ctx) = harness.load().await; + + let mut lsn = Lsn(0x08); + + let tline: Arc = tenant + .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + assert_eq!( + tline.last_aux_file_policy.load(), + None, + "no aux file is written so it should be unset" + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test1", b"first", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap(); + + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::V2), + "dirty index_part.json reflected state is yet to be updated" + ); + + // lose all data from v1 + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!(files.get("pg_logical/mappings/test1"), None); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test2", b"second", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + // read data ingested in v2 + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test2"), + Some(&bytes::Bytes::from_static(b"second")) + ); + // lose all data from v1 + assert_eq!(files.get("pg_logical/mappings/test1"), None); + } + #[tokio::test] async fn aux_file_policy_auto_detect() { let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1f8ee9ffc4e7..63d03b3c68ae 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4606,6 +4606,14 @@ impl Timeline { ) -> Result, anyhow::Error> { detach_ancestor::complete(self, tenant, prepared, ctx).await } + + /// Switch aux file policy and schedule upload to the index part. + pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> { + self.last_aux_file_policy.store(Some(policy)); + self.remote_client + .schedule_index_upload_for_aux_file_policy_update(Some(policy))?; + Ok(()) + } } /// Top-level failure to compact.