Update opentelemetry.

This commit is contained in:
Syfaro 2021-04-22 12:41:08 -04:00
parent 6a0a7f2e2e
commit ca2247b79f
4 changed files with 99 additions and 139 deletions

96
Cargo.lock generated
View File

@ -894,8 +894,9 @@ dependencies = [
"infer", "infer",
"lazy_static", "lazy_static",
"opentelemetry", "opentelemetry",
"opentelemetry-http",
"opentelemetry-jaeger", "opentelemetry-jaeger",
"prometheus 0.12.0", "prometheus",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@ -937,7 +938,7 @@ dependencies = [
"image", "image",
"img_hash", "img_hash",
"lazy_static", "lazy_static",
"prometheus 0.12.0", "prometheus",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@ -961,7 +962,7 @@ dependencies = [
"hyper", "hyper",
"lazy_static", "lazy_static",
"postgres", "postgres",
"prometheus 0.12.0", "prometheus",
"r2d2", "r2d2",
"r2d2_postgres", "r2d2_postgres",
"reqwest", "reqwest",
@ -1276,7 +1277,7 @@ dependencies = [
"httparse", "httparse",
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project 1.0.7", "pin-project",
"socket2", "socket2",
"tokio", "tokio",
"tower-service", "tower-service",
@ -1844,26 +1845,46 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry" name = "opentelemetry"
version = "0.6.0" version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc49c2e2f36870923d158ff0e357c28e2ff581cb7d8d2181ac27198882ca4132" checksum = "b91cea1dfd50064e52db033179952d18c770cbc5dfefc8eba45d619357ba3914"
dependencies = [ dependencies = [
"async-trait",
"futures", "futures",
"js-sys",
"lazy_static", "lazy_static",
"percent-encoding", "percent-encoding",
"pin-project 0.4.28", "pin-project",
"prometheus 0.7.0", "rand 0.8.3",
"rand 0.7.3", "thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "opentelemetry-http"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba72bd27f28377a2e4f0f15a040c06ab0e70ea73df0689f8adf4d22d04591a1a"
dependencies = [
"async-trait",
"http",
"opentelemetry",
"thiserror",
] ]
[[package]] [[package]]
name = "opentelemetry-jaeger" name = "opentelemetry-jaeger"
version = "0.5.0" version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb48b02de79b2d67c2d04e8a0c6140b63830fc6bfde0fdb1e55e74bbb710f57" checksum = "ddd4984441954f9ebbe3eebdfc6fd4fa95be6400d403171228779b949f3cd918"
dependencies = [ dependencies = [
"async-trait",
"lazy_static",
"opentelemetry", "opentelemetry",
"thiserror",
"thrift", "thrift",
"tokio",
] ]
[[package]] [[package]]
@ -1985,33 +2006,13 @@ dependencies = [
"siphasher", "siphasher",
] ]
[[package]]
name = "pin-project"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f"
dependencies = [
"pin-project-internal 0.4.28",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.7" version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
dependencies = [ dependencies = [
"pin-project-internal 1.0.7", "pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
] ]
[[package]] [[package]]
@ -2146,20 +2147,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "prometheus"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1"
dependencies = [
"cfg-if 0.1.10",
"fnv",
"lazy_static",
"protobuf",
"quick-error",
"spin",
]
[[package]] [[package]]
name = "prometheus" name = "prometheus"
version = "0.12.0" version = "0.12.0"
@ -2774,12 +2761,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlformat" name = "sqlformat"
version = "0.1.6" version = "0.1.6"
@ -3174,7 +3155,7 @@ checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"log", "log",
"pin-project 1.0.7", "pin-project",
"tokio", "tokio",
"tungstenite", "tungstenite",
] ]
@ -3238,7 +3219,7 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [ dependencies = [
"pin-project 1.0.7", "pin-project",
"tracing", "tracing",
] ]
@ -3255,12 +3236,11 @@ dependencies = [
[[package]] [[package]]
name = "tracing-opentelemetry" name = "tracing-opentelemetry"
version = "0.5.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b64607914c2892f73ebfd19c6777a0dfb3f5457bf3894693dd2125c41b9c208a" checksum = "99003208b647dae59dcefc49c98aecaa3512fbc29351685d4b9ef23a9218458e"
dependencies = [ dependencies = [
"opentelemetry", "opentelemetry",
"rand 0.7.3",
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
@ -3477,7 +3457,7 @@ dependencies = [
"mime_guess", "mime_guess",
"multipart", "multipart",
"percent-encoding", "percent-encoding",
"pin-project 1.0.7", "pin-project",
"scoped-tls", "scoped-tls",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -12,9 +12,10 @@ tracing-futures = "0.2"
prometheus = { version = "0.12", features = ["process"] } prometheus = { version = "0.12", features = ["process"] }
lazy_static = "1" lazy_static = "1"
opentelemetry = "0.6" opentelemetry = { version = "0.13", features = ["rt-tokio"] }
opentelemetry-jaeger = "0.5" opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
tracing-opentelemetry = "0.5" tracing-opentelemetry = "0.12"
opentelemetry-http = "0.2"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
async-stream = "0.3" async-stream = "0.3"

View File

@ -19,16 +19,16 @@ pub fn search(
pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("file") warp::path("file")
.and(warp::header::optional::<String>("x-b3")) .and(warp::header::headers_cloned())
.and(warp::get()) .and(warp::get())
.and(warp::query::<FileSearchOpts>()) .and(warp::query::<FileSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_api_key()) .and(with_api_key())
.and_then(|b3, opts, db, api_key| { .and_then(|headers, opts, db, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_file", ?opts); let span = tracing::info_span!("search_file", ?opts);
span.set_parent(&with_telem(b3)); span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_file(opts, db, api_key).in_current_span()) span.in_scope(|| handlers::search_file(opts, db, api_key).in_current_span())
}) })
} }
@ -38,18 +38,18 @@ pub fn search_image(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("image") warp::path("image")
.and(warp::header::optional::<String>("x-b3")) .and(warp::header::headers_cloned())
.and(warp::post()) .and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(warp::query::<ImageSearchOpts>()) .and(warp::query::<ImageSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(|b3, form, opts, pool, tree, api_key| { .and_then(|headers, form, opts, pool, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_image", ?opts); let span = tracing::info_span!("search_image", ?opts);
span.set_parent(&with_telem(b3)); span.set_parent(with_telem(headers));
span.in_scope(|| { span.in_scope(|| {
handlers::search_image(form, opts, pool, tree, api_key).in_current_span() handlers::search_image(form, opts, pool, tree, api_key).in_current_span()
}) })
@ -74,17 +74,17 @@ pub fn search_hashes(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("hashes") warp::path("hashes")
.and(warp::header::optional::<String>("x-b3")) .and(warp::header::headers_cloned())
.and(warp::get()) .and(warp::get())
.and(warp::query::<HashSearchOpts>()) .and(warp::query::<HashSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(|b3, opts, db, tree, api_key| { .and_then(|headers, opts, db, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_hashes", ?opts); let span = tracing::info_span!("search_hashes", ?opts);
span.set_parent(&with_telem(b3)); span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span()) span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span())
}) })
} }
@ -94,33 +94,33 @@ pub fn stream_search_image(
tree: Tree, tree: Tree,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("stream") warp::path("stream")
.and(warp::header::optional::<String>("x-b3")) .and(warp::header::headers_cloned())
.and(warp::post()) .and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_tree(tree))
.and(with_api_key()) .and(with_api_key())
.and_then(|b3, form, pool, tree, api_key| { .and_then(|headers, form, pool, tree, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("stream_search_image"); let span = tracing::info_span!("stream_search_image");
span.set_parent(&with_telem(b3)); span.set_parent(with_telem(headers));
span.in_scope(|| handlers::stream_image(form, pool, tree, api_key).in_current_span()) span.in_scope(|| handlers::stream_image(form, pool, tree, api_key).in_current_span())
}) })
} }
pub fn search_video(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search_video(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("video") warp::path("video")
.and(warp::header::optional::<String>("x-b3")) .and(warp::header::headers_cloned())
.and(warp::post()) .and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(with_pool(db)) .and(with_pool(db))
.and(with_api_key()) .and(with_api_key())
.and_then(|b3, form, db, api_key| { .and_then(|headers, form, db, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_video"); let span = tracing::info_span!("search_video");
span.set_parent(&with_telem(b3)); span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span()) span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span())
}) })
} }
@ -145,23 +145,12 @@ fn with_tree(tree: Tree) -> impl Filter<Extract = (Tree,), Error = Infallible> +
warp::any().map(move || tree.clone()) warp::any().map(move || tree.clone())
} }
fn with_telem(b3: Option<String>) -> opentelemetry::api::context::Context { fn with_telem(headers: warp::http::HeaderMap) -> opentelemetry::Context {
use opentelemetry::api::{HttpTextFormat, TraceContextExt}; let remote_context = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&opentelemetry_http::HeaderExtractor(&headers))
});
let mut carrier = std::collections::HashMap::new(); tracing::trace!(?remote_context, "Got remote context");
if let Some(b3) = b3 {
// It took way too long to realize it's a case-sensitive comparison...
// Looks like it should be fixed in the next release,
// https://github.com/open-telemetry/opentelemetry-rust/pull/148
carrier.insert("X-B3".to_string(), b3);
}
let propagator = opentelemetry::api::B3Propagator::new(true); remote_context
let parent_context = propagator.extract(&carrier);
tracing::trace!(
"remote span context: {:?}",
parent_context.remote_span_context()
);
parent_context
} }

View File

@ -76,61 +76,51 @@ async fn main() {
} }
fn configure_tracing() { fn configure_tracing() {
use opentelemetry::{ use opentelemetry::KeyValue;
api::{KeyValue, Provider}, use tracing_subscriber::layer::SubscriberExt;
sdk::{Config, Sampler},
};
use tracing_subscriber::{layer::SubscriberExt, prelude::*};
let env = if cfg!(debug_assertions) { let env = std::env::var("ENVIRONMENT");
let env = if let Ok(env) = env.as_ref() {
env.as_str()
} else if cfg!(debug_assertions) {
"debug" "debug"
} else { } else {
"release" "release"
}; };
let fmt_layer = tracing_subscriber::fmt::layer(); opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("info")) let tracer = opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
.with_service_name("fuzzysearch")
.with_tags(vec![
KeyValue::new("environment", env.to_owned()),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
])
.install_batch(opentelemetry::runtime::Tokio)
.unwrap(); .unwrap();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish();
let registry = tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer);
let exporter = opentelemetry_jaeger::Exporter::builder() let trace = tracing_opentelemetry::layer().with_tracer(tracer);
.with_agent_endpoint( let env_filter = tracing_subscriber::EnvFilter::from_default_env();
std::env::var("JAEGER_COLLECTOR")
.expect("Missing JAEGER_COLLECTOR")
.parse()
.unwrap(),
)
.with_process(opentelemetry_jaeger::Process {
service_name: "fuzzysearch".to_string(),
tags: vec![
KeyValue::new("environment", env),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
],
})
.init()
.expect("unable to create jaeger exporter");
let provider = opentelemetry::sdk::Provider::builder() if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
.with_simple_exporter(exporter) let subscriber = tracing_subscriber::fmt::layer()
.with_config(Config { .json()
default_sampler: Box::new(Sampler::Always), .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
..Default::default() .with_target(true);
}) let subscriber = tracing_subscriber::Registry::default()
.build(); .with(env_filter)
.with(trace)
opentelemetry::global::set_provider(provider); .with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
let tracer = opentelemetry::global::trace_provider().get_tracer("fuzzysearch"); } else {
let telem_layer = tracing_opentelemetry::layer().with_tracer(tracer); let subscriber = tracing_subscriber::fmt::layer();
let registry = registry.with(telem_layer); let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
registry.init(); .with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
} }
async fn metrics( async fn metrics(