Skip to content

Commit

Permalink
chore(pageserver): add force aux file policy switch handler (#7842)
Browse files Browse the repository at this point in the history
For existing users, we want to allow doing a force switch for their aux
file policy.

Part of #7462 

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh authored May 22, 2024
1 parent f98fdd2 commit 4a278cc
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 42 deletions.
78 changes: 48 additions & 30 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
Expand Down Expand Up @@ -2307,6 +2308,31 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}

async fn force_aux_policy_switch_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
let policy: AuxFilePolicy = json_request(&mut r).await?;

let state = get_state(&r);

let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline
.do_switch_aux_policy(policy)
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, ())
}

async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
Expand Down Expand Up @@ -2384,19 +2410,9 @@ async fn list_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
Ok::<_, anyhow::Error>(files)
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => json_response(
StatusCode::INTERNAL_SERVER_ERROR,
ApiError::InternalServerError(err).to_string(),
),
}
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
json_response(StatusCode::OK, files)
}

async fn ingest_aux_files(
Expand All @@ -2414,24 +2430,22 @@ async fn ingest_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let mut modification = timeline.begin_modification(Lsn(
timeline.get_last_record_lsn().0 + 8
) /* advance LSN by 8 */);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files {
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await?;
}
modification.commit(&ctx).await?;
Ok::<_, anyhow::Error>(())
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => Err(ApiError::InternalServerError(err)),
let mut modification = timeline.begin_modification(
Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files {
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
}
modification
.commit(&ctx)
.await
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, ())
}

/// Report on the largest tenants on this pageserver, for the storage controller to identify
Expand Down Expand Up @@ -2814,6 +2828,10 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
)
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
Expand Down
14 changes: 2 additions & 12 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,25 +1489,15 @@ impl<'a> DatadirModification<'a> {
if aux_files_key_v1.is_empty() {
None
} else {
self.tline
.last_aux_file_policy
.store(Some(AuxFilePolicy::V1));
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(
AuxFilePolicy::V1,
))?;
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
current_policy
};

if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.last_aux_file_policy.store(Some(switch_policy));
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
Expand Down
61 changes: 61 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5999,6 +5999,67 @@ mod tests {
);
}

#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;

let mut lsn = Lsn(0x08);

let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);

{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}

tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();

assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"dirty index_part.json reflected state is yet to be updated"
);

// lose all data from v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);

{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}

// read data ingested in v2
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// lose all data from v1
assert_eq!(files.get("pg_logical/mappings/test1"), None);
}

#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
Expand Down
8 changes: 8 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4606,6 +4606,14 @@ impl Timeline {
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}

/// Switch aux file policy and schedule upload to the index part.
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
self.last_aux_file_policy.store(Some(policy));
self.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
Ok(())
}
}

/// Top-level failure to compact.
Expand Down

1 comment on commit 4a278cc

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3196 tests run: 3055 passed, 1 failed, 140 skipped (full report)


Failures on Postgres 14

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg14-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 31.4% (6450 of 20515 functions)
  • lines: 48.3% (49854 of 103237 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
4a278cc at 2024-05-22T20:22:51.711Z :recycle:

Please sign in to comment.