diff --git a/Cargo.lock b/Cargo.lock index 071ecd9..f84d390 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -894,8 +894,9 @@ dependencies = [ "infer", "lazy_static", "opentelemetry", + "opentelemetry-http", "opentelemetry-jaeger", - "prometheus 0.12.0", + "prometheus", "reqwest", "serde", "serde_json", @@ -937,7 +938,7 @@ dependencies = [ "image", "img_hash", "lazy_static", - "prometheus 0.12.0", + "prometheus", "reqwest", "serde", "serde_json", @@ -961,7 +962,7 @@ dependencies = [ "hyper", "lazy_static", "postgres", - "prometheus 0.12.0", + "prometheus", "r2d2", "r2d2_postgres", "reqwest", @@ -1276,7 +1277,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.7", + "pin-project", "socket2", "tokio", "tower-service", @@ -1844,26 +1845,46 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.6.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc49c2e2f36870923d158ff0e357c28e2ff581cb7d8d2181ac27198882ca4132" +checksum = "b91cea1dfd50064e52db033179952d18c770cbc5dfefc8eba45d619357ba3914" dependencies = [ + "async-trait", "futures", + "js-sys", "lazy_static", "percent-encoding", - "pin-project 0.4.28", - "prometheus 0.7.0", - "rand 0.7.3", + "pin-project", + "rand 0.8.3", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "opentelemetry-http" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba72bd27f28377a2e4f0f15a040c06ab0e70ea73df0689f8adf4d22d04591a1a" +dependencies = [ + "async-trait", + "http", + "opentelemetry", + "thiserror", ] [[package]] name = "opentelemetry-jaeger" -version = "0.5.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adb48b02de79b2d67c2d04e8a0c6140b63830fc6bfde0fdb1e55e74bbb710f57" +checksum = "ddd4984441954f9ebbe3eebdfc6fd4fa95be6400d403171228779b949f3cd918" dependencies = [ + "async-trait", + "lazy_static", "opentelemetry", + "thiserror", "thrift", + "tokio", ] [[package]] @@ -1985,33 +2006,13 @@ dependencies = [ "siphasher", ] -[[package]] -name = "pin-project" -version = "0.4.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f" -dependencies = [ - "pin-project-internal 0.4.28", -] - [[package]] name = "pin-project" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" dependencies = [ - "pin-project-internal 1.0.7", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -2146,20 +2147,6 @@ dependencies = [ "libc", ] -[[package]] -name = "prometheus" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" -dependencies = [ - "cfg-if 0.1.10", - "fnv", - "lazy_static", - "protobuf", - "quick-error", - "spin", -] - [[package]] name = "prometheus" version = "0.12.0" @@ -2774,12 +2761,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "sqlformat" version = "0.1.6" @@ -3174,7 +3155,7 @@ checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" dependencies = [ "futures-util", "log", - "pin-project 1.0.7", + "pin-project", "tokio", "tungstenite", ] @@ -3238,7 +3219,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 1.0.7", + "pin-project", "tracing", ] @@ -3255,12 +3236,11 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.5.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b64607914c2892f73ebfd19c6777a0dfb3f5457bf3894693dd2125c41b9c208a" +checksum = "99003208b647dae59dcefc49c98aecaa3512fbc29351685d4b9ef23a9218458e" dependencies = [ "opentelemetry", - "rand 0.7.3", "tracing", "tracing-core", "tracing-log", @@ -3477,7 +3457,7 @@ dependencies = [ "mime_guess", "multipart", "percent-encoding", - "pin-project 1.0.7", + "pin-project", "scoped-tls", "serde", "serde_json", diff --git a/fuzzysearch/Cargo.toml b/fuzzysearch/Cargo.toml index f14397a..e433ad4 100644 --- a/fuzzysearch/Cargo.toml +++ b/fuzzysearch/Cargo.toml @@ -12,9 +12,10 @@ tracing-futures = "0.2" prometheus = { version = "0.12", features = ["process"] } lazy_static = "1" -opentelemetry = "0.6" -opentelemetry-jaeger = "0.5" -tracing-opentelemetry = "0.5" +opentelemetry = { version = "0.13", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.12", features = ["tokio"] } +tracing-opentelemetry = "0.12" +opentelemetry-http = "0.2" tokio = { version = "1", features = ["full"] } async-stream = "0.3" diff --git a/fuzzysearch/src/filters.rs b/fuzzysearch/src/filters.rs index d1974b4..f03a77f 100644 --- a/fuzzysearch/src/filters.rs +++ b/fuzzysearch/src/filters.rs @@ -19,16 +19,16 @@ pub fn search( pub fn search_file(db: Pool) -> impl Filter + Clone { warp::path("file") - .and(warp::header::optional::("x-b3")) + .and(warp::header::headers_cloned()) .and(warp::get()) .and(warp::query::()) .and(with_pool(db)) .and(with_api_key()) - .and_then(|b3, opts, db, api_key| { + .and_then(|headers, opts, db, api_key| { use tracing_opentelemetry::OpenTelemetrySpanExt; let span = tracing::info_span!("search_file", ?opts); - span.set_parent(&with_telem(b3)); + span.set_parent(with_telem(headers)); span.in_scope(|| handlers::search_file(opts, db, api_key).in_current_span()) }) } @@ -38,18 +38,18 @@ pub fn search_image( tree: Tree, ) -> impl Filter + Clone { warp::path("image") - .and(warp::header::optional::("x-b3")) + .and(warp::header::headers_cloned()) .and(warp::post()) .and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::query::()) .and(with_pool(db)) .and(with_tree(tree)) .and(with_api_key()) - .and_then(|b3, form, opts, pool, tree, api_key| { + .and_then(|headers, form, opts, pool, tree, api_key| { use tracing_opentelemetry::OpenTelemetrySpanExt; let span = tracing::info_span!("search_image", ?opts); - span.set_parent(&with_telem(b3)); + span.set_parent(with_telem(headers)); span.in_scope(|| { handlers::search_image(form, opts, pool, tree, api_key).in_current_span() }) @@ -74,17 +74,17 @@ pub fn search_hashes( tree: Tree, ) -> impl Filter + Clone { warp::path("hashes") - .and(warp::header::optional::("x-b3")) + .and(warp::header::headers_cloned()) .and(warp::get()) .and(warp::query::()) .and(with_pool(db)) .and(with_tree(tree)) .and(with_api_key()) - .and_then(|b3, opts, db, tree, api_key| { + .and_then(|headers, opts, db, tree, api_key| { use tracing_opentelemetry::OpenTelemetrySpanExt; let span = tracing::info_span!("search_hashes", ?opts); - span.set_parent(&with_telem(b3)); + span.set_parent(with_telem(headers)); span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span()) }) } @@ -94,33 +94,33 @@ pub fn stream_search_image( tree: Tree, ) -> impl Filter + Clone { warp::path("stream") - .and(warp::header::optional::("x-b3")) + .and(warp::header::headers_cloned()) .and(warp::post()) .and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(with_pool(db)) .and(with_tree(tree)) .and(with_api_key()) - .and_then(|b3, form, pool, tree, api_key| { + .and_then(|headers, form, pool, tree, api_key| { use tracing_opentelemetry::OpenTelemetrySpanExt; let span = tracing::info_span!("stream_search_image"); - span.set_parent(&with_telem(b3)); + span.set_parent(with_telem(headers)); span.in_scope(|| handlers::stream_image(form, pool, tree, api_key).in_current_span()) }) } pub fn search_video(db: Pool) -> impl Filter + Clone { warp::path("video") - .and(warp::header::optional::("x-b3")) + .and(warp::header::headers_cloned()) .and(warp::post()) .and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(with_pool(db)) .and(with_api_key()) - .and_then(|b3, form, db, api_key| { + .and_then(|headers, form, db, api_key| { use tracing_opentelemetry::OpenTelemetrySpanExt; let span = tracing::info_span!("search_video"); - span.set_parent(&with_telem(b3)); + span.set_parent(with_telem(headers)); span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span()) }) } @@ -145,23 +145,12 @@ fn with_tree(tree: Tree) -> impl Filter + warp::any().map(move || tree.clone()) } -fn with_telem(b3: Option) -> opentelemetry::api::context::Context { - use opentelemetry::api::{HttpTextFormat, TraceContextExt}; +fn with_telem(headers: warp::http::HeaderMap) -> opentelemetry::Context { + let remote_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&opentelemetry_http::HeaderExtractor(&headers)) + }); - let mut carrier = std::collections::HashMap::new(); - if let Some(b3) = b3 { - // It took way too long to realize it's a case-sensitive comparison... - // Looks like it should be fixed in the next release, - // https://github.com/open-telemetry/opentelemetry-rust/pull/148 - carrier.insert("X-B3".to_string(), b3); - } + tracing::trace!(?remote_context, "Got remote context"); - let propagator = opentelemetry::api::B3Propagator::new(true); - let parent_context = propagator.extract(&carrier); - tracing::trace!( - "remote span context: {:?}", - parent_context.remote_span_context() - ); - - parent_context + remote_context } diff --git a/fuzzysearch/src/main.rs b/fuzzysearch/src/main.rs index 9e5389a..a4689c4 100644 --- a/fuzzysearch/src/main.rs +++ b/fuzzysearch/src/main.rs @@ -76,61 +76,51 @@ async fn main() { } fn configure_tracing() { - use opentelemetry::{ - api::{KeyValue, Provider}, - sdk::{Config, Sampler}, - }; - use tracing_subscriber::{layer::SubscriberExt, prelude::*}; + use opentelemetry::KeyValue; + use tracing_subscriber::layer::SubscriberExt; - let env = if cfg!(debug_assertions) { + let env = std::env::var("ENVIRONMENT"); + let env = if let Ok(env) = env.as_ref() { + env.as_str() + } else if cfg!(debug_assertions) { "debug" } else { "release" }; - let fmt_layer = tracing_subscriber::fmt::layer(); - let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env() - .or_else(|_| tracing_subscriber::EnvFilter::try_new("info")) + opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + + let tracer = opentelemetry_jaeger::new_pipeline() + .with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR")) + .with_service_name("fuzzysearch") + .with_tags(vec![ + KeyValue::new("environment", env.to_owned()), + KeyValue::new("version", env!("CARGO_PKG_VERSION")), + ]) + .install_batch(opentelemetry::runtime::Tokio) .unwrap(); - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .finish(); - let registry = tracing_subscriber::registry() - .with(filter_layer) - .with(fmt_layer); - let exporter = opentelemetry_jaeger::Exporter::builder() - .with_agent_endpoint( - std::env::var("JAEGER_COLLECTOR") - .expect("Missing JAEGER_COLLECTOR") - .parse() - .unwrap(), - ) - .with_process(opentelemetry_jaeger::Process { - service_name: "fuzzysearch".to_string(), - tags: vec![ - KeyValue::new("environment", env), - KeyValue::new("version", env!("CARGO_PKG_VERSION")), - ], - }) - .init() - .expect("unable to create jaeger exporter"); + let trace = tracing_opentelemetry::layer().with_tracer(tracer); + let env_filter = tracing_subscriber::EnvFilter::from_default_env(); - let provider = opentelemetry::sdk::Provider::builder() - .with_simple_exporter(exporter) - .with_config(Config { - default_sampler: Box::new(Sampler::Always), - ..Default::default() - }) - .build(); - - opentelemetry::global::set_provider(provider); - - let tracer = opentelemetry::global::trace_provider().get_tracer("fuzzysearch"); - let telem_layer = tracing_opentelemetry::layer().with_tracer(tracer); - let registry = registry.with(telem_layer); - - registry.init(); + if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) { + let subscriber = tracing_subscriber::fmt::layer() + .json() + .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339()) + .with_target(true); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(trace) + .with(subscriber); + tracing::subscriber::set_global_default(subscriber).unwrap(); + } else { + let subscriber = tracing_subscriber::fmt::layer(); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(trace) + .with(subscriber); + tracing::subscriber::set_global_default(subscriber).unwrap(); + } } async fn metrics(