Update OpenTelemetry.

This commit is contained in:
Syfaro 2020-07-22 23:04:42 -05:00
parent ec9baf0c8a
commit 124089b76f
5 changed files with 417 additions and 478 deletions

745
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,14 +5,13 @@ authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
pretty_env_logger = "0.4"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
tracing-futures = "0.2" tracing-futures = "0.2"
opentelemetry = "0.1" opentelemetry = "0.6"
tracing-opentelemetry = "0.1" opentelemetry-jaeger = "0.5"
tracing-opentelemetry = "0.5"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
futures = "0.3" futures = "0.3"
@ -34,6 +33,10 @@ hamming = "0.1"
bk-tree = "0.3" bk-tree = "0.3"
[profile.dev]
opt-level = 2
debug = true
[profile.release] [profile.release]
lto = true lto = true
codegen-units = 1 codegen-units = 1

View File

@ -1,6 +1,7 @@
use crate::types::*; use crate::types::*;
use crate::{handlers, Pool, Tree}; use crate::{handlers, Pool, Tree};
use std::convert::Infallible; use std::convert::Infallible;
use tracing_futures::Instrument;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
pub fn search( pub fn search(
@ -16,12 +17,18 @@ pub fn search(
pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("file") warp::path("file")
.and(with_telem()) .and(warp::header::optional::<String>("x-b3"))
.and(warp::get()) .and(warp::get())
.and(warp::query::<FileSearchOpts>()) .and(warp::query::<FileSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_api_key()) .and(with_api_key())
.and_then(handlers::search_file) .and_then(|b3, opts, db, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_file", ?opts);
span.set_parent(&with_telem(b3));
span.in_scope(|| handlers::search_file(opts, db, api_key).in_current_span())
})
} }
pub fn search_image( pub fn search_image(
@ -29,14 +36,22 @@ pub fn search_image(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("image") warp::path("image")
.and(with_telem()) .and(warp::header::optional::<String>("x-b3"))
.and(warp::post()) .and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(warp::query::<ImageSearchOpts>()) .and(warp::query::<ImageSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(handlers::search_image) .and_then(|b3, form, opts, pool, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_image", ?opts);
span.set_parent(&with_telem(b3));
span.in_scope(|| {
handlers::search_image(form, opts, pool, tree, api_key).in_current_span()
})
})
} }
pub fn search_hashes( pub fn search_hashes(
@ -44,13 +59,19 @@ pub fn search_hashes(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("hashes") warp::path("hashes")
.and(with_telem()) .and(warp::header::optional::<String>("x-b3"))
.and(warp::get()) .and(warp::get())
.and(warp::query::<HashSearchOpts>()) .and(warp::query::<HashSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(handlers::search_hashes) .and_then(|b3, opts, db, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_hashes", ?opts);
span.set_parent(&with_telem(b3));
span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span())
})
} }
pub fn stream_search_image( pub fn stream_search_image(
@ -58,13 +79,19 @@ pub fn stream_search_image(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("stream") warp::path("stream")
.and(with_telem()) .and(warp::header::optional::<String>("x-b3"))
.and(warp::post()) .and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(handlers::stream_image) .and_then(|b3, form, pool, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("stream_search_image");
span.set_parent(&with_telem(b3));
span.in_scope(|| handlers::stream_image(form, pool, tree, api_key).in_current_span())
})
} }
pub fn check_handle(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn check_handle(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
@ -87,28 +114,23 @@ fn with_tree(tree: Tree) -> impl Filter<Extract = (Tree,), Error = Infallible> +
warp::any().map(move || tree.clone()) warp::any().map(move || tree.clone())
} }
fn with_telem() -> impl Filter<Extract = (crate::Span,), Error = Rejection> + Clone { fn with_telem(b3: Option<String>) -> opentelemetry::api::context::Context {
warp::any() use opentelemetry::api::{HttpTextFormat, TraceContextExt};
.and(warp::header::optional("traceparent"))
.map(|traceparent: Option<String>| {
use opentelemetry::api::trace::{provider::Provider, tracer::Tracer, propagator::HttpTextFormat};
let mut headers = std::collections::HashMap::new(); let mut carrier = std::collections::HashMap::new();
headers.insert("Traceparent", traceparent.unwrap_or_else(String::new)); if let Some(b3) = b3 {
// It took way too long to realize it's a case-sensitive comparison...
// Looks like it should be fixed in the next release,
// https://github.com/open-telemetry/opentelemetry-rust/pull/148
carrier.insert("X-B3".to_string(), b3);
}
let propagator = opentelemetry::api::distributed_context::http_trace_context_propagator::HTTPTraceContextPropagator::new(); let propagator = opentelemetry::api::B3Propagator::new(true);
let context = propagator.extract(&headers); let parent_context = propagator.extract(&carrier);
tracing::trace!(
"remote span context: {:?}",
parent_context.remote_span_context()
);
tracing::trace!("got context from request: {:?}", context); parent_context
if context.is_valid() {
let tracer = opentelemetry::global::trace_provider().get_tracer("api");
let span = tracer.start("context", Some(context));
tracer.mark_span_as_active(&span);
Some(span)
} else {
None
}
})
} }

View File

@ -76,9 +76,7 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas
(i64::from_be_bytes(buf), hash) (i64::from_be_bytes(buf), hash)
} }
#[tracing::instrument(skip(_telem, form, pool, tree, api_key))]
pub async fn search_image( pub async fn search_image(
_telem: crate::Span,
form: warp::multipart::FormData, form: warp::multipart::FormData,
opts: ImageSearchOpts, opts: ImageSearchOpts,
pool: Pool, pool: Pool,
@ -144,9 +142,7 @@ pub async fn search_image(
Ok(warp::reply::json(&similarity)) Ok(warp::reply::json(&similarity))
} }
#[tracing::instrument(skip(_telem, form, pool, tree, api_key))]
pub async fn stream_image( pub async fn stream_image(
_telem: crate::Span,
form: warp::multipart::FormData, form: warp::multipart::FormData,
pool: Pool, pool: Pool,
tree: Tree, tree: Tree,
@ -181,9 +177,7 @@ fn sse_matches(
Ok(warp::sse::json(items)) Ok(warp::sse::json(items))
} }
#[tracing::instrument(skip(_telem, db, tree, api_key))]
pub async fn search_hashes( pub async fn search_hashes(
_telem: crate::Span,
opts: HashSearchOpts, opts: HashSearchOpts,
db: Pool, db: Pool,
tree: Tree, tree: Tree,
@ -221,9 +215,7 @@ pub async fn search_hashes(
Ok(warp::reply::json(&matches)) Ok(warp::reply::json(&matches))
} }
#[tracing::instrument(skip(_telem, db, api_key))]
pub async fn search_file( pub async fn search_file(
_telem: crate::Span,
opts: FileSearchOpts, opts: FileSearchOpts,
db: Pool, db: Pool,
api_key: String, api_key: String,

View File

@ -12,15 +12,12 @@ mod utils;
use warp::Filter; use warp::Filter;
type Span = Option<opentelemetry::global::BoxedSpan>;
fn configure_tracing() { fn configure_tracing() {
use opentelemetry::{ use opentelemetry::{
api::{KeyValue, Provider, Sampler}, api::{KeyValue, Provider},
exporter::trace::jaeger, sdk::{Config, Sampler},
sdk::Config,
}; };
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::{layer::SubscriberExt, prelude::*};
let env = if cfg!(debug_assertions) { let env = if cfg!(debug_assertions) {
"debug" "debug"
@ -28,38 +25,44 @@ fn configure_tracing() {
"release" "release"
}; };
let exporter = jaeger::Exporter::builder() let fmt_layer = tracing_subscriber::fmt::layer();
.with_collector_endpoint(std::env::var("JAEGER_COLLECTOR").unwrap().parse().unwrap()) let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.with_process(jaeger::Process { .or_else(|_| tracing_subscriber::EnvFilter::try_new("info"))
service_name: "fuzzysearch", .unwrap();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish();
let registry = tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer);
let exporter = opentelemetry_jaeger::Exporter::builder()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").unwrap().parse().unwrap())
.with_process(opentelemetry_jaeger::Process {
service_name: "fuzzysearch".to_string(),
tags: vec![ tags: vec![
KeyValue::new("environment", env), KeyValue::new("environment", env),
KeyValue::new("version", env!("CARGO_PKG_VERSION")), KeyValue::new("version", env!("CARGO_PKG_VERSION")),
], ],
}) })
.init(); .init()
.expect("unable to create jaeger exporter");
let provider = opentelemetry::sdk::Provider::builder() let provider = opentelemetry::sdk::Provider::builder()
.with_exporter(exporter) .with_simple_exporter(exporter)
.with_config(Config { .with_config(Config {
default_sampler: Sampler::Always, default_sampler: Box::new(Sampler::Always),
..Default::default() ..Default::default()
}) })
.build(); .build();
opentelemetry::global::set_provider(provider); opentelemetry::global::set_provider(provider);
let tracer = opentelemetry::global::trace_provider().get_tracer("api"); let tracer = opentelemetry::global::trace_provider().get_tracer("fuzzysearch");
let telem_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let registry = registry.with(telem_layer);
let telem_layer = tracing_opentelemetry::OpentelemetryLayer::with_tracer(tracer); registry.init();
let fmt_layer = tracing_subscriber::fmt::Layer::default();
let subscriber = tracing_subscriber::Registry::default()
.with(telem_layer)
.with(fmt_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to set default tracing subscriber");
} }
#[derive(Debug)] #[derive(Debug)]
@ -86,8 +89,6 @@ impl bk_tree::Metric<Node> for Hamming {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
pretty_env_logger::init();
configure_tracing(); configure_tracing();
let s = std::env::var("POSTGRES_DSN").expect("Missing POSTGRES_DSN"); let s = std::env::var("POSTGRES_DSN").expect("Missing POSTGRES_DSN");