4 changes: 3 additions & 1 deletion src/daemon/context.rs
@@ -334,12 +334,13 @@ fn handle_admin_token(
ki.private_key(),
token_exp,
)?;
info!("Admin token: {token}");
let default_token_path = config.client.default_rpc_token_path();
if let Err(e) =
crate::utils::io::write_new_sensitive_file(token.as_bytes(), &default_token_path)
{
tracing::warn!("Failed to save the default admin token file: {e}");
} else {
info!("Admin token is saved to {}", default_token_path.display());
}
if let Some(path) = opts.save_token.as_ref() {
if let Some(dir) = path.parent()
@@ -354,6 +355,7 @@ fn handle_admin_token(
}
std::fs::write(path, &token)
.with_context(|| format!("Failed to save admin token to {}", path.display()))?;
info!("Admin token is saved to {}", path.display());
}

Ok(token)
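Note: with this change the daemon writes the admin token to the default token path and logs only the path, rather than printing the token itself. Below is a minimal sketch of what an owner-only token write could look like on Unix; it is an assumption about the intent of `write_new_sensitive_file`, not its actual implementation.

    use std::io::Write;
    use std::os::unix::fs::OpenOptionsExt;
    use std::path::Path;

    fn write_token_file(path: &Path, token: &[u8]) -> std::io::Result<()> {
        // create_new(true) refuses to overwrite an existing file;
        // mode(0o600) keeps the token readable by the owner only.
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .mode(0o600)
            .open(path)?;
        file.write_all(token)
    }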
20 changes: 17 additions & 3 deletions src/daemon/mod.rs
@@ -80,7 +80,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
let start_time = chrono::Utc::now();
let mut terminate = signal(SignalKind::terminate())?;
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);

let result = tokio::select! {
ret = start(start_time, opts, config, shutdown_send) => ret,
_ = ctrl_c() => {
@@ -375,6 +374,7 @@ fn maybe_start_rpc_service(
chain_follower: &ChainFollower<DbType>,
start_time: chrono::DateTime<chrono::Utc>,
shutdown: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
ctx: &AppContext,
) -> anyhow::Result<()> {
if config.client.enable_rpc {
@@ -402,6 +402,12 @@ fn maybe_start_rpc_service(
let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
async move {
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
.await
.map_err(|e| {
anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
})
.unwrap();
start_rpc(
RPCState {
state_manager,
@@ -417,7 +423,8 @@
tipset_send,
snapshot_progress_tracker,
},
rpc_address,
rpc_listener,
rpc_stop_handle,
filter_list,
)
.await
@@ -559,11 +566,16 @@ pub(super) async fn start(
});
}
loop {
let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
tokio::select! {
_ = snap_gc_reboot_rx.recv_async() => {
// gracefully shutdown RPC server
if let Err(e) = rpc_server_handle.stop() {
tracing::warn!("failed to stop RPC server: {e}");
}
snap_gc.cleanup_before_reboot().await;
}
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
snap_gc.set_db(ctx.db.clone());
snap_gc.set_sync_status(sync_status);
snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
@@ -579,6 +591,7 @@ pub(super) async fn start_services(
opts: &CliOpts,
mut config: Config,
shutdown_send: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
// Cleanup the collector prometheus metrics registry on start
@@ -608,6 +621,7 @@
&chain_follower,
start_time,
shutdown_send.clone(),
rpc_stop_handle,
&ctx,
)?;

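The reboot loop above now creates a fresh jsonrpsee stop/server handle pair on each iteration and stops the RPC server before snapshot GC restarts the services. A minimal sketch of that wiring, where `run_rpc_server` is a hypothetical stand-in for the spawned RPC task:

    // Sketch of the graceful-shutdown wiring (run_rpc_server is hypothetical).
    let (stop_handle, server_handle) = jsonrpsee::server::stop_channel();
    // The StopHandle travels with the RPC task so connections can observe shutdown.
    let rpc_task = tokio::spawn(run_rpc_server(stop_handle));
    // ...later, before rebooting the services:
    if let Err(e) = server_handle.stop() {
        tracing::warn!("failed to stop RPC server: {e}");
    }
    let _ = rpc_task.await;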
24 changes: 19 additions & 5 deletions src/rpc/methods/auth.rs
@@ -1,9 +1,12 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::auth::*;
use crate::lotus_json::lotus_json_with_self;
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
use crate::{
KeyStore,
auth::*,
lotus_json::lotus_json_with_self,
rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError},
};
use anyhow::Result;
use chrono::Duration;
use enumflags2::BitFlags;
@@ -14,6 +17,18 @@ use serde_with::{DurationSeconds, serde_as};

/// RPC call to create a new JWT Token
pub enum AuthNew {}

impl AuthNew {
pub fn create_token(
keystore: &KeyStore,
token_exp: Duration,
permissions: Vec<String>,
) -> anyhow::Result<String> {
let ki = keystore.get(JWT_IDENTIFIER)?;
Ok(create_token(permissions, ki.private_key(), token_exp)?)
}
}

impl RpcMethod<2> for AuthNew {
const NAME: &'static str = "Filecoin.AuthNew";
const N_REQUIRED_PARAMS: usize = 1;
@@ -28,14 +43,13 @@ impl RpcMethod<2> for AuthNew {
(permissions, expiration_secs): Self::Params,
) -> Result<Self::Ok, ServerError> {
let ks = ctx.keystore.read();
let ki = ks.get(JWT_IDENTIFIER)?;
// Lotus admin tokens do not expire but Forest requires all JWT tokens to
// have an expiration date. So we set the expiration date to 100 years in
// the future to match user-visible behavior of Lotus.
let token_exp = expiration_secs
.map(chrono::Duration::seconds)
.unwrap_or_else(|| chrono::Duration::days(365 * 100));
let token = create_token(permissions, ki.private_key(), token_exp)?;
let token = Self::create_token(&ks, token_exp, permissions)?;
Ok(token.as_bytes().to_vec())
}
}
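Extracting `AuthNew::create_token` lets other call sites mint a token without duplicating the keystore lookup. A sketch of a caller, with an illustrative one-hour expiry and read-only permission:

    let token = AuthNew::create_token(
        &keystore,                     // KeyStore holding the JWT signing key
        chrono::Duration::hours(1),    // token expiry
        vec!["read".to_owned()],       // requested permissions
    )?;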
75 changes: 66 additions & 9 deletions src/rpc/mod.rs
@@ -438,11 +438,10 @@ use fvm_ipld_blockstore::Blockstore;
use jsonrpsee::{
Methods,
core::middleware::RpcServiceBuilder,
server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
};
use parking_lot::RwLock;
use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::sync::mpsc;
@@ -524,7 +523,8 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {

pub async fn start_rpc<DB>(
state: RPCState<DB>,
rpc_endpoint: SocketAddr,
rpc_listener: tokio::net::TcpListener,
stop_handle: StopHandle,
filter_list: Option<FilterList>,
) -> anyhow::Result<()>
where
@@ -551,8 +551,6 @@ where
let methods: Arc<HashMap<ApiPaths, Methods>> =
Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());

let (stop_handle, _server_handle) = stop_channel();

let per_conn = PerConnection {
stop_handle: stop_handle.clone(),
svc_builder: Server::builder()
@@ -575,12 +573,10 @@
.to_service_builder(),
keystore,
};

let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
tracing::info!("Ready for RPC connections");
loop {
let sock = tokio::select! {
res = listener.accept() => {
res = rpc_listener.accept() => {
match res {
Ok((stream, _remote_addr)) => stream,
Err(e) => {
@@ -782,7 +778,13 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR

#[cfg(test)]
mod tests {
use crate::rpc::ApiPaths;
use super::*;
use crate::{
db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
tool::offline_server::server::offline_rpc_state,
};
use jsonrpsee::server::stop_channel;
use std::net::{Ipv4Addr, SocketAddr};

// `cargo test --lib -- --exact 'rpc::tests::openrpc'`
// `cargo insta review`
@@ -801,4 +803,59 @@
insta::assert_yaml_snapshot!(_spec);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rpc_server() {
let chain = NetworkChain::Calibnet;
let db = Arc::new(MemoryDB::default());
let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None).await.unwrap();
let block_delay_secs = state.chain_config().block_delay_secs;
let shutdown_send = state.shutdown.clone();
let jwt_read_permissions = vec!["read".to_owned()];
let jwt_read = super::methods::auth::AuthNew::create_token(
&state.keystore.read(),
chrono::Duration::hours(1),
jwt_read_permissions.clone(),
)
.unwrap();
let rpc_listener =
tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
.await
.unwrap();
let rpc_address = rpc_listener.local_addr().unwrap();
let (stop_handle, server_handle) = stop_channel();

// Start an RPC server

let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));

// Send a few requests

let client = Client::from_url(
format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
.parse()
.unwrap(),
);

let response = super::methods::common::Version::call(&client, ())
.await
.unwrap();
assert_eq!(
&response.version,
&*crate::utils::version::FOREST_VERSION_STRING
);
assert_eq!(response.block_delay, block_delay_secs);
assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));

let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
.await
.unwrap();
assert_eq!(response, jwt_read_permissions);

// Gracefully shutdown the RPC server
shutdown_send.send(()).await.unwrap();
shutdown_recv.recv().await;
server_handle.stop().unwrap();
handle.await.unwrap().unwrap();
}
}
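For reference, the accept loop in `start_rpc` can exit cleanly once the caller invokes `ServerHandle::stop()`. The sketch below shows the usual jsonrpsee pattern of selecting between new connections and the stop signal; it is illustrative and not necessarily the exact loop in this file (`handle_connection` is a hypothetical helper).

    loop {
        tokio::select! {
            res = rpc_listener.accept() => {
                match res {
                    // hand the accepted socket to a per-connection RPC service
                    Ok((stream, _remote_addr)) => handle_connection(stream),
                    Err(e) => tracing::error!("failed to accept RPC connection: {e}"),
                }
            }
            // resolves once the paired ServerHandle::stop() has been called
            _ = stop_handle.clone().shutdown() => break,
        }
    }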
2 changes: 1 addition & 1 deletion src/tool/mod.rs
@@ -1,5 +1,5 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
pub mod main;
mod offline_server;
pub mod offline_server;
pub mod subcommands;