Better docs and other NATS fixes.

This commit is contained in:
Syfaro 2023-08-15 15:01:35 -04:00
parent 01c13f2244
commit dc1bebfd8d
3 changed files with 104 additions and 31 deletions

View File

@ -2,9 +2,63 @@
A fast way to look up hashes by Hamming distance.
It operates by connecting to a PostgreSQL database (`DATABASE_URL`), selecting
every column of a provided query (`DATABASE_QUERY`), subscribing to events
(`DATABASE_SUBSCRIBE`), and holding everything in a BK tree.
## Querying
There are two interfaces for querying, both of which can be used simultaneously.
### HTTP
It provides a single API endpoint, `/search`, which takes in a `hash` and
`distance` query parameter to search for matches.
`distance` query parameter to search for matches. The response looks like this:
```jsonc
{
"hash": 8525958787074004077, // Searched hash
"distance": 3, // Maximum search distance
"hashes": [ // Results
{"hash": 8525958787073873005, "distance": 1}
]
}
```
### NATS
It listens on `$NATS_PREFIX.bkapi.search` for a payload like:
```json
[
{"hash": 12345, "distance": 3}
]
```
Each input will have a corresponding entry in the response:
```jsonc
[
[ // Results for first input
{"hash": 8525958787073873005, "distance": 1}
]
]
```
## Initial Load
The initial entries are populated through the database query `$DATABASE_QUERY`.
## Listening
It can be configured to listen for either PostgreSQL notifications or NATS
messages.
### PostgreSQL
It subscribes to `$DATABASE_SUBSCRIBE`, expecting events to be a JSON object
containing the hash.
### NATS
It subscribes to `$NATS_PREFIX.bkapi.add`, expecting events to be a JSON object
containing the hash.
JetStream is used so that no hashes are lost after a connection error, without
having to reload the entire tree.

View File

@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use actix_service::Service;
use actix_web::{
@ -73,16 +73,16 @@ struct Config {
database_query: String,
/// If provided, the Postgres notification topic to subscribe to.
#[clap(long, env)]
#[clap(long, env, required_unless_present = "nats_url")]
database_subscribe: Option<String>,
/// The NATS host.
/// NATS URLs.
#[clap(long, env, requires = "nats_prefix")]
nats_host: Option<String>,
/// The NATS NKEY.
nats_url: Option<String>,
/// Path to NATS credential file.
#[clap(long, env)]
nats_nkey: Option<String>,
/// A prefix to use for NATS subjects.
nats_creds: Option<PathBuf>,
/// Prefix to use for NATS subjects.
#[clap(long, env)]
nats_prefix: Option<String>,
@ -122,14 +122,20 @@ async fn main() {
let (sender, receiver) = futures::channel::oneshot::channel();
let client = match (config.nats_host.as_deref(), config.nats_nkey.as_deref()) {
let client = match (config.nats_url.as_deref(), config.nats_creds.as_deref()) {
(Some(host), None) => Some(
async_nats::connect(host)
.await
.expect_or_log("could not connect to nats with no nkey"),
),
(Some(host), Some(nkey)) => Some(
async_nats::ConnectOptions::with_nkey(nkey.to_string())
(Some(host), Some(creds_path)) => Some(
async_nats::ConnectOptions::with_credentials_file(creds_path.to_owned())
.await
.expect_or_log("could not open credentials file")
.custom_inbox_prefix(format!(
"_INBOX_{}",
config.nats_prefix.as_deref().unwrap().replace('.', "_")
))
.connect(host)
.await
.expect_or_log("could not connect to nats with nkey"),
@ -139,26 +145,35 @@ async fn main() {
let tree_clone = tree.clone();
let config_clone = config.clone();
let token_clone = token.clone();
let mut listener_task = if let Some(subscription) = config.database_subscribe.clone() {
tracing::info!("starting to listen for payloads from postgres");
tokio::spawn(tree::listen_for_payloads_db(
tokio::spawn(async move {
tree::listen_for_payloads_db(
pool,
subscription,
config.database_query.clone(),
config_clone.database_query,
tree_clone,
sender,
token.clone(),
))
token_clone,
)
.await
.expect_or_log("could not listen for payloads")
})
} else if let Some(client) = client.clone() {
tracing::info!("starting to listen for payloads from nats");
tokio::spawn(tree::listen_for_payloads_nats(
tokio::spawn(async move {
tree::listen_for_payloads_nats(
config_clone,
pool,
client,
tree_clone,
sender,
token.clone(),
))
token_clone,
)
.await
.expect_or_log("could not listen for payloads")
})
} else {
panic!("no listener source available");
};
@ -311,7 +326,10 @@ async fn search_nats(
let service = client
.add_service(async_nats::service::Config {
name: "bkapi-search".to_string(),
name: format!(
"{}-bkapi",
config.nats_prefix.as_deref().unwrap().replace('.', "-")
),
version: env!("CARGO_PKG_VERSION").to_string(),
description: None,
stats_handler: None,

View File

@ -263,6 +263,7 @@ pub(crate) async fn listen_for_payloads_nats(
// we don't need to start the listener until after it's loaded. This
// prevents issues with a slow client but still retains every hash.
let mut seq = stream.cached_info().state.last_sequence;
tracing::debug!(seq, "found starting seq");
let create_consumer = |stream: async_nats::jetstream::stream::Stream, start_sequence: u64| async move {
tracing::info!(start_sequence, "creating consumer");