diff --git a/src/filters.rs b/src/filters.rs index 3c94ccc..7533420 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -12,6 +12,7 @@ pub fn search(db: Pool) -> impl Filter pub fn search_file(db: Pool) -> impl Filter + Clone { warp::path("file") + .and(with_telem()) .and(warp::get()) .and(warp::query::()) .and(with_pool(db)) @@ -21,6 +22,7 @@ pub fn search_file(db: Pool) -> impl Filter impl Filter + Clone { warp::path("image") + .and(with_telem()) .and(warp::post()) .and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::query::()) @@ -31,6 +33,7 @@ pub fn search_image(db: Pool) -> impl Filter impl Filter + Clone { warp::path("hashes") + .and(with_telem()) .and(warp::get()) .and(warp::query::()) .and(with_pool(db)) @@ -42,6 +45,7 @@ pub fn stream_search_image( db: Pool, ) -> impl Filter + Clone { warp::path("stream") + .and(with_telem()) .and(warp::post()) .and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(with_pool(db)) @@ -56,3 +60,27 @@ fn with_api_key() -> impl Filter + Clone fn with_pool(db: Pool) -> impl Filter + Clone { warp::any().map(move || db.clone()) } + +fn with_telem() -> impl Filter + Clone { + warp::any() + .and(warp::header::optional("traceparent")) + .map(|traceparent: Option| { + use opentelemetry::api::trace::{provider::Provider, tracer::Tracer, propagator::HttpTextFormat}; + + let mut headers = std::collections::HashMap::new(); + headers.insert("Traceparent", traceparent.unwrap_or_else(String::new)); + + let propagator = opentelemetry::api::distributed_context::http_trace_context_propagator::HTTPTraceContextPropagator::new(); + let context = propagator.extract(&headers); + + if context.is_valid() { + let tracer = opentelemetry::global::trace_provider().get_tracer("api"); + let span = tracer.start("context", Some(context)); + tracer.mark_span_as_active(&span); + + Some(span) + } else { + None + } + }) +} diff --git a/src/handlers.rs b/src/handlers.rs index 680d9a1..789a79c 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -76,8 +76,9 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas (i64::from_be_bytes(buf), hash) } -#[tracing::instrument(skip(form, pool, api_key))] +#[tracing::instrument(skip(_telem, form, pool, api_key))] pub async fn search_image( + _telem: crate::Span, form: warp::multipart::FormData, opts: ImageSearchOpts, pool: Pool, @@ -123,8 +124,9 @@ pub async fn search_image( Ok(warp::reply::json(&similarity)) } -#[tracing::instrument(skip(form, pool, api_key))] +#[tracing::instrument(skip(_telem, form, pool, api_key))] pub async fn stream_image( + _telem: crate::Span, form: warp::multipart::FormData, pool: Pool, api_key: String, @@ -158,8 +160,9 @@ fn sse_matches( Ok(warp::sse::json(items)) } -#[tracing::instrument(skip(form, db, api_key))] +#[tracing::instrument(skip(_telem, form, db, api_key))] pub async fn search_hashes( + _telem: crate::Span, opts: HashSearchOpts, db: Pool, api_key: String, @@ -190,8 +193,9 @@ pub async fn search_hashes( Ok(warp::reply::json(&matches)) } -#[tracing::instrument(skip(db, api_key))] +#[tracing::instrument(skip(_telem, db, api_key))] pub async fn search_file( + _telem: crate::Span, opts: FileSearchOpts, db: Pool, api_key: String, diff --git a/src/main.rs b/src/main.rs index acf8389..1b62a72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,8 @@ mod utils; use warp::Filter; +type Span = Option; + fn configure_tracing() { use opentelemetry::{ api::{KeyValue, Provider, Sampler}, @@ -43,7 +45,9 @@ fn configure_tracing() { }) .build(); - let tracer = provider.get_tracer("api"); + opentelemetry::global::set_provider(provider); + + let tracer = opentelemetry::global::trace_provider().get_tracer("api"); let telem_layer = tracing_opentelemetry::OpentelemetryLayer::with_tracer(tracer); let fmt_layer = tracing_subscriber::fmt::Layer::default();