Skip to content

Commit

Permalink
feat: otlp logging support http endpoint (#15636)
Browse files Browse the repository at this point in the history
* feat: otlp logging http endpoint

* fix: update config table for ci
  • Loading branch information
everpcpc authored May 24, 2024
1 parent 1079b7e commit c8e3c20
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 176 deletions.
27 changes: 22 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ arrow-udf-wasm = { package = "arrow-udf-wasm", git = "https://github.com/datafus
prost = { version = "0.12.1" }
prost-build = { version = "0.12.1" }
serde = { version = "1.0.164", features = ["derive", "rc"] }
serde_with = { version = "3.8.1" }
serde_json = { version = "1.0.85", default-features = false, features = ["preserve_order"] }
tonic-build = { version = "0.11" }

Expand All @@ -247,7 +248,13 @@ logcall = "0.1.5"
minitrace = { version = "0.6.5", features = ["enable"] }
minitrace-opentelemetry = "0.6.5"
opentelemetry = { version = "0.22", features = ["trace", "logs"] }
opentelemetry-otlp = { version = "0.15", features = ["trace", "logs", "grpc-tonic"] }
opentelemetry-otlp = { version = "0.15", features = [
"trace",
"logs",
"grpc-tonic",
"http-proto",
"reqwest-client",
] }
opentelemetry_sdk = { version = "0.22", features = ["trace", "logs", "rt-tokio"] }
tracing = "0.1.40"
tracing-appender = "0.2.3"
Expand Down
8 changes: 6 additions & 2 deletions src/binaries/meta/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,12 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> {
println!("Log:");
println!(" File: {}", conf.log.file);
println!(" Stderr: {}", conf.log.stderr);
println!(" OTLP: {}", conf.log.otlp);
println!(" Tracing: {}", conf.log.tracing);
if conf.log.otlp.on {
println!(" OpenTelemetry: {}", conf.log.otlp);
}
if conf.log.tracing.on {
println!(" Tracing: {}", conf.log.tracing);
}
println!("Id: {}", conf.raft_config.id);
println!("Raft Cluster Name: {}", conf.raft_config.cluster_name);
println!("Raft Dir: {}", conf.raft_config.raft_dir);
Expand Down
16 changes: 13 additions & 3 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,18 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
println!("Logging:");
println!(" file: {}", conf.log.file);
println!(" stderr: {}", conf.log.stderr);
println!(" otlp: {}", conf.log.otlp);
println!(" query: {}", conf.log.query);
println!(" tracing: {}", conf.log.tracing);
if conf.log.otlp.on {
println!(" otlp: {}", conf.log.otlp);
}
if conf.log.query.on {
println!(" query: {}", conf.log.query);
}
if conf.log.profile.on {
println!(" profile: {}", conf.log.profile);
}
if conf.log.structlog.on {
println!(" structlog: {}", conf.log.structlog);
}

println!();
println!(
Expand Down Expand Up @@ -355,6 +364,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
.await;
}
info!("Shutdown server.");
log::logger().flush();
Ok(())
}

Expand Down
147 changes: 101 additions & 46 deletions src/common/tracing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,15 @@ impl Default for StderrConfig {
pub struct OTLPConfig {
pub on: bool,
pub level: String,
pub endpoint: String,
pub labels: BTreeMap<String, String>,
pub endpoint: OTLPEndpointConfig,
}

impl Display for OTLPConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"enabled={}, level={}, endpoint={}, labels={}",
self.on, self.level, self.endpoint, labels
"enabled={}, level={}, endpoint={}",
self.on, self.level, self.endpoint
)
}
}
Expand All @@ -146,8 +139,7 @@ impl Default for OTLPConfig {
Self {
on: false,
level: "INFO".to_string(),
endpoint: "http://127.0.0.1:4317".to_string(),
labels: BTreeMap::new(),
endpoint: OTLPEndpointConfig::default(),
}
}
}
Expand All @@ -156,23 +148,16 @@ impl Default for OTLPConfig {
pub struct QueryLogConfig {
pub on: bool,
pub dir: String,
pub otlp_endpoint: String,
pub labels: BTreeMap<String, String>,
pub otlp: Option<OTLPEndpointConfig>,
}

impl Display for QueryLogConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"enabled={}, dir={}, otlp_endpoint={}, labels={}",
self.on, self.dir, self.otlp_endpoint, labels,
)
write!(f, "enabled={}, dir={}", self.on, self.dir)?;
if let Some(endpoint) = &self.otlp {
write!(f, ", otlp={}", endpoint)?;
}
Ok(())
}
}

Expand All @@ -181,8 +166,7 @@ impl Default for QueryLogConfig {
Self {
on: false,
dir: "".to_string(),
otlp_endpoint: "".to_string(),
labels: BTreeMap::new(),
otlp: None,
}
}
}
Expand All @@ -191,23 +175,16 @@ impl Default for QueryLogConfig {
pub struct ProfileLogConfig {
pub on: bool,
pub dir: String,
pub otlp_endpoint: String,
pub labels: BTreeMap<String, String>,
pub otlp: Option<OTLPEndpointConfig>,
}

impl Display for ProfileLogConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"enabled={}, dir={}, otlp_endpoint={}, labels={}",
self.on, self.dir, self.otlp_endpoint, labels,
)
write!(f, "enabled={}, dir={}", self.on, self.dir)?;
if let Some(endpoint) = &self.otlp {
write!(f, ", otlp={}", endpoint)?;
}
Ok(())
}
}

Expand All @@ -216,8 +193,7 @@ impl Default for ProfileLogConfig {
Self {
on: false,
dir: "".to_string(),
otlp_endpoint: "".to_string(),
labels: BTreeMap::new(),
otlp: None,
}
}
}
Expand Down Expand Up @@ -247,15 +223,15 @@ impl Default for StructLogConfig {
pub struct TracingConfig {
pub on: bool,
pub capture_log_level: String,
pub otlp_endpoint: String,
pub otlp: OTLPEndpointConfig,
}

impl Display for TracingConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"enabled={}, capture_log_level={}, otlp_endpoint={}",
self.on, self.capture_log_level, self.otlp_endpoint
"enabled={}, capture_log_level={}, otlp={}",
self.on, self.capture_log_level, self.otlp
)
}
}
Expand All @@ -265,7 +241,86 @@ impl Default for TracingConfig {
Self {
on: false,
capture_log_level: "INFO".to_string(),
otlp_endpoint: "http://127.0.0.1:4317".to_string(),
otlp: OTLPEndpointConfig::default(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OTLPProtocol {
Http,
Grpc,
}

impl Default for OTLPProtocol {
fn default() -> Self {
Self::Grpc
}
}

impl Display for OTLPProtocol {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
OTLPProtocol::Http => write!(f, "http"),
OTLPProtocol::Grpc => write!(f, "grpc"),
}
}
}

impl serde::Serialize for OTLPProtocol {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_str(match self {
OTLPProtocol::Http => "http",
OTLPProtocol::Grpc => "grpc",
})
}
}

impl<'de> serde::Deserialize<'de> for OTLPProtocol {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: serde::Deserializer<'de> {
let protocol = String::deserialize(deserializer)?;
match protocol.as_str() {
"http" => Ok(OTLPProtocol::Http),
"grpc" => Ok(OTLPProtocol::Grpc),
_ => Err(serde::de::Error::custom(format!(
"unknown protocol: {}",
protocol
))),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct OTLPEndpointConfig {
pub endpoint: String,
pub protocol: OTLPProtocol,
pub labels: BTreeMap<String, String>,
}

impl Display for OTLPEndpointConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"[endpoint={}, protocol={}, labels={}]",
self.endpoint, self.protocol, labels
)
}
}

impl Default for OTLPEndpointConfig {
fn default() -> Self {
Self {
endpoint: "http://127.0.0.1:4317".to_string(),
protocol: OTLPProtocol::Grpc,
labels: BTreeMap::new(),
}
}
}
Loading

0 comments on commit c8e3c20

Please sign in to comment.