Improve error handling.

This commit is contained in:
Syfaro 2022-10-12 23:19:15 -04:00
parent 76504cdc28
commit f9a0a21f92

View File

@ -296,6 +296,7 @@ async fn search_nats(
tracing::info!("subscribing to searches");
let client = Arc::new(client);
let max_distance = config.max_distance;
let mut sub = client
.queue_subscribe("bkapi.search".to_string(), "bkapi-search".to_string())
@ -312,30 +313,54 @@ async fn search_nats(
}
};
let payloads: Vec<SearchPayload> =
serde_json::from_slice(&message.payload).map_err(Error::Data)?;
let tree = tree.clone();
let client = client.clone();
let max_distance = config.max_distance;
tokio::task::spawn(
async move {
let hashes = payloads.into_iter().map(|payload| tree::HashDistance {
hash: payload.hash,
distance: payload.distance.clamp(0, max_distance),
});
let results = tree.find(hashes).await;
client
.publish(reply, serde_json::to_vec(&results).unwrap_or_log().into())
.await
.unwrap_or_log();
}
.in_current_span(),
);
if let Err(err) = handle_search_nats(
max_distance,
client.clone(),
tree.clone(),
reply,
&message.payload,
)
.await
{
tracing::error!("could not handle nats search: {err}");
}
}
Ok(())
}
async fn handle_search_nats(
max_distance: u32,
client: Arc<async_nats::Client>,
tree: tree::Tree,
reply: String,
payload: &[u8],
) -> Result<(), Error> {
let payloads: Vec<SearchPayload> = serde_json::from_slice(payload).map_err(Error::Data)?;
tokio::task::spawn(
async move {
let hashes = payloads.into_iter().map(|payload| tree::HashDistance {
hash: payload.hash,
distance: payload.distance.clamp(0, max_distance),
});
let results = tree.find(hashes).await;
if let Err(err) = client
.publish(
reply,
serde_json::to_vec(&results)
.expect_or_log("results could not be serialized")
.into(),
)
.await
{
tracing::error!("could not publish results: {err}");
}
}
.in_current_span(),
);
Ok(())
}