Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,291 changes: 1,103 additions & 188 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions proto/gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ protoc -I . --go_out=. --go_opt=$GO_OPT \

protoc -I . --go_out=. --go_opt=$GO_OPT \
./encore/runtime/v1/secretdata.proto

# Prometheus protos for metrics exporter
protoc -I . --go_out=../runtimes/go/appruntime/infrasdk/metrics/prometheus --go_opt=$GO_OPT \
./prompb/types.proto
protoc -I . --go_out=../runtimes/go/appruntime/infrasdk/metrics/prometheus --go_opt=$GO_OPT \
./prompb/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package prometheus;

option go_package = "encore.dev/appruntime/metrics/prometheus/prompb";

import "types.proto";
import "prompb/types.proto";

message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1;
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
reserved 2;
repeated prometheus.MetricMetadata metadata = 3;
}

Expand All @@ -31,8 +31,9 @@ message ReadRequest {
repeated Query queries = 1;

enum ResponseType {
// Server will return a single ReadResponse message with matched series that includes list of raw samples.
// It's recommended to use streamed response types instead.
// Server will return a single ReadResponse message with matched series that
// includes list of raw samples. It's recommended to use streamed response
// types instead.
//
// Response headers:
// Content-Type: "application/x-protobuf"
Expand All @@ -44,16 +45,18 @@ message ReadRequest {
// uint32 for CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
// Content-Type: "application/x-streamed-protobuf;
// proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;
}

// accepted_response_types allows negotiating the content type of the response.
// accepted_response_types allows negotiating the content type of the
// response.
//
// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
// implemented by server, error is returned.
// For requests that do not contain the `accepted_response_types` field, the SAMPLES response type will be used.
// Response types are taken from the list in the FIFO order. If no response
// type in `accepted_response_types` is implemented by server, error is
// returned. For requests that do not contain the `accepted_response_types`
// field, the SAMPLES response type will be used.
repeated ResponseType accepted_response_types = 2;
}

Expand All @@ -75,13 +78,16 @@ message QueryResult {
repeated prometheus.TimeSeries timeseries = 1;
}

// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
// be sent for the previous one. Series are returned sorted in the same way TSDB blocks are internally.
// ChunkedReadResponse is a response when response_type equals
// STREAMED_XOR_CHUNKS. We strictly stream full series after series, optionally
// split by time. This means that a single frame can contain partition of the
// single series, but once a new series is started to be streamed it means that
// no more chunks will be sent for the previous one. Series are returned sorted
// in the same way TSDB blocks are internally.
message ChunkedReadResponse {
repeated prometheus.ChunkedSeries chunked_series = 1;

// query_index represents the index of the query in ReadRequest.queries that these chunks relate to.
// query_index represents the index of the query in ReadRequest.queries that
// these chunks relate to.
int64 query_index = 2;
}
File renamed without changes.
10 changes: 10 additions & 0 deletions runtimes/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rttrace = []
[dependencies]
pingora = { version = "0.4", features = ["lb", "openssl"] }
anyhow = "1.0.76"
async-trait = "0.1"
base64 = "0.21.5"
gjson = "0.8.1"
prost = "0.12.3"
Expand Down Expand Up @@ -123,6 +124,15 @@ email_address = "0.2.9"
cookie = "0.18.1"
malachite = "0.6.1"
byteorder = "1.5.0"
metrics = "0.24.2"
dashmap = "6.1.0"
google-cloud-monitoring-v3 = "1.0.0"
google-cloud-api = "1.0.0"
google-cloud-wkt = "1.0.0"
sysinfo = "0.37.2"
aws-sdk-cloudwatch = "1.94.0"
datadog-api-client = "0.20.0"
snap = "1.1.1"

[build-dependencies]
prost-build = "0.12.3"
Expand Down
1 change: 1 addition & 0 deletions runtimes/core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ fn main() -> std::io::Result<()> {
&[
"../../proto/encore/runtime/v1/runtime.proto",
"../../proto/encore/parser/meta/v1/meta.proto",
"../../proto/prompb/remote.proto",
],
&["../../proto/"],
)?;
Expand Down
8 changes: 8 additions & 0 deletions runtimes/core/src/api/auth/local.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::metrics::counter;

use crate::api::auth::{AuthHandler, AuthPayload, AuthRequest, AuthResponse};
use crate::api::schema::encoding::Schema;
use crate::api::{APIResult, HandlerResponse, HandlerResponseInner, PValues};
Expand All @@ -14,6 +16,7 @@ pub struct LocalAuthHandler {
pub schema: Schema,
pub handler: RwLock<Option<Arc<dyn api::TypedHandler>>>,
pub tracer: Tracer,
pub requests_total: counter::Schema<u64>,
}

impl LocalAuthHandler {
Expand Down Expand Up @@ -152,7 +155,9 @@ impl AuthHandler for LocalAuthHandler {
user_id: auth_uid.clone(),
})),
};

self.tracer.request_span_end(&model_resp, false);
self.requests_total.with([("code", "ok")]).increment();
Ok(AuthResponse::Authenticated {
auth_uid,
auth_data,
Expand All @@ -165,6 +170,9 @@ impl AuthHandler for LocalAuthHandler {
data: model::ResponseData::Auth(Err(e.clone())),
};
self.tracer.request_span_end(&model_resp, false);
self.requests_total
.with([("code", e.code.to_string())])
.increment();
Err(e)
}
}
Expand Down
19 changes: 11 additions & 8 deletions runtimes/core/src/api/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::api::{jsonschema, schema, ErrCode, Error};
use crate::encore::parser::meta::v1::rpc;
use crate::encore::parser::meta::v1::{self as meta, selector};
use crate::log::LogFromRust;
use crate::metrics::counter;
use crate::model::StreamDirection;
use crate::names::EndpointName;
use crate::trace;
Expand Down Expand Up @@ -406,6 +407,7 @@ pub(super) struct EndpointHandler {
pub endpoint: Arc<Endpoint>,
pub handler: Arc<dyn BoxedHandler>,
pub shared: Arc<SharedEndpointData>,
pub requests_total: counter::Schema<u64>,
}

#[derive(Debug)]
Expand All @@ -427,6 +429,7 @@ impl Clone for EndpointHandler {
endpoint: self.endpoint.clone(),
handler: self.handler.clone(),
shared: self.shared.clone(),
requests_total: self.requests_total.clone(),
}
}
}
Expand Down Expand Up @@ -601,7 +604,6 @@ impl EndpointHandler {
let duration = tokio::time::Instant::now().duration_since(request.start);

// If we had a request failure, log that separately.

if let ResponseData::Typed(Err(err)) = &resp {
logger.error(Some(&request), "request failed", Some(err), {
let mut fields = crate::log::Fields::new();
Expand Down Expand Up @@ -629,6 +631,12 @@ impl EndpointHandler {
});
}

let code = match &resp {
ResponseData::Typed(Ok(_)) => "ok".to_string(),
ResponseData::Typed(Err(err)) => err.code.to_string(),
ResponseData::Raw(resp) => ErrCode::from(resp.status()).to_string(),
};

logger.info(Some(&request), "request completed", {
let mut fields = crate::log::Fields::new();
let dur_ms = (duration.as_secs() as f64 * 1000f64)
Expand All @@ -644,13 +652,7 @@ impl EndpointHandler {
)),
);

let code = match &resp {
ResponseData::Typed(Ok(_)) => "ok".to_string(),
ResponseData::Typed(Err(err)) => err.code.to_string(),
ResponseData::Raw(resp) => ErrCode::from(resp.status()).to_string(),
};

fields.insert("code".into(), serde_json::Value::String(code));
fields.insert("code".into(), serde_json::Value::String(code.clone()));
Some(fields)
});

Expand Down Expand Up @@ -685,6 +687,7 @@ impl EndpointHandler {
}),
};
self.shared.tracer.request_span_end(&model_resp, sensitive);
self.requests_total.with([("code", code)]).increment();
}

if let Ok(val) = HeaderValue::from_str(request.span.0.serialize_encore().as_str()) {
Expand Down
15 changes: 14 additions & 1 deletion runtimes/core/src/api/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::api::{
use crate::encore::parser::meta::v1 as meta;
use crate::encore::runtime::v1 as runtime;
use crate::trace::Tracer;
use crate::{api, model, pubsub, secrets, EncoreName, EndpointName, Hosted};
use crate::{api, metrics, model, pubsub, secrets, EncoreName, EndpointName, Hosted};

use super::encore_routes::healthz;
use super::websocket_client::WebSocketClient;
Expand All @@ -43,6 +43,7 @@ pub struct ManagerConfig<'a> {
pub runtime: tokio::runtime::Handle,
pub testing: bool,
pub proxied_push_subs: HashMap<String, EncoreName>,
pub metrics: &'a metrics::Manager,
}

pub struct Manager {
Expand All @@ -57,6 +58,7 @@ pub struct Manager {

gateways: HashMap<EncoreName, Gateway>,
testing: bool,
metrics: metrics::Manager,
}

impl ManagerConfig<'_> {
Expand Down Expand Up @@ -164,6 +166,7 @@ impl ManagerConfig<'_> {
&service_registry,
self.http_client.clone(),
self.tracer.clone(),
self.metrics.registry(),
)
.context("unable to build authenticator")?;

Expand Down Expand Up @@ -200,6 +203,7 @@ impl ManagerConfig<'_> {
inbound_svc_auth,
self.tracer.clone(),
auth_data_schemas,
Arc::clone(self.metrics.registry()),
)
.context("unable to create API server")?;
Some(server)
Expand All @@ -217,6 +221,7 @@ impl ManagerConfig<'_> {
runtime: self.runtime,
healthz: healthz_handler,
testing: self.testing,
metrics: self.metrics.clone(),
})
}
}
Expand Down Expand Up @@ -245,6 +250,7 @@ fn build_auth_handler(
service_registry: &ServiceRegistry,
http_client: reqwest::Client,
tracer: Tracer,
metrics_registry: &Arc<metrics::Registry>,
) -> anyhow::Result<Option<auth::Authenticator>> {
let Some(explicit) = &gw.explicit else {
return Ok(None);
Expand Down Expand Up @@ -283,6 +289,8 @@ fn build_auth_handler(
// let is_local = hosted_services.contains(&explicit.service_name);
let is_local = true;
let name = EndpointName::new(explicit.service_name.clone(), auth.name.clone());
let requests_total =
metrics::requests_total_counter(metrics_registry, &explicit.service_name, &auth.name);

let auth_data = registry.schema(auth_data_schema_idx);
let auth_handler = if is_local {
Expand All @@ -294,6 +302,7 @@ fn build_auth_handler(
schema,
handler: Default::default(),
tracer,
requests_total,
},
)?
} else {
Expand Down Expand Up @@ -330,6 +339,10 @@ impl Manager {
self.service_registry.endpoints()
}

/// Returns a reference to the reference-counted [`metrics::Registry`] exposed
/// by this manager's `metrics` field.
///
/// This is a thin accessor that delegates to `registry()` on the stored
/// metrics manager; the `Arc` is borrowed, not cloned, so callers that need
/// ownership must clone it themselves.
pub fn metrics_registry(&self) -> &Arc<metrics::Registry> {
self.metrics.registry()
}

pub fn stream(
&self,
endpoint_name: EndpointName,
Expand Down
17 changes: 17 additions & 0 deletions runtimes/core/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Server {

/// Data shared between all endpoints.
shared: Arc<SharedEndpointData>,

/// Metrics registry for creating metrics
metrics_registry: Arc<crate::metrics::Registry>,
}

impl Server {
Expand All @@ -47,6 +50,7 @@ impl Server {
inbound_svc_auth: Vec<Arc<dyn svcauth::ServiceAuthMethod>>,
tracer: trace::Tracer,
auth_data_schemas: HashMap<String, Option<JSONSchema>>,
metrics_registry: Arc<crate::metrics::Registry>,
) -> anyhow::Result<Self> {
// Register the routes, and track the handlers in a map so we can easily
// set the request handler when registered.
Expand Down Expand Up @@ -92,11 +96,17 @@ impl Server {
// For static asset routes, configure the static asset handler directly.
// There's no need to defer it for dynamic runtime registration.
let static_handler = StaticAssetsHandler::new(assets);
let requests_total = crate::metrics::requests_total_counter(
&metrics_registry,
ep.name.service(),
ep.name.endpoint(),
);

let handler = EndpointHandler {
endpoint: ep.clone(),
handler: Arc::new(static_handler),
shared: shared.clone(),
requests_total,
};
server_handler.set(handler);
}
Expand Down Expand Up @@ -130,6 +140,7 @@ impl Server {
hosted_endpoints: Mutex::new(handler_map),
router: Mutex::new(Some(router)),
shared,
metrics_registry,
})
}

Expand All @@ -153,11 +164,17 @@ impl Server {
None => Ok(()), // anyhow::bail!("no handler found for endpoint: {}", endpoint_name),
Some(h) => {
let endpoint = self.endpoints.get(&endpoint_name).unwrap().to_owned();
let requests_total = crate::metrics::requests_total_counter(
&self.metrics_registry,
endpoint.name.service(),
endpoint.name.endpoint(),
);

let handler = EndpointHandler {
endpoint,
handler,
shared: self.shared.clone(),
requests_total,
};

h.add(handler);
Expand Down
Loading