Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pageserver): add force aux file policy switch handler #7842

Merged
merged 7 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
Expand Down Expand Up @@ -2307,6 +2308,31 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}

async fn force_aux_policy_switch_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
let policy: AuxFilePolicy = json_request(&mut r).await?;

let state = get_state(&r);

let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline
.do_switch_aux_policy(policy)
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, ())
}

async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
Expand Down Expand Up @@ -2384,19 +2410,9 @@ async fn list_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
Ok::<_, anyhow::Error>(files)
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => json_response(
StatusCode::INTERNAL_SERVER_ERROR,
ApiError::InternalServerError(err).to_string(),
),
}
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
json_response(StatusCode::OK, files)
}

async fn ingest_aux_files(
Expand All @@ -2414,24 +2430,22 @@ async fn ingest_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let mut modification = timeline.begin_modification(Lsn(
timeline.get_last_record_lsn().0 + 8
) /* advance LSN by 8 */);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files {
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await?;
}
modification.commit(&ctx).await?;
Ok::<_, anyhow::Error>(())
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => Err(ApiError::InternalServerError(err)),
let mut modification = timeline.begin_modification(
Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files {
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
}
modification
.commit(&ctx)
.await
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, ())
}

/// Report on the largest tenants on this pageserver, for the storage controller to identify
Expand Down Expand Up @@ -2814,6 +2828,10 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
)
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
Expand Down
14 changes: 2 additions & 12 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,25 +1489,15 @@ impl<'a> DatadirModification<'a> {
if aux_files_key_v1.is_empty() {
None
} else {
self.tline
.last_aux_file_policy
.store(Some(AuxFilePolicy::V1));
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(
AuxFilePolicy::V1,
))?;
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
current_policy
};

if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.last_aux_file_policy.store(Some(switch_policy));
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
Expand Down
61 changes: 61 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5999,6 +5999,67 @@ mod tests {
);
}

#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;

let mut lsn = Lsn(0x08);

let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);

{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}

tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();

assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"dirty index_part.json reflected state is yet to be updated"
);

// lose all data from v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);

{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}

// read data ingested in v2
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// lose all data from v1
assert_eq!(files.get("pg_logical/mappings/test1"), None);
}

#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
Expand Down
8 changes: 8 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4606,6 +4606,14 @@ impl Timeline {
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}

/// Switch aux file policy and schedule upload to the index part.
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
self.last_aux_file_policy.store(Some(policy));
self.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
Ok(())
}
}

/// Top-level failure to compact.
Expand Down
Loading