diff --git a/bkapi/src/main.rs b/bkapi/src/main.rs index d12fe88..a477d2a 100644 --- a/bkapi/src/main.rs +++ b/bkapi/src/main.rs @@ -296,6 +296,7 @@ async fn search_nats( tracing::info!("subscribing to searches"); let client = Arc::new(client); + let max_distance = config.max_distance; let mut sub = client .queue_subscribe("bkapi.search".to_string(), "bkapi-search".to_string()) @@ -312,30 +313,54 @@ async fn search_nats( } }; - let payloads: Vec = - serde_json::from_slice(&message.payload).map_err(Error::Data)?; - - let tree = tree.clone(); - let client = client.clone(); - let max_distance = config.max_distance; - - tokio::task::spawn( - async move { - let hashes = payloads.into_iter().map(|payload| tree::HashDistance { - hash: payload.hash, - distance: payload.distance.clamp(0, max_distance), - }); - - let results = tree.find(hashes).await; - - client - .publish(reply, serde_json::to_vec(&results).unwrap_or_log().into()) - .await - .unwrap_or_log(); - } - .in_current_span(), - ); + if let Err(err) = handle_search_nats( + max_distance, + client.clone(), + tree.clone(), + reply, + &message.payload, + ) + .await + { + tracing::error!("could not handle nats search: {err}"); + } } Ok(()) } + +async fn handle_search_nats( + max_distance: u32, + client: Arc, + tree: tree::Tree, + reply: String, + payload: &[u8], +) -> Result<(), Error> { + let payloads: Vec = serde_json::from_slice(payload).map_err(Error::Data)?; + + tokio::task::spawn( + async move { + let hashes = payloads.into_iter().map(|payload| tree::HashDistance { + hash: payload.hash, + distance: payload.distance.clamp(0, max_distance), + }); + + let results = tree.find(hashes).await; + + if let Err(err) = client + .publish( + reply, + serde_json::to_vec(&results) + .expect_or_log("results could not be serialized") + .into(), + ) + .await + { + tracing::error!("could not publish results: {err}"); + } + } + .in_current_span(), + ); + + Ok(()) +}