diff --git a/README.md b/README.md
index 6c684cd..e7ff8b8 100644
--- a/README.md
+++ b/README.md
@@ -2,9 +2,63 @@
 
 A fast way to look up hamming distance hashes.
 
-It operates by connecting to a PostgreSQL database (`DATABASE_URL`), selecting
-every column of a provided query (`DATABASE_QUERY`), subscribing to events
-(`DATABASE_SUBSCRIBE`), and holding everything in a BK tree.
+## Querying
+
+There are two interfaces for querying, both of which can be used simultaneously.
+
+### HTTP
 
 It provides a single API endpoint, `/search` which takes in a `hash` and
-`distance` query parameter to search for matches.
+`distance` query parameter to search for matches. The response looks like this:
+
+```jsonc
+{
+    "hash": 8525958787074004077, // Searched hash
+    "distance": 3, // Maximum search distance
+    "hashes": [ // Results
+        {"hash": 8525958787073873005, "distance": 1}
+    ]
+}
+```
+
+### NATS
+
+It listens on `$NATS_PREFIX.bkapi.search` for a payload like:
+
+```json
+[
+    {"hash": 12345, "distance": 3}
+]
+```
+
+Each input will have a corresponding entry in the response:
+
+```jsonc
+[
+    [ // Results for first input
+        {"hash": 8525958787073873005, "distance": 1}
+    ]
+]
+```
+
+## Initial Load
+
+The initial entries are populated through the database query `$DATABASE_QUERY`.
+
+## Listening
+
+It can be configured to listen for PostgreSQL notifications OR NATS
+messages.
+
+### PostgreSQL
+
+It subscribes to `$DATABASE_SUBSCRIBE`, expecting events to be a JSON object
+containing the hash.
+
+### NATS
+
+It subscribes to `$NATS_PREFIX.bkapi.add`, expecting events to be a JSON object
+containing the hash.
+
+JetStream is used to ensure no hashes are lost without having to reload the
+entire tree on a connection error.
diff --git a/bkapi/src/main.rs b/bkapi/src/main.rs
index 9818453..17bbf68 100644
--- a/bkapi/src/main.rs
+++ b/bkapi/src/main.rs
@@ -1,4 +1,4 @@
-use std::{net::SocketAddr, sync::Arc};
+use std::{net::SocketAddr, path::PathBuf, sync::Arc};
 
 use actix_service::Service;
 use actix_web::{
@@ -73,16 +73,16 @@ struct Config {
     database_query: String,
 
     /// If provided, the Postgres notification topic to subscribe to.
-    #[clap(long, env)]
+    #[clap(long, env, required_unless_present = "nats_url")]
     database_subscribe: Option<String>,
 
-    /// The NATS host.
+    /// NATS URLs.
     #[clap(long, env, requires = "nats_prefix")]
-    nats_host: Option<String>,
-    /// The NATS NKEY.
+    nats_url: Option<String>,
+    /// Path to NATS credential file.
     #[clap(long, env)]
-    nats_nkey: Option<String>,
-    /// A prefix to use for NATS subjects.
+    nats_creds: Option<PathBuf>,
+    /// Prefix to use for NATS subjects.
     #[clap(long, env)]
     nats_prefix: Option<String>,
 
@@ -122,14 +122,20 @@ async fn main() {
 
     let (sender, receiver) = futures::channel::oneshot::channel();
 
-    let client = match (config.nats_host.as_deref(), config.nats_nkey.as_deref()) {
+    let client = match (config.nats_url.as_deref(), config.nats_creds.as_deref()) {
         (Some(host), None) => Some(
             async_nats::connect(host)
                 .await
                 .expect_or_log("could not connect to nats with no nkey"),
         ),
-        (Some(host), Some(nkey)) => Some(
-            async_nats::ConnectOptions::with_nkey(nkey.to_string())
+        (Some(host), Some(creds_path)) => Some(
+            async_nats::ConnectOptions::with_credentials_file(creds_path.to_owned())
+                .await
+                .expect_or_log("could not open credentials file")
+                .custom_inbox_prefix(format!(
+                    "_INBOX_{}",
+                    config.nats_prefix.as_deref().unwrap().replace('.', "_")
+                ))
                 .connect(host)
                 .await
                 .expect_or_log("could not connect to nats with nkey"),
@@ -139,26 +145,35 @@
 
     let tree_clone = tree.clone();
     let config_clone = config.clone();
+    let token_clone = token.clone();
     let mut listener_task = if let Some(subscription) = config.database_subscribe.clone() {
         tracing::info!("starting to listen for payloads from postgres");
-        tokio::spawn(tree::listen_for_payloads_db(
-            pool,
-            subscription,
-            config.database_query.clone(),
-            tree_clone,
-            sender,
-            token.clone(),
-        ))
+        tokio::spawn(async move {
+            tree::listen_for_payloads_db(
+                pool,
+                subscription,
+                config_clone.database_query,
+                tree_clone,
+                sender,
+                token_clone,
+            )
+            .await
+            .expect_or_log("could not listen for payloads")
+        })
     } else if let Some(client) = client.clone() {
         tracing::info!("starting to listen for payloads from nats");
-        tokio::spawn(tree::listen_for_payloads_nats(
-            config_clone,
-            pool,
-            client,
-            tree_clone,
-            sender,
-            token.clone(),
-        ))
+        tokio::spawn(async move {
+            tree::listen_for_payloads_nats(
+                config_clone,
+                pool,
+                client,
+                tree_clone,
+                sender,
+                token_clone,
+            )
+            .await
+            .expect_or_log("could not listen for payloads")
+        })
     } else {
         panic!("no listener source available");
     };
@@ -311,7 +326,10 @@ async fn search_nats(
 
     let service = client
         .add_service(async_nats::service::Config {
-            name: "bkapi-search".to_string(),
+            name: format!(
+                "{}-bkapi",
+                config.nats_prefix.as_deref().unwrap().replace('.', "-")
+            ),
             version: env!("CARGO_PKG_VERSION").to_string(),
             description: None,
             stats_handler: None,
diff --git a/bkapi/src/tree.rs b/bkapi/src/tree.rs
index b3d1b8c..df1d8d1 100644
--- a/bkapi/src/tree.rs
+++ b/bkapi/src/tree.rs
@@ -263,6 +263,7 @@ pub(crate) async fn listen_for_payloads_nats(
     // we don't need to start the listener until after it's loaded. This
     // prevents issues with a slow client but still retains every hash.
     let mut seq = stream.cached_info().state.last_sequence;
+    tracing::debug!(seq, "found starting seq");
     let create_consumer = |stream: async_nats::jetstream::stream::Stream,
                            start_sequence: u64| async move {
         tracing::info!(start_sequence, "creating consumer");
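
For reference, a minimal client-side sketch of the NATS query interface described in the README changes above. Only the payload shapes come from the documented examples; the server URL, the struct names, and the `bkapi` value assumed for `$NATS_PREFIX` are placeholders, not part of the patch.

```rust
// Sketch only: queries a running bkapi instance over NATS request/reply.
// Assumes NATS_PREFIX=bkapi and a local NATS server; payload shapes follow
// the README examples above.
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct SearchInput {
    hash: i64,
    distance: u32,
}

#[derive(Deserialize, Debug)]
struct SearchMatch {
    hash: i64,
    distance: u32,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("nats://127.0.0.1:4222").await?;

    // One entry per hash to look up.
    let inputs = vec![SearchInput {
        hash: 8525958787074004077,
        distance: 3,
    }];
    let payload = serde_json::to_vec(&inputs)?;

    // Request on `$NATS_PREFIX.bkapi.search`, with the prefix assumed here.
    let prefix = "bkapi";
    let response = client
        .request(format!("{prefix}.bkapi.search"), payload.into())
        .await?;

    // Each input gets a corresponding array of matches in the response.
    let results: Vec<Vec<SearchMatch>> = serde_json::from_slice(&response.payload)?;
    println!("{results:?}");

    Ok(())
}
```

The HTTP interface performs the same lookup with query parameters, e.g. `GET /search?hash=8525958787074004077&distance=3`, returning the object shown in the README example.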