From cc8a88fe67897638c55146d927b312c09f737b2d Mon Sep 17 00:00:00 2001 From: Syfaro Date: Tue, 4 Jan 2022 23:09:07 -0500 Subject: [PATCH] Significant refactoring. --- fuzzysearch-api/queries/lookup_api_key.sql | 5 +- fuzzysearch-api/src/api/auth.rs | 47 +++ fuzzysearch-api/src/api/mod.rs | 282 ++++++++++++++++++ fuzzysearch-api/src/main.rs | 325 ++++++++------------- fuzzysearch-common/src/trace.rs | 23 +- 5 files changed, 462 insertions(+), 220 deletions(-) create mode 100644 fuzzysearch-api/src/api/auth.rs create mode 100644 fuzzysearch-api/src/api/mod.rs diff --git a/fuzzysearch-api/queries/lookup_api_key.sql b/fuzzysearch-api/queries/lookup_api_key.sql index ae69d66..eae6670 100644 --- a/fuzzysearch-api/queries/lookup_api_key.sql +++ b/fuzzysearch-api/queries/lookup_api_key.sql @@ -1,12 +1,11 @@ SELECT api_key.id, + api_key.user_id, api_key.name_limit, api_key.image_limit, api_key.hash_limit, - api_key.name, - account.email owner_email + api_key.name FROM api_key - JOIN account ON account.id = api_key.user_id WHERE api_key.key = $1 diff --git a/fuzzysearch-api/src/api/auth.rs b/fuzzysearch-api/src/api/auth.rs new file mode 100644 index 0000000..2417ff0 --- /dev/null +++ b/fuzzysearch-api/src/api/auth.rs @@ -0,0 +1,47 @@ +use poem::Request; +use poem_openapi::{auth::ApiKey, SecurityScheme}; + +use crate::Pool; + +/// Simple authentication using a static API key. Must be manually requested. 
+#[derive(SecurityScheme)] +#[oai( + type = "api_key", + key_name = "X-Api-Key", + in = "header", + checker = "api_checker" +)] +pub(crate) struct ApiKeyAuthorization(pub(crate) UserApiKey); + +pub(crate) struct UserApiKey { + pub(crate) id: i32, + pub(crate) name: Option, + pub(crate) user_id: i32, + pub(crate) name_limit: i16, + pub(crate) image_limit: i16, + pub(crate) hash_limit: i16, +} + +#[tracing::instrument(skip(req, api_key))] +async fn api_checker(req: &Request, api_key: ApiKey) -> Option { + let pool: &Pool = req.data().unwrap(); + + let user_api_key = sqlx::query_file_as!(UserApiKey, "queries/lookup_api_key.sql", api_key.key) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(user_api_key) = user_api_key.as_ref() { + tracing::debug!( + api_key_id = user_api_key.id, + app_name = user_api_key.name.as_deref().unwrap_or("unknown"), + owner_id = %user_api_key.user_id, + "found valid api key" + ); + } else { + tracing::warn!("request had invalid api key: {}", api_key.key); + } + + user_api_key +} diff --git a/fuzzysearch-api/src/api/mod.rs b/fuzzysearch-api/src/api/mod.rs new file mode 100644 index 0000000..a680d5f --- /dev/null +++ b/fuzzysearch-api/src/api/mod.rs @@ -0,0 +1,282 @@ +use bkapi_client::BKApiClient; +use bytes::BufMut; +use poem_openapi::{ + param::{Path, Query}, + payload::{Json, Response}, + types::ToJSON, + ApiResponse, Object, +}; + +use crate::{ + hash_input, lookup_hashes, rate_limit, Endpoints, FurAffinityFile, HashLookupResult, + ImageSearchPayload, ImageSearchResult, ImageSearchType, KnownServiceName, Pool, + ResponseRateLimitHeaders, +}; + +mod auth; + +pub(crate) use auth::ApiKeyAuthorization; + +#[derive(Object)] +pub(crate) struct RateLimitResponse { + bucket: String, + retry_after: i32, +} + +#[derive(Object)] +pub(crate) struct BadRequestResponse { + message: String, +} + +#[derive(ApiResponse)] +pub(crate) enum RateLimitedResponse +where + T: ToJSON, + E: ToJSON, +{ + /// The request was successful. 
+ #[oai(status = 200)] + Available(Json), + + /// The request was not valid. A description can be found within the + /// resulting message. + #[oai(status = 400)] + BadRequest(Json), + + /// The API key has exhausted the maximum permissible requests for the + /// current time window. The `retry_after` field contains the number of + /// seconds before a request is likely to succeed. + #[oai(status = 429)] + Limited(Json), +} + +impl RateLimitedResponse +where + T: ToJSON, + E: ToJSON, +{ + pub(crate) fn available(json: T) -> Response { + Self::Available(Json(json)).response() + } + + pub(crate) fn limited(bucket: S, retry_after: i32) -> Response { + Self::Limited(Json(RateLimitResponse { + bucket: bucket.to_string(), + retry_after, + })) + .response() + } + + fn response(self) -> Response { + Response::new(self) + } +} + +impl RateLimitedResponse +where + T: ToJSON, +{ + pub(crate) fn bad_request(message: M) -> Response { + Self::BadRequest(Json(BadRequestResponse { + message: message.to_string(), + })) + .response() + } +} + +#[tracing::instrument(err, skip(pool, bkapi, auth, hashes, distance), fields(hashes = %hashes.0, distance = ?distance.0))] +pub(crate) async fn hashes( + pool: &Pool, + bkapi: &BKApiClient, + auth: ApiKeyAuthorization, + hashes: Query, + distance: Query>, +) -> poem::Result>>> { + let hashes: Vec = hashes + .0 + .split(',') + .take(10) + .filter_map(|hash| hash.parse().ok()) + .collect(); + + if hashes.is_empty() { + return Ok(RateLimitedResponse::bad_request("hashes must be provided")); + } + + let image_remaining = rate_limit!(auth, pool, image_limit, "image", hashes.len() as i16); + + let results = lookup_hashes(pool, bkapi, &hashes, distance.unwrap_or(3)).await?; + + let resp = + RateLimitedResponse::available(results).inject_rate_limit_headers("image", image_remaining); + + Ok(resp) +} + +#[tracing::instrument(err, skip(pool, bkapi, client, endpoints, auth, search_type, payload))] +pub(crate) async fn image( + pool: &Pool, + bkapi: 
&BKApiClient, + client: &reqwest::Client, + endpoints: &Endpoints, + auth: ApiKeyAuthorization, + search_type: Query>, + payload: ImageSearchPayload, +) -> poem::Result>> { + let image_remaining = rate_limit!(auth, pool, image_limit, "image"); + let hash_remaining = rate_limit!(auth, pool, hash_limit, "hash"); + + let stream = tokio_util::io::ReaderStream::new(payload.image.into_async_read()); + let body = reqwest::Body::wrap_stream(stream); + + let hash = hash_input(client, &endpoints.hash_input, body).await?; + + let search_type = search_type.0.unwrap_or(ImageSearchType::Close); + let hashes = vec![hash]; + + let mut results = { + if search_type == ImageSearchType::Force { + tracing::debug!("search type is force, starting with distance of 10"); + lookup_hashes(pool, bkapi, &hashes, 10).await? + } else { + tracing::debug!("close or exact search type, starting with distance of 0"); + let results = lookup_hashes(pool, bkapi, &hashes, 0).await?; + + if results.is_empty() && search_type != ImageSearchType::Exact { + tracing::debug!("results were empty and type is not force, expanding search"); + lookup_hashes(pool, bkapi, &hashes, 10).await? 
+ } else { + tracing::debug!("results were not empty or search type was force, ending search"); + results + } + } + }; + + tracing::info!("search ended with {} results", results.len()); + + results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap()); + + let resp = RateLimitedResponse::available(ImageSearchResult { + hash, + matches: results, + }) + .header("x-image-hash", hash) + .inject_rate_limit_headers("image", image_remaining) + .inject_rate_limit_headers("hash", hash_remaining); + + Ok(resp) +} + +#[tracing::instrument(err, skip(pool, bkapi, client, endpoints, auth, url, distance), fields(url = %url.0, distance = ?distance.0))] +pub(crate) async fn url( + pool: &Pool, + bkapi: &BKApiClient, + client: &reqwest::Client, + endpoints: &Endpoints, + auth: ApiKeyAuthorization, + url: Query, + distance: Query>, +) -> poem::Result>> { + let image_remaining = rate_limit!(auth, pool, image_limit, "image"); + let hash_remaining = rate_limit!(auth, pool, hash_limit, "hash"); + + let mut resp = client + .get(&url.0) + .send() + .await + .map_err(crate::Error::from)?; + + let distance = distance.unwrap_or(3); + + let content_length = resp + .headers() + .get("content-length") + .and_then(|len| { + String::from_utf8_lossy(len.as_bytes()) + .parse::() + .ok() + }) + .unwrap_or(0); + + if content_length > 10_000_000 { + return Ok(RateLimitedResponse::bad_request(format!( + "image too large: {} bytes", + content_length + ))); + } + + let mut buf = bytes::BytesMut::with_capacity(content_length); + + while let Some(chunk) = resp.chunk().await.map_err(crate::Error::from)? 
{ + if buf.len() + chunk.len() > 10_000_000 { + return Ok(RateLimitedResponse::bad_request(format!( + "image too large: {}+ bytes", + buf.len() + chunk.len() + ))); + } + + buf.put(chunk); + } + + let body = reqwest::Body::from(buf.to_vec()); + let hash = hash_input(client, &endpoints.hash_input, body).await?; + + let results = lookup_hashes(pool, bkapi, &[hash], distance).await?; + + let resp = RateLimitedResponse::available(ImageSearchResult { + hash, + matches: results, + }) + .header("x-image-hash", hash) + .inject_rate_limit_headers("image", image_remaining) + .inject_rate_limit_headers("hash", hash_remaining); + + Ok(resp) +} + +#[tracing::instrument(err, skip(pool, auth, file_id), fields(file_id = %file_id.0))] +pub(crate) async fn furaffinity_data( + pool: &Pool, + auth: ApiKeyAuthorization, + file_id: Query, +) -> poem::Result>>> { + let file_remaining = rate_limit!(auth, pool, image_limit, "file"); + + let matches = sqlx::query_file!("queries/lookup_furaffinity_file_id.sql", file_id.0) + .map(|row| FurAffinityFile { + id: row.id, + url: row.url, + filename: row.filename, + file_id: row.file_id, + rating: row.rating.and_then(|rating| rating.parse().ok()), + posted_at: row.posted_at, + artist: Some(row.artist), + hash: row.hash, + }) + .fetch_all(pool) + .await + .map_err(crate::Error::from)?; + + let resp = + RateLimitedResponse::available(matches).inject_rate_limit_headers("file", file_remaining); + + Ok(resp) +} + +#[tracing::instrument(err, skip(pool, service, handle), fields(service = %service.0, handle = %handle.0))] +pub(crate) async fn known_service( + pool: &Pool, + service: Path, + handle: Query, +) -> poem::Result> { + let handle_exists = match service.0 { + KnownServiceName::Twitter => { + sqlx::query_file_scalar!("queries/handle_twitter.sql", handle.0) + .fetch_one(pool) + .await + .map_err(poem::error::InternalServerError)? 
+ } + }; + + Ok(Json(handle_exists)) +} diff --git a/fuzzysearch-api/src/main.rs b/fuzzysearch-api/src/main.rs index 482974f..59b14e5 100644 --- a/fuzzysearch-api/src/main.rs +++ b/fuzzysearch-api/src/main.rs @@ -1,17 +1,18 @@ -use std::{borrow::Cow, str::FromStr}; +use std::{borrow::Cow, fmt::Display, str::FromStr}; +use api::ApiKeyAuthorization; use bkapi_client::BKApiClient; -use bytes::BufMut; use hyper::StatusCode; -use poem::{error::ResponseError, listener::TcpListener, web::Data, EndpointExt, Request, Route}; +use poem::{error::ResponseError, listener::TcpListener, web::Data, EndpointExt, Route}; use poem_openapi::{ - auth::ApiKey, param::{Path, Query}, payload::{Json, Response}, types::multipart::Upload, - Multipart, Object, OneOf, OpenApi, OpenApiService, SecurityScheme, + Multipart, Object, OneOf, OpenApi, OpenApiService, }; +mod api; + type Pool = sqlx::PgPool; #[derive(Clone)] @@ -22,41 +23,20 @@ pub struct Endpoints { struct Api; -/// Simple authentication using a static API key. Must be manually requested. 
-#[derive(SecurityScheme)] -#[oai( - type = "api_key", - key_name = "X-Api-Key", - in = "header", - checker = "api_checker" -)] -struct ApiKeyAuthorization(UserApiKey); - -struct UserApiKey { - id: i32, - name: Option, - owner_email: String, - name_limit: i16, - image_limit: i16, - hash_limit: i16, -} - -async fn api_checker(req: &Request, api_key: ApiKey) -> Option { - let pool: &Pool = req.data().unwrap(); - - sqlx::query_file_as!(UserApiKey, "queries/lookup_api_key.sql", api_key.key) - .fetch_optional(pool) - .await - .ok() - .flatten() -} - #[derive(poem_openapi::Enum, Debug, PartialEq)] #[oai(rename_all = "snake_case")] enum KnownServiceName { Twitter, } +impl Display for KnownServiceName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Twitter => write!(f, "Twitter"), + } + } +} + #[derive(poem_openapi::Enum, Debug)] #[oai(rename_all = "lowercase")] enum Rating { @@ -139,7 +119,10 @@ enum Error { impl ResponseError for Error { fn status(&self) -> hyper::StatusCode { - hyper::StatusCode::INTERNAL_SERVER_ERROR + match self { + Self::BadRequest(_) => hyper::StatusCode::BAD_REQUEST, + _ => hyper::StatusCode::INTERNAL_SERVER_ERROR, + } } } @@ -163,18 +146,6 @@ impl ResponseError for BadRequest { } } -#[derive(Debug, thiserror::Error)] -#[error("rate limited")] -struct RateLimited { - bucket: String, -} - -impl ResponseError for RateLimited { - fn status(&self) -> hyper::StatusCode { - hyper::StatusCode::TOO_MANY_REQUESTS - } -} - /// The status of an API key's rate limit. #[derive(Debug, PartialEq)] pub enum RateLimit { @@ -215,28 +186,28 @@ async fn update_rate_limit( } } +#[macro_export] macro_rules! 
rate_limit { ($api_key:expr, $db:expr, $limit:tt, $group:expr) => { rate_limit!($api_key, $db, $limit, $group, 1) }; ($api_key:expr, $db:expr, $limit:tt, $group:expr, $incr_by:expr) => {{ - let rate_limit = update_rate_limit($db, $api_key.0.id, $api_key.0.$limit, $group, $incr_by) - .await - .map_err(Error::from)?; + let rate_limit = + crate::update_rate_limit($db, $api_key.0.id, $api_key.0.$limit, $group, $incr_by) + .await + .map_err(crate::Error::from)?; match rate_limit { - RateLimit::Limited => { - return Err(RateLimited { - bucket: $group.to_string(), - } - .into()) + crate::RateLimit::Limited => { + return Ok(crate::api::RateLimitedResponse::limited($group, 60)) } - RateLimit::Available(count) => count, + crate::RateLimit::Available(count) => count, } }}; } +#[tracing::instrument(err, skip(pool, bkapi))] async fn lookup_hashes( pool: &Pool, bkapi: &BKApiClient, @@ -247,6 +218,8 @@ async fn lookup_hashes( return Err(BadRequest::with_message(format!("distance too large: {}", distance)).into()); } + tracing::info!("looking up {} hashes", hashes.len()); + let index_hashes: Vec<_> = bkapi .search_many(hashes, distance) .await? 
@@ -262,6 +235,15 @@ async fn lookup_hashes( }) .collect(); + tracing::info!("found {} results in bkapi index", index_hashes.len()); + tracing::trace!( + "bkapi matches: {:?}", + index_hashes + .iter() + .map(|hash| hash.found_hash) + .collect::>() + ); + let data = serde_json::to_value(index_hashes)?; let results = sqlx::query_file!("queries/lookup_hashes.sql", data) @@ -294,6 +276,9 @@ async fn lookup_hashes( .fetch_all(pool) .await?; + tracing::info!("found {} matches from database", results.len()); + tracing::trace!("database matches: {:?}", results); + Ok(results) } @@ -302,6 +287,7 @@ struct ImageSearchPayload { image: Upload, } +#[tracing::instrument(skip(client, hash_input_endpoint, image))] async fn hash_input( client: &reqwest::Client, hash_input_endpoint: &str, @@ -310,6 +296,8 @@ async fn hash_input( let part = reqwest::multipart::Part::stream(image); let form = reqwest::multipart::Form::new().part("image", part); + tracing::info!("sending image for hashing"); + let resp = client .post(hash_input_endpoint) .multipart(form) @@ -317,12 +305,21 @@ async fn hash_input( .await?; if resp.status() != StatusCode::OK { + tracing::warn!("got wrong status code: {}", resp.status()); return Err(BadRequest::with_message("invalid image").into()); } - match resp.text().await?.parse() { - Ok(hash) => Ok(hash), - Err(_err) => Err(BadRequest::with_message("invalid image").into()), + let text = resp.text().await?; + + match text.parse() { + Ok(hash) => { + tracing::debug!("image had hash {}", hash); + Ok(hash) + } + Err(_err) => { + tracing::warn!("got invalid data: {}", text); + Err(BadRequest::with_message("invalid image").into()) + } } } @@ -352,11 +349,38 @@ struct FurAffinityFile { hash: Option, } +trait ResponseRateLimitHeaders +where + Self: Sized, +{ + fn inject_rate_limit_headers(self, name: &'static str, remaining: (i16, i16)) -> Self; +} + +impl ResponseRateLimitHeaders for poem_openapi::payload::Response { + fn inject_rate_limit_headers(self, name: &'static 
str, remaining: (i16, i16)) -> Self { + self.header(&format!("x-rate-limit-total-{}", name), remaining.1) + .header(&format!("x-rate-limit-remaining-{}", name), remaining.0) + } +} + +/// LimitsResponse +/// +/// The allowed number of requests per minute for an API key. +#[derive(Object, Debug)] +struct LimitsResponse { + /// The number of name lookups. + name: i16, + /// The number of hash lookups. + image: i16, + /// The number of image hashes. + hash: i16, +} + #[OpenApi] impl Api { /// Lookup images by hash /// - /// Perform a lookup for up to 10 given hashes. + /// Perform a lookup using up to 10 hashes. #[oai(path = "/hashes", method = "get")] async fn hashes( &self, @@ -365,27 +389,8 @@ impl Api { auth: ApiKeyAuthorization, hashes: Query, distance: Query>, - ) -> poem::Result>>> { - let hashes: Vec = hashes - .0 - .split(',') - .take(10) - .filter_map(|hash| hash.parse().ok()) - .collect(); - - let image_remaining = rate_limit!(auth, pool.0, image_limit, "image", hashes.len() as i16); - - if hashes.is_empty() { - return Err(BadRequest::with_message("hashes must be provided").into()); - } - - let results = lookup_hashes(&pool, &bkapi, &hashes, distance.unwrap_or(3)).await?; - - let resp = Response::new(Json(results)) - .header("x-rate-limit-total-image", image_remaining.1) - .header("x-rate-limit-remaining-image", image_remaining.0); - - Ok(resp) + ) -> poem::Result>>> { + api::hashes(pool.0, bkapi.0, auth, hashes, distance).await } /// Lookup images by image @@ -401,45 +406,17 @@ impl Api { auth: ApiKeyAuthorization, search_type: Query>, payload: ImageSearchPayload, - ) -> poem::Result>> { - let image_remaining = rate_limit!(auth, pool.0, image_limit, "image"); - let hash_remaining = rate_limit!(auth, pool.0, hash_limit, "hash"); - - let stream = tokio_util::io::ReaderStream::new(payload.image.into_async_read()); - let body = reqwest::Body::wrap_stream(stream); - - let hash = hash_input(&client, &endpoints.hash_input, body).await?; - - let search_type = 
search_type.0.unwrap_or(ImageSearchType::Close); - let hashes = vec![hash]; - - let mut results = { - if search_type == ImageSearchType::Force { - lookup_hashes(pool.0, bkapi.0, &hashes, 10).await? - } else { - let results = lookup_hashes(pool.0, bkapi.0, &hashes, 0).await?; - - if results.is_empty() && search_type != ImageSearchType::Exact { - lookup_hashes(pool.0, bkapi.0, &hashes, 10).await? - } else { - results - } - } - }; - - results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap()); - - let resp = Response::new(Json(ImageSearchResult { - hash, - matches: results, - })) - .header("x-image-hash", hash) - .header("x-rate-limit-total-image", image_remaining.1) - .header("x-rate-limit-remaining-image", image_remaining.0) - .header("x-rate-limit-total-hash", hash_remaining.1) - .header("x-rate-limit-remaining-hash", hash_remaining.0); - - Ok(resp) + ) -> poem::Result>> { + api::image( + pool.0, + bkapi.0, + client.0, + endpoints.0, + auth, + search_type, + payload, + ) + .await } /// Lookup images by image URL @@ -455,62 +432,8 @@ impl Api { auth: ApiKeyAuthorization, url: Query, distance: Query>, - ) -> poem::Result>> { - let image_remaining = rate_limit!(auth, pool.0, image_limit, "image"); - let hash_remaining = rate_limit!(auth, pool.0, hash_limit, "hash"); - - let mut resp = client.get(&url.0).send().await.map_err(Error::from)?; - - let distance = distance.unwrap_or(3); - - let content_length = resp - .headers() - .get("content-length") - .and_then(|len| { - String::from_utf8_lossy(len.as_bytes()) - .parse::() - .ok() - }) - .unwrap_or(0); - - if content_length > 10_000_000 { - return Err(BadRequest::with_message(format!( - "image too large: {} bytes", - content_length - )) - .into()); - } - - let mut buf = bytes::BytesMut::with_capacity(content_length); - - while let Some(chunk) = resp.chunk().await.map_err(Error::from)? 
{ - if buf.len() + chunk.len() > 10_000_000 { - return Err(BadRequest::with_message(format!( - "image too large: {} bytes", - content_length - )) - .into()); - } - - buf.put(chunk); - } - - let body = reqwest::Body::from(buf.to_vec()); - let hash = hash_input(&client, &endpoints.hash_input, body).await?; - - let results = lookup_hashes(pool.0, bkapi.0, &[hash], distance).await?; - - let resp = Response::new(Json(ImageSearchResult { - hash, - matches: results, - })) - .header("x-image-hash", hash) - .header("x-rate-limit-total-image", image_remaining.1) - .header("x-rate-limit-remaining-image", image_remaining.0) - .header("x-rate-limit-total-hash", hash_remaining.1) - .header("x-rate-limit-remaining-hash", hash_remaining.0); - - Ok(resp) + ) -> poem::Result>> { + api::url(pool.0, bkapi.0, client.0, endpoints.0, auth, url, distance).await } /// Lookup FurAffinity submission by File ID @@ -520,29 +443,21 @@ impl Api { pool: Data<&Pool>, auth: ApiKeyAuthorization, file_id: Query, - ) -> poem::Result>>> { - let file_remaining = rate_limit!(auth, pool.0, image_limit, "file"); + ) -> poem::Result>>> { + api::furaffinity_data(pool.0, auth, file_id).await + } - let matches = sqlx::query_file!("queries/lookup_furaffinity_file_id.sql", file_id.0) - .map(|row| FurAffinityFile { - id: row.id, - url: row.url, - filename: row.filename, - file_id: row.file_id, - rating: row.rating.and_then(|rating| rating.parse().ok()), - posted_at: row.posted_at, - artist: Some(row.artist), - hash: row.hash, - }) - .fetch_all(pool.0) - .await - .map_err(Error::from)?; - - let resp = Response::new(Json(matches)) - .header("x-rate-limit-total-file", file_remaining.1) - .header("x-rate-limit-remaining-file", file_remaining.0); - - Ok(resp) + /// Check API key limits + /// + /// Determine the number of allowed requests per minute for the current + /// API token. 
+ #[oai(path = "/limits", method = "get")] + async fn limits(&self, auth: ApiKeyAuthorization) -> Json { + Json(LimitsResponse { + name: auth.0.name_limit, + image: auth.0.image_limit, + hash: auth.0.hash_limit, + }) } /// Check if a handle is known for a given service @@ -556,16 +471,7 @@ impl Api { service: Path, handle: Query, ) -> poem::Result> { - let handle_exists = match service.0 { - KnownServiceName::Twitter => { - sqlx::query_file_scalar!("queries/handle_twitter.sql", handle.0) - .fetch_one(pool.0) - .await - .map_err(poem::error::InternalServerError)? - } - }; - - Ok(Json(handle_exists)) + api::known_service(pool.0, service, handle).await } } @@ -607,6 +513,9 @@ async fn main() { .data(reqwest::Client::new()) .with(poem::middleware::Tracing) .with(poem::middleware::OpenTelemetryMetrics::new()) + .with(poem::middleware::OpenTelemetryTracing::new( + fuzzysearch_common::trace::get_tracer("fuzzysearch-api"), + )) .with(cors); poem::Server::new(TcpListener::bind("0.0.0.0:8080")) diff --git a/fuzzysearch-common/src/trace.rs b/fuzzysearch-common/src/trace.rs index 2383596..933ad0b 100644 --- a/fuzzysearch-common/src/trace.rs +++ b/fuzzysearch-common/src/trace.rs @@ -1,9 +1,6 @@ -pub fn configure_tracing(service_name: &'static str) { - use opentelemetry::KeyValue; - use tracing_subscriber::layer::SubscriberExt; - - tracing_log::LogTracer::init().unwrap(); +use opentelemetry::KeyValue; +pub fn get_tracer(service_name: &'static str) -> opentelemetry::sdk::trace::Tracer { let env = std::env::var("ENVIRONMENT"); let env = if let Ok(env) = env.as_ref() { env.as_str() @@ -13,9 +10,7 @@ pub fn configure_tracing(service_name: &'static str) { "release" }; - opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - - let tracer = opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_pipeline() .with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR")) .with_service_name(service_name) 
.with_tags(vec![ @@ -23,7 +18,17 @@ pub fn configure_tracing(service_name: &'static str) { KeyValue::new("version", env!("CARGO_PKG_VERSION")), ]) .install_batch(opentelemetry::runtime::Tokio) - .unwrap(); + .unwrap() +} + +pub fn configure_tracing(service_name: &'static str) { + use tracing_subscriber::layer::SubscriberExt; + + tracing_log::LogTracer::init().unwrap(); + + opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + + let tracer = get_tracer(service_name); let trace = tracing_opentelemetry::layer().with_tracer(tracer); let env_filter = tracing_subscriber::EnvFilter::from_default_env();