Add NATS option for notifications and searches.

This commit is contained in:
Syfaro 2022-10-12 21:16:58 -04:00
parent c38d53cfea
commit 876a6bee55
5 changed files with 764 additions and 173 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target
.env

456
Cargo.lock generated
View File

@ -222,6 +222,40 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "async-nats"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5788cd6eb396656c504ad4b2b7fd384715a4c188c03490a9d68f75931a1b182"
dependencies = [
"base64",
"base64-url",
"bytes",
"futures",
"http",
"itertools",
"itoa",
"lazy_static",
"nkeys",
"nuid",
"once_cell",
"regex",
"ring",
"rustls-native-certs",
"rustls-pemfile 0.3.0",
"serde",
"serde_json",
"serde_nanos",
"serde_repr",
"subslice",
"time",
"tokio",
"tokio-rustls",
"tokio-util",
"tracing",
"url",
]
[[package]]
name = "async-trait"
version = "0.1.57"
@ -242,6 +276,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -254,6 +299,21 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64-url"
version = "1.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f"
dependencies = [
"base64",
]
[[package]]
name = "base64ct"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c"
[[package]]
name = "bitflags"
version = "1.3.2"
@ -277,8 +337,10 @@ dependencies = [
"actix-http",
"actix-service",
"actix-web",
"async-nats",
"bk-tree",
"envconfig",
"clap",
"dotenvy",
"futures",
"hamming",
"lazy_static",
@ -312,6 +374,15 @@ dependencies = [
"tracing-opentelemetry 0.17.4",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
@ -384,6 +455,49 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea54a38e4bce14ff6931c72e5b3c43da7051df056913d4e7e1fcdb1c03df69d"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"once_cell",
"strsim",
"termcolor",
]
[[package]]
name = "clap_derive"
version = "4.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "const-oid"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b"
[[package]]
name = "convert_case"
version = "0.4.0"
@ -489,6 +603,19 @@ dependencies = [
"typenum",
]
[[package]]
name = "curve25519-dalek"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61"
dependencies = [
"byteorder",
"digest 0.9.0",
"rand_core 0.5.1",
"subtle",
"zeroize",
]
[[package]]
name = "dashmap"
version = "5.4.0"
@ -502,6 +629,21 @@ dependencies = [
"parking_lot_core 0.9.3",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "der"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4"
dependencies = [
"const-oid",
]
[[package]]
name = "derive_more"
version = "0.99.17"
@ -515,13 +657,22 @@ dependencies = [
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "digest"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"block-buffer 0.10.3",
"crypto-common",
"subtle",
]
@ -555,6 +706,27 @@ dependencies = [
"dirs",
]
[[package]]
name = "ed25519"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9c280362032ea4203659fc489832d0204ef09f247a0506f170dafcac08c369"
dependencies = [
"signature",
]
[[package]]
name = "ed25519-dalek"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d"
dependencies = [
"curve25519-dalek",
"ed25519",
"sha2 0.9.9",
"zeroize",
]
[[package]]
name = "either"
version = "1.8.0"
@ -570,26 +742,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "envconfig"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea81cc7e21f55a9d9b1efb6816904978d0bfbe31a50347cb24b2e75564bcac9b"
dependencies = [
"envconfig_derive",
]
[[package]]
name = "envconfig_derive"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfca278e5f84b45519acaaff758ebfa01f18e96998bc24b8f1b722dd804b9bf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "event-listener"
version = "2.5.3"
@ -848,7 +1000,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
"digest 0.10.5",
]
[[package]]
@ -1066,7 +1218,7 @@ version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest",
"digest 0.10.5",
]
[[package]]
@ -1126,6 +1278,21 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nkeys"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594"
dependencies = [
"byteorder",
"data-encoding",
"ed25519-dalek",
"getrandom",
"log",
"rand",
"signatory",
]
[[package]]
name = "nom"
version = "7.1.1"
@ -1146,6 +1313,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nuid"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38"
dependencies = [
"lazy_static",
"rand",
]
[[package]]
name = "num-traits"
version = "0.2.15"
@ -1180,6 +1357,12 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.42"
@ -1339,6 +1522,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "os_str_bytes"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "overload"
version = "0.1.1"
@ -1399,6 +1588,15 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1"
[[package]]
name = "pem-rfc7468"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e93a3b1cc0510b03020f33f21e62acdde3dcaef432edc95bea377fbd4c2cd4"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@ -1437,6 +1635,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447"
dependencies = [
"der",
"pem-rfc7468",
"spki",
"zeroize",
]
[[package]]
name = "pkg-config"
version = "0.3.25"
@ -1449,6 +1659,30 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.46"
@ -1511,7 +1745,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_core 0.6.4",
]
[[package]]
@ -1521,9 +1755,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.6.4",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
[[package]]
name = "rand_core"
version = "0.6.4"
@ -1661,6 +1901,27 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.1",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
@ -1762,6 +2023,26 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_nanos"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e44969a61f5d316be20a42ff97816efb3b407a924d06824c3d8a49fa8450de0e"
dependencies = [
"serde",
]
[[package]]
name = "serde_repr"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fe39d9fbb0ebf5eb2c7cb7e2a47e4f462fad1379f1166b8ae49ad9eae89a7ca"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -1782,7 +2063,20 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.5",
]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
@ -1793,7 +2087,7 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.5",
]
[[package]]
@ -1814,6 +2108,24 @@ dependencies = [
"libc",
]
[[package]]
name = "signatory"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfecc059e81632eef1dd9b79e22fc28b8fe69b30d3357512a77a0ad8ee3c782"
dependencies = [
"pkcs8",
"rand_core 0.6.4",
"signature",
"zeroize",
]
[[package]]
name = "signature"
version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
[[package]]
name = "slab"
version = "0.4.7"
@ -1845,6 +2157,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spki"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32"
dependencies = [
"der",
]
[[package]]
name = "sqlformat"
version = "0.2.0"
@ -1903,11 +2224,11 @@ dependencies = [
"percent-encoding",
"rand",
"rustls",
"rustls-pemfile",
"rustls-pemfile 1.0.1",
"serde",
"serde_json",
"sha1",
"sha2",
"sha2 0.10.6",
"smallvec",
"sqlformat",
"sqlx-rt",
@ -1931,7 +2252,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"sha2",
"sha2 0.10.6",
"sqlx-core",
"sqlx-rt",
"syn",
@ -1959,6 +2280,21 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subslice"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae"
dependencies = [
"memchr",
]
[[package]]
name = "subtle"
version = "2.4.1"
@ -1976,6 +2312,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]]
name = "tempfile"
version = "3.3.0"
@ -1990,6 +2338,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.37"
@ -2050,6 +2407,7 @@ dependencies = [
"itoa",
"libc",
"num_threads",
"serde",
"time-macros",
]
@ -2316,6 +2674,12 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a"
[[package]]
name = "unicode-xid"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unicode_categories"
version = "0.1.1"
@ -2504,6 +2868,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@ -2562,6 +2935,27 @@ dependencies = [
"winapi",
]
[[package]]
name = "zeroize"
version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@ -5,7 +5,8 @@ authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[dependencies]
envconfig = "0.10"
dotenvy = "0.15"
clap = { version = "4", features = ["derive", "env"] }
thiserror = "1"
tracing = "0.1"
@ -33,6 +34,8 @@ actix-http = "3"
actix-service = "2"
tracing-actix-web = { version = "0.6", features = ["opentelemetry_0_17"] }
async-nats = "0.21"
[dependencies.sqlx]
version = "0.6"
features = ["runtime-actix-rustls", "postgres"]

View File

@ -6,7 +6,8 @@ use actix_web::{
web::{self, Data},
App, HttpResponse, HttpServer, Responder,
};
use envconfig::Envconfig;
use clap::Parser;
use futures::StreamExt;
use opentelemetry::KeyValue;
use prometheus::{Encoder, TextEncoder};
use sqlx::postgres::PgPoolOptions;
@ -21,7 +22,6 @@ lazy_static::lazy_static! {
static ref HTTP_REQUEST_DURATION: prometheus::HistogramVec = prometheus::register_histogram_vec!("http_request_duration_seconds", "Duration of HTTP requests", &["http_route", "http_method", "http_status_code"]).unwrap();
static ref TREE_DURATION: prometheus::HistogramVec = prometheus::register_histogram_vec!("bkapi_tree_duration_seconds", "Duration of tree search time", &["distance"]).unwrap();
static ref TREE_ADD_DURATION: prometheus::Histogram = prometheus::register_histogram!("bkapi_tree_add_duration_seconds", "Duration to add new item to tree").unwrap();
}
#[derive(thiserror::Error, Debug)]
@ -32,32 +32,56 @@ enum Error {
Listener(sqlx::Error),
#[error("listener got data that could not be decoded: {0}")]
Data(serde_json::Error),
#[error("nats encountered error: {0}")]
Nats(#[from] async_nats::Error),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
#[derive(Envconfig, Clone)]
#[derive(Parser, Clone)]
struct Config {
#[envconfig(default = "0.0.0.0:3000")]
/// Host to listen for incoming HTTP requests.
#[clap(long, env, default_value = "127.0.0.1:3000")]
http_listen: String,
#[envconfig(default = "127.0.0.1:6831")]
/// Jaeger agent endpoint for span collection.
#[clap(long, env, default_value = "127.0.0.1:6831")]
jaeger_agent: String,
#[envconfig(default = "bkapi")]
/// Service name for spans.
#[clap(long, env, default_value = "bkapi")]
service_name: String,
/// Database URL for fetching data.
#[clap(long, env)]
database_url: String,
/// Query to perform to fetch initial values.
#[clap(long, env)]
database_query: String,
database_subscribe: String,
#[envconfig(default = "false")]
database_is_unique: bool,
max_distance: Option<u32>,
/// If provided, the Postgres notification topic to subscribe to.
#[clap(long, env)]
database_subscribe: Option<String>,
/// The NATS host.
#[clap(long, env)]
nats_host: Option<String>,
/// The NATS NKEY.
#[clap(long, env)]
nats_nkey: Option<String>,
/// Maximum distance permitted in queries.
#[clap(long, env, default_value = "10")]
max_distance: u32,
}
#[actix_web::main]
async fn main() {
let config = Config::init_from_env().expect("could not load config");
let _ = dotenvy::dotenv();
let config = Config::parse();
configure_tracing(&config);
tracing::info!("starting bkbase");
tracing::info!("starting bkapi");
let tree: tree::Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(tree::Hamming)));
@ -69,18 +93,46 @@ async fn main() {
.expect_or_log("could not connect to database");
tracing::debug!("connected to postgres");
let http_listen = config.http_listen.clone();
let (sender, receiver) = futures::channel::oneshot::channel();
tracing::info!("starting to listen for payloads");
let client = match (config.nats_host.as_deref(), config.nats_nkey.as_deref()) {
(Some(host), None) => Some(
async_nats::connect(host)
.await
.expect_or_log("could not connect to nats with no nkey"),
),
(Some(host), Some(nkey)) => Some(
async_nats::ConnectOptions::with_nkey(nkey.to_string())
.connect(host)
.await
.expect_or_log("could not connect to nats with nkey"),
),
_ => None,
};
let tree_clone = tree.clone();
let config_clone = config.clone();
if let Some(client) = client.clone() {
tracing::info!("starting to listen for payloads from nats");
tokio::task::spawn(async {
tree::listen_for_payloads(pool, config_clone, tree_clone, sender)
tree::listen_for_payloads_nats(config_clone, pool, client, tree_clone, sender)
.await
.expect_or_log("listenting for updates failed");
.unwrap_or_log();
});
} else if let Some(subscription) = config.database_subscribe.clone() {
tracing::info!("starting to listen for payloads from postgres");
let query = config.database_query.clone();
tokio::task::spawn(async move {
tree::listen_for_payloads_db(pool, subscription, query, tree_clone, sender)
.await
.unwrap_or_log();
});
} else {
panic!("no listener source available");
};
tracing::info!("waiting for initial tree to load");
receiver
@ -88,45 +140,17 @@ async fn main() {
.expect_or_log("tree loading was dropped before completing");
tracing::info!("initial tree loaded, starting server");
let tree = Data::new(tree);
let config = Data::new(config);
HttpServer::new(move || {
App::new()
.wrap(tracing_actix_web::TracingLogger::default())
.wrap_fn(|req, srv| {
let path = req.path().to_owned();
let method = req.method().to_string();
let start = std::time::Instant::now();
let fut = srv.call(req);
async move {
let res = fut.await?;
let end = std::time::Instant::now().duration_since(start);
let status_code = res.status().as_u16().to_string();
let labels: Vec<&str> = vec![&path, &method, &status_code];
HTTP_REQUEST_COUNT.with_label_values(&labels).inc();
HTTP_REQUEST_DURATION
.with_label_values(&labels)
.observe(end.as_secs_f64());
Ok(res)
}
})
.app_data(tree.clone())
.app_data(config.clone())
.service(search)
.service(health)
.service(metrics)
})
.bind(&http_listen)
.expect_or_log("bind failed")
.run()
if let Some(client) = client {
let tree_clone = tree.clone();
let config_clone = config.clone();
tokio::task::spawn(async move {
search_nats(client, tree_clone, config_clone)
.await
.expect_or_log("server failed");
.unwrap_or_log();
});
}
start_server(config, tree).await.unwrap_or_log();
}
fn configure_tracing(config: &Config) {
@ -163,6 +187,64 @@ fn configure_tracing(config: &Config) {
.expect("tracing could not be configured");
}
async fn start_server(config: Config, tree: tree::Tree) -> Result<(), Error> {
let tree = Data::new(tree);
let config_data = Data::new(config.clone());
HttpServer::new(move || {
App::new()
.wrap(tracing_actix_web::TracingLogger::default())
.wrap_fn(|req, srv| {
let path = req.path().to_owned();
let method = req.method().to_string();
let start = std::time::Instant::now();
let fut = srv.call(req);
async move {
let res = fut.await?;
let end = std::time::Instant::now().duration_since(start);
let status_code = res.status().as_u16().to_string();
let labels: Vec<&str> = vec![&path, &method, &status_code];
HTTP_REQUEST_COUNT.with_label_values(&labels).inc();
HTTP_REQUEST_DURATION
.with_label_values(&labels)
.observe(end.as_secs_f64());
Ok(res)
}
})
.app_data(tree.clone())
.app_data(config_data.clone())
.service(search)
.service(health)
.service(metrics)
})
.bind(&config.http_listen)
.expect_or_log("bind failed")
.run()
.await
.map_err(Error::Io)
}
/// Health check endpoint: responds to `GET /health` with the body `OK`.
#[get("/health")]
async fn health() -> impl Responder {
"OK"
}
#[get("/metrics")]
async fn metrics() -> Result<HttpResponse, std::convert::Infallible> {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(HttpResponse::Ok().body(buffer))
}
#[derive(Debug, serde::Deserialize)]
struct Query {
hash: i64,
@ -189,22 +271,91 @@ async fn search(
query: web::Query<Query>,
tree: Data<tree::Tree>,
config: Data<Config>,
) -> Result<HttpResponse, std::convert::Infallible> {
) -> HttpResponse {
let Query { hash, distance } = query.0;
let max_distance = config.max_distance;
tracing::info!("searching for hash {} with distance {}", hash, distance);
if matches!(max_distance, Some(max_distance) if distance > max_distance) {
return Ok(HttpResponse::BadRequest().body("distance is greater than max distance"));
}
let distance = distance.clamp(0, config.max_distance);
let tree = tree.read().await;
let hashes = search_tree(&tree, hash, distance);
drop(tree);
let resp = SearchResponse {
hash,
distance,
hashes,
};
HttpResponse::Ok().json(resp)
}
/// One search request received over NATS. `search_nats` decodes each message
/// payload as a JSON array of these.
#[derive(serde::Deserialize)]
struct SearchPayload {
/// Hash to look up in the tree.
hash: i64,
/// Requested search distance; `search_nats` caps it at `Config::max_distance`.
distance: u32,
}
/// Answers tree searches requested over NATS.
///
/// Subscribes to the `bkapi.search` subject as part of the `bkapi-search`
/// queue group. Each message payload is decoded as a JSON array of
/// [`SearchPayload`]; every entry is searched with its distance capped at
/// `config.max_distance`, and the list of per-entry results is published as
/// JSON to the message's reply subject.
///
/// Messages without a reply subject and messages whose payload cannot be
/// decoded are logged and skipped, so one bad request cannot stop the
/// subscriber for everyone else. No reply is sent for a skipped message.
///
/// # Errors
///
/// Returns an error if the subscription cannot be created. Returns `Ok(())`
/// once the subscription stream ends.
async fn search_nats(
    client: async_nats::Client,
    tree: tree::Tree,
    config: Config,
) -> Result<(), Error> {
    tracing::info!("subscribing to searches");

    // Only the distance cap is needed per request, so copy it once instead of
    // cloning the whole configuration for every message.
    let max_distance = config.max_distance;

    let mut sub = client
        .queue_subscribe("bkapi.search".to_string(), "bkapi-search".to_string())
        .await?;

    while let Some(message) = sub.next().await {
        tracing::trace!("got search message");

        let reply = match message.reply {
            Some(reply) => reply,
            None => {
                tracing::warn!("message had no reply subject");
                continue;
            }
        };

        // The payload comes from other clients; a malformed request must not
        // end the loop and take the whole search subscriber down.
        let payloads: Vec<SearchPayload> = match serde_json::from_slice(&message.payload) {
            Ok(payloads) => payloads,
            Err(err) => {
                tracing::warn!("search request could not be decoded: {}", err);
                continue;
            }
        };

        let tree = tree.clone();
        let client = client.clone();

        // Search in a separate task so a slow lookup does not delay the next
        // incoming message.
        tokio::task::spawn(async move {
            let tree = tree.read().await;

            let results: Vec<_> = payloads
                .into_iter()
                .map(|payload| search_tree(&tree, payload.hash, payload.distance.min(max_distance)))
                .collect();

            // Release the read lock before doing network I/O.
            drop(tree);

            client
                .publish(reply, serde_json::to_vec(&results).unwrap_or_log().into())
                .await
                .unwrap_or_log();
        });
    }

    Ok(())
}
#[tracing::instrument(skip(tree))]
fn search_tree(
tree: &bk_tree::BKTree<tree::Node, tree::Hamming>,
hash: i64,
distance: u32,
) -> Vec<HashDistance> {
tracing::debug!("searching tree");
let duration = TREE_DURATION
.with_label_values(&[&distance.to_string()])
.start_timer();
let matches: Vec<HashDistance> = tree
let results: Vec<_> = tree
.find(&hash.into(), distance)
.into_iter()
.map(|item| HashDistance {
@ -214,29 +365,6 @@ async fn search(
.collect();
let time = duration.stop_and_record();
tracing::debug!("found {} items in {} seconds", matches.len(), time);
let resp = SearchResponse {
hash,
distance,
hashes: matches,
};
Ok(HttpResponse::Ok().json(resp))
}
#[get("/health")]
async fn health() -> impl Responder {
"OK"
}
#[get("/metrics")]
async fn metrics() -> Result<HttpResponse, std::convert::Infallible> {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(HttpResponse::Ok().body(buffer))
tracing::info!(time, results = results.len(), "found results");
results
}

View File

@ -1,11 +1,16 @@
use std::sync::Arc;
use futures::TryStreamExt;
use sqlx::{postgres::PgListener, Pool, Postgres, Row};
use tokio::sync::RwLock;
use tracing_unwrap::ResultExt;
use crate::{Config, Error};
lazy_static::lazy_static! {
// Histogram registered as `bkapi_tree_add_duration_seconds`; `process_payload`
// starts a timer on it for each incoming hash.
static ref TREE_ADD_DURATION: prometheus::Histogram = prometheus::register_histogram!("bkapi_tree_add_duration_seconds", "Duration to add new item to tree").unwrap();
}
pub(crate) type Tree = Arc<RwLock<bk_tree::BKTree<Node, Hamming>>>;
/// A hamming distance metric.
@ -44,13 +49,11 @@ impl From<Node> for i64 {
/// be lost.
async fn create_tree(
conn: &Pool<Postgres>,
config: &Config,
query: &str,
) -> Result<bk_tree::BKTree<Node, Hamming>, Error> {
use futures::TryStreamExt;
tracing::warn!("creating new tree");
let mut tree = bk_tree::BKTree::new(Hamming);
let mut rows = sqlx::query(&config.database_query).fetch(conn);
let mut rows = sqlx::query(query).fetch(conn);
let mut count = 0;
@ -59,13 +62,9 @@ async fn create_tree(
while let Some(row) = rows.try_next().await.map_err(Error::LoadingRow)? {
let node: Node = row.get::<i64, _>(0).into();
// Avoid checking if each value is unique if we were told that the
// database query only returns unique values.
let timer = crate::TREE_ADD_DURATION.start_timer();
if config.database_is_unique || tree.find_exact(&node).is_none() {
if tree.find_exact(&node).is_none() {
tree.add(node);
}
timer.stop_and_record();
count += 1;
if count % 250_000 == 0 {
@ -88,56 +87,122 @@ struct Payload {
///
/// This will create a new tree to ensure all items are present. It will also
/// automatically recreate trees as needed if the database connection is lost.
pub(crate) async fn listen_for_payloads(
pub(crate) async fn listen_for_payloads_db(
conn: Pool<Postgres>,
config: Config,
subscription: String,
query: String,
tree: Tree,
initial: futures::channel::oneshot::Sender<()>,
) -> Result<(), Error> {
let mut initial = Some(initial);
loop {
let mut listener = PgListener::connect_with(&conn)
.await
.map_err(Error::Listener)?;
listener
.listen(&config.database_subscribe)
.listen(&subscription)
.await
.map_err(Error::Listener)?;
let new_tree = create_tree(&conn, &config).await?;
let new_tree = create_tree(&conn, &query).await?;
{
let mut tree = tree.write().await;
*tree = new_tree;
}
if let Some(initial) = initial.take() {
initial
.send(())
.expect_or_log("nothing listening for initial data");
}
while let Some(notification) = listener.try_recv().await.map_err(Error::Listener)? {
tracing::trace!("got postgres payload");
process_payload(&tree, notification.payload().as_bytes()).await?;
}
tracing::error!("disconnected from postgres listener, recreating tree");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
pub(crate) async fn listen_for_payloads_nats(
config: Config,
pool: sqlx::PgPool,
client: async_nats::Client,
tree: Tree,
initial: futures::channel::oneshot::Sender<()>,
) -> Result<(), Error> {
static STREAM_NAME: &str = "bkapi-hashes";
let jetstream = async_nats::jetstream::new(client);
let mut initial = Some(initial);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: STREAM_NAME.to_string(),
subjects: vec!["bkapi.add".to_string()],
max_age: std::time::Duration::from_secs(60 * 60 * 24),
retention: async_nats::jetstream::stream::RetentionPolicy::Interest,
..Default::default()
})
.await?;
loop {
while let Some(notification) = listener.try_recv().await.map_err(Error::Listener)? {
let payload: Payload =
serde_json::from_str(notification.payload()).map_err(Error::Data)?;
tracing::debug!(hash = payload.hash, "evaluating new payload");
let consumer = stream
.get_or_create_consumer(
"bkapi-consumer",
async_nats::jetstream::consumer::pull::Config {
..Default::default()
},
)
.await?;
let node: Node = payload.hash.into();
let _timer = crate::TREE_ADD_DURATION.start_timer();
let mut tree = tree.write().await;
if tree.find_exact(&node).is_some() {
tracing::trace!("hash already existed in tree");
continue;
}
tracing::trace!("hash did not exist, adding to tree");
tree.add(node);
}
tracing::error!("disconnected from listener, recreating tree");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
let new_tree = create_tree(&conn, &config).await?;
let new_tree = create_tree(&pool, &config.database_query).await?;
{
let mut tree = tree.write().await;
*tree = new_tree;
}
if let Some(initial) = initial.take() {
initial
.send(())
.expect_or_log("nothing listening for initial data");
}
let mut messages = consumer.messages().await?;
while let Ok(Some(message)) = messages.try_next().await {
tracing::trace!("got nats payload");
message.ack().await?;
process_payload(&tree, &message.payload).await?;
}
tracing::error!("disconnected from nats listener, recreating tree");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
async fn process_payload(tree: &Tree, payload: &[u8]) -> Result<(), Error> {
let payload: Payload = serde_json::from_slice(payload).map_err(Error::Data)?;
tracing::trace!("got hash: {}", payload.hash);
let node: Node = payload.hash.into();
let _timer = TREE_ADD_DURATION.start_timer();
let is_new_hash = {
let tree = tree.read().await;
tree.find_exact(&node).is_none()
};
if is_new_hash {
let mut tree = tree.write().await;
tree.add(node);
}
tracing::debug!(hash = payload.hash, is_new_hash, "processed incoming hash");
Ok(())
}