Skip to content

Commit

Permalink
Merge pull request #56 from tatchi/tatchi/per-signals-url
Browse files Browse the repository at this point in the history
add support for per-signal urls
  • Loading branch information
c-cube committed Aug 12, 2024
2 parents ea684b0 + e73ea7e commit 4f6cf08
Show file tree
Hide file tree
Showing 23 changed files with 551 additions and 132 deletions.
30 changes: 24 additions & 6 deletions src/client-cohttp-lwt/common_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,30 @@ let debug_ =

let default_url = "http://localhost:4318"

let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)

let get_url () = !url

let set_url s = url := s
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value

let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"

let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"

let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"

let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"

let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url

let parse_headers s =
let parse_header s =
Expand Down
54 changes: 44 additions & 10 deletions src/client-cohttp-lwt/config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ open Common_

type t = {
debug: bool;
url: string;
url_traces: string;
url_metrics: string;
url_logs: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
Expand All @@ -16,7 +18,9 @@ let pp out self : unit =
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
Expand All @@ -26,17 +30,47 @@ let pp out self : unit =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms
"{@[ debug=%B;@ url_traces=%S;@ url_metrics=%S;@ url_logs=%S;@ \
headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \
batch_timeout_ms=%d; @]}"
debug url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces
ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms

let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = Some 20)
?(batch_logs = Some 400) ?(batch_timeout_ms = 500) () : t =
let make ?(debug = !debug_) ?url ?url_traces ?url_metrics ?url_logs
?(headers = get_headers ()) ?(batch_traces = Some 400)
?(batch_metrics = Some 20) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) () : t =
let url_traces, url_metrics, url_logs =
let base_url =
let base_url =
match get_url_from_env () with
| None -> Option.value url ~default:default_url
| Some url -> remove_trailing_slash url
in
remove_trailing_slash base_url
in
let url_traces =
match get_url_traces_from_env () with
| None -> Option.value url_traces ~default:(base_url ^ "/v1/traces")
| Some url -> url
in
let url_metrics =
match get_url_metrics_from_env () with
| None -> Option.value url_metrics ~default:(base_url ^ "/v1/metrics")
| Some url -> url
in
let url_logs =
match get_url_logs_from_env () with
| None -> Option.value url_logs ~default:(base_url ^ "/v1/logs")
| Some url -> url
in
url_traces, url_metrics, url_logs
in
{
debug;
url;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
Expand Down
28 changes: 25 additions & 3 deletions src/client-cohttp-lwt/config.mli
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
type t = private {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
url_traces: string; (** Url to send traces *)
url_metrics: string; (** Url to send metrics*)
url_logs: string; (** Url to send logs *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
Expand Down Expand Up @@ -37,6 +37,9 @@ type t = private {
val make :
?debug:bool ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
Expand All @@ -49,6 +52,25 @@ val make :
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url.
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal type.
@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set.
The url is used as-is without any modification.
*)

val pp : Format.formatter -> t -> unit
70 changes: 31 additions & 39 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ module Httpc : sig

val send :
t ->
config:Config.t ->
path:string ->
url:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result Lwt.t
Expand All @@ -91,17 +90,8 @@ end = struct
let cleanup _self = ()

(* send the content to the remote endpoint/path *)
let send (_self : t) ~(config : Config.t) ~path ~decode (bod : string) :
('a, error) result Lwt.t =
let url =
let url = config.url in
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url
in
let full_url = url ^ path in
let uri = Uri.of_string full_url in
let send (_self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t =
let uri = Uri.of_string url in

let open Cohttp in
let headers = Header.(add_list (init ()) !headers) in
Expand All @@ -121,7 +111,7 @@ end = struct
| Error e ->
let err =
`Failure
(spf "sending signals via http POST to %S\nfailed with:\n%s" full_url
(spf "sending signals via http POST to %S\nfailed with:\n%s" url
(Printexc.to_string e))
in
Lwt.return @@ Error err
Expand Down Expand Up @@ -158,7 +148,7 @@ end = struct
%s\n\
status: %S\n\
%s"
full_url code (Printexc.to_string e) body bt))
url code (Printexc.to_string e) body bt))
in
Lwt.return r
)
Expand Down Expand Up @@ -292,11 +282,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =

let set_on_tick_callbacks = Atomic.set on_tick_cbs_

let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit Lwt.t =
let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let* r = Httpc.send httpc ~config ~path ~decode:(`Ret ()) data in
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
match r with
| Ok () -> Lwt.return ()
| Error `Sysbreak ->
Expand All @@ -317,23 +307,26 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l ()
in
send_http_ curl encoder ~path:"/v1/metrics"
let url = config.Config.url_metrics in
send_http_ curl encoder ~url
~encode:Metrics_service.encode_pb_export_metrics_service_request x

let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ curl encoder ~path:"/v1/traces"
let url = config.Config.url_traces in
send_http_ curl encoder ~url
~encode:Trace_service.encode_pb_export_trace_service_request x

let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ curl encoder ~path:"/v1/logs"
let url = config.Config.url_logs in
send_http_ curl encoder ~url
~encode:Logs_service.encode_pb_export_logs_service_request x

(* emit metrics, if the batch is full or timeout lapsed *)
Expand Down Expand Up @@ -459,12 +452,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
end in
(module M)

module Backend (Arg : sig
val stop : bool Atomic.t
module Backend
(Arg : sig
val stop : bool Atomic.t

val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())

open Opentelemetry.Proto
Expand All @@ -475,10 +469,10 @@ end)
send =
(fun l ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
push_trace l;
ret ());
}
Expand Down Expand Up @@ -532,10 +526,10 @@ end)
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);

let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
Expand All @@ -547,10 +541,10 @@ end)
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);

push_logs m;
ret ());
Expand All @@ -560,8 +554,6 @@ end
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
debug_ := config.debug;

if config.url <> get_url () then set_url config.url;

let module B =
Backend
(struct
Expand Down
6 changes: 0 additions & 6 deletions src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

open Common_

val get_url : unit -> string

val set_url : string -> unit
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)

val get_headers : unit -> (string * string) list

val set_headers : (string * string) list -> unit
Expand Down
30 changes: 24 additions & 6 deletions src/client-ocurl/common_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,30 @@ let debug_ =

let default_url = "http://localhost:4318"

let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)

let get_url () = !url

let set_url s = url := s
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value

let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"

let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"

let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"

let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"

let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url

let parse_headers s =
let parse_header s =
Expand Down
Loading

0 comments on commit 4f6cf08

Please sign in to comment.