diff --git a/.gitignore b/.gitignore index ea8c4bf..fedaa2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.env diff --git a/Cargo.lock b/Cargo.lock index af9795b..e142ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -222,6 +222,40 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "async-nats" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5788cd6eb396656c504ad4b2b7fd384715a4c188c03490a9d68f75931a1b182" +dependencies = [ + "base64", + "base64-url", + "bytes", + "futures", + "http", + "itertools", + "itoa", + "lazy_static", + "nkeys", + "nuid", + "once_cell", + "regex", + "ring", + "rustls-native-certs", + "rustls-pemfile 0.3.0", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "subslice", + "time", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", + "url", +] + [[package]] name = "async-trait" version = "0.1.57" @@ -242,6 +276,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -254,6 +299,21 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64-url" +version = "1.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" +dependencies = [ + "base64", +] + +[[package]] +name = "base64ct" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" + [[package]] name = "bitflags" version = "1.3.2" @@ -277,8 +337,10 @@ dependencies = [ "actix-http", "actix-service", "actix-web", + "async-nats", "bk-tree", - "envconfig", + "clap", + "dotenvy", "futures", "hamming", "lazy_static", @@ -312,6 +374,15 @@ dependencies = [ "tracing-opentelemetry 0.17.4", ] +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.3" @@ -384,6 +455,49 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea54a38e4bce14ff6931c72e5b3c43da7051df056913d4e7e1fcdb1c03df69d" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "clap_lex", + "once_cell", + "strsim", + "termcolor", +] + +[[package]] +name = "clap_derive" +version = "4.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +dependencies = [ + "os_str_bytes", +] + +[[package]] +name = "const-oid" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" + [[package]] name = "convert_case" version = "0.4.0" @@ -489,6 +603,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" +dependencies = [ + "byteorder", + "digest 0.9.0", + "rand_core 0.5.1", + "subtle", + "zeroize", +] + [[package]] name = "dashmap" version = "5.4.0" @@ -502,6 +629,21 @@ dependencies = [ "parking_lot_core 0.9.3", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + +[[package]] +name = "der" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" +dependencies = [ + "const-oid", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -515,13 +657,22 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" dependencies = [ - "block-buffer", + "block-buffer 0.10.3", "crypto-common", "subtle", ] @@ -555,6 +706,27 @@ dependencies = [ "dirs", ] +[[package]] +name = "ed25519" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9c280362032ea4203659fc489832d0204ef09f247a0506f170dafcac08c369" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2 0.9.9", + "zeroize", +] + [[package]] name = "either" version = "1.8.0" @@ -570,26 +742,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "envconfig" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea81cc7e21f55a9d9b1efb6816904978d0bfbe31a50347cb24b2e75564bcac9b" -dependencies = [ - "envconfig_derive", -] - -[[package]] -name = "envconfig_derive" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfca278e5f84b45519acaaff758ebfa01f18e96998bc24b8f1b722dd804b9bf" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -848,7 +1000,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.5", ] [[package]] @@ -1066,7 +1218,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest", + "digest 0.10.5", ] [[package]] @@ -1126,6 +1278,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nkeys" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" +dependencies = [ + "byteorder", + "data-encoding", + "ed25519-dalek", + "getrandom", + "log", + "rand", + "signatory", +] + [[package]] name = "nom" version = "7.1.1" @@ -1146,6 +1313,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38" +dependencies = [ + "lazy_static", + "rand", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -1180,6 +1357,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "openssl" version = "0.10.42" @@ -1339,6 +1522,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "os_str_bytes" +version = "6.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" + [[package]] name = "overload" version = "0.1.1" @@ -1399,6 +1588,15 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1" +[[package]] +name = "pem-rfc7468" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e93a3b1cc0510b03020f33f21e62acdde3dcaef432edc95bea377fbd4c2cd4" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -1437,6 +1635,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447" +dependencies = [ + "der", + "pem-rfc7468", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.25" @@ -1449,6 +1659,30 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.46" @@ -1511,7 +1745,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -1521,9 +1755,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", ] +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" + [[package]] name = "rand_core" version = "0.6.4" @@ -1661,6 +1901,27 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.1", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +dependencies = [ + "base64", +] + [[package]] name = "rustls-pemfile" version = "1.0.1" @@ -1762,6 +2023,26 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e44969a61f5d316be20a42ff97816efb3b407a924d06824c3d8a49fa8450de0e" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fe39d9fbb0ebf5eb2c7cb7e2a47e4f462fad1379f1166b8ae49ad9eae89a7ca" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1782,7 +2063,20 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.5", +] + +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", ] [[package]] @@ -1793,7 +2087,7 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.5", ] [[package]] @@ -1814,6 +2108,24 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfecc059e81632eef1dd9b79e22fc28b8fe69b30d3357512a77a0ad8ee3c782" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" + [[package]] name = "slab" version = "0.4.7" @@ -1845,6 +2157,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spki" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32" +dependencies = [ + "der", +] + [[package]] name = "sqlformat" version = "0.2.0" @@ -1903,11 +2224,11 @@ dependencies = [ "percent-encoding", "rand", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.1", "serde", "serde_json", "sha1", - "sha2", + "sha2 0.10.6", "smallvec", "sqlformat", "sqlx-rt", @@ -1931,7 +2252,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "sha2", + "sha2 0.10.6", "sqlx-core", "sqlx-rt", "syn", @@ -1959,6 +2280,21 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "subslice" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae" +dependencies = [ + "memchr", +] + [[package]] name = "subtle" version = "2.4.1" @@ -1976,6 +2312,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "tempfile" version = "3.3.0" @@ -1990,6 +2338,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.37" @@ -2050,6 +2407,7 @@ dependencies = [ "itoa", "libc", "num_threads", + "serde", "time-macros", ] @@ -2316,6 +2674,12 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" +[[package]] +name = "unicode-xid" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + [[package]] name = "unicode_categories" version = "0.1.1" @@ -2504,6 +2868,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -2562,6 +2935,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/bkapi/Cargo.toml b/bkapi/Cargo.toml index 2ebe8bd..7368140 100644 --- a/bkapi/Cargo.toml +++ b/bkapi/Cargo.toml @@ -5,7 +5,8 @@ authors = ["Syfaro "] edition = "2018" [dependencies] -envconfig = "0.10" +dotenvy = "0.15" +clap = { version = "4", features = ["derive", "env"] } thiserror = "1" tracing = "0.1" @@ -33,6 +34,8 @@ actix-http = "3" actix-service = "2" tracing-actix-web = { version = "0.6", features = ["opentelemetry_0_17"] } +async-nats = "0.21" + [dependencies.sqlx] version = "0.6" features = ["runtime-actix-rustls", "postgres"] diff --git a/bkapi/src/main.rs b/bkapi/src/main.rs index 835b55b..789a8b2 100644 --- a/bkapi/src/main.rs +++ b/bkapi/src/main.rs @@ -6,7 +6,8 @@ use actix_web::{ web::{self, Data}, App, HttpResponse, HttpServer, Responder, }; -use envconfig::Envconfig; +use clap::Parser; +use futures::StreamExt; use opentelemetry::KeyValue; use prometheus::{Encoder, TextEncoder}; use sqlx::postgres::PgPoolOptions; @@ -21,7 +22,6 @@ lazy_static::lazy_static! { static ref HTTP_REQUEST_DURATION: prometheus::HistogramVec = prometheus::register_histogram_vec!("http_request_duration_seconds", "Duration of HTTP requests", &["http_route", "http_method", "http_status_code"]).unwrap(); static ref TREE_DURATION: prometheus::HistogramVec = prometheus::register_histogram_vec!("bkapi_tree_duration_seconds", "Duration of tree search time", &["distance"]).unwrap(); - static ref TREE_ADD_DURATION: prometheus::Histogram = prometheus::register_histogram!("bkapi_tree_add_duration_seconds", "Duration to add new item to tree").unwrap(); } #[derive(thiserror::Error, Debug)] @@ -32,32 +32,56 @@ enum Error { Listener(sqlx::Error), #[error("listener got data that could not be decoded: {0}")] Data(serde_json::Error), + #[error("nats encountered error: {0}")] + Nats(#[from] async_nats::Error), + #[error("io error: {0}")] + Io(#[from] std::io::Error), } -#[derive(Envconfig, Clone)] +#[derive(Parser, Clone)] struct Config { - #[envconfig(default = "0.0.0.0:3000")] + /// Host to listen for incoming HTTP requests. + #[clap(long, env, default_value = "127.0.0.1:3000")] http_listen: String, - #[envconfig(default = "127.0.0.1:6831")] + + /// Jaeger agent endpoint for span collection. + #[clap(long, env, default_value = "127.0.0.1:6831")] jaeger_agent: String, - #[envconfig(default = "bkapi")] + /// Service name for spans. + #[clap(long, env, default_value = "bkapi")] service_name: String, + /// Database URL for fetching data. + #[clap(long, env)] database_url: String, + /// Query to perform to fetch initial values. + #[clap(long, env)] database_query: String, - database_subscribe: String, - #[envconfig(default = "false")] - database_is_unique: bool, - max_distance: Option, + /// If provided, the Postgres notification topic to subscribe to. + #[clap(long, env)] + database_subscribe: Option, + + /// The NATS host. + #[clap(long, env)] + nats_host: Option, + /// The NATS NKEY. + #[clap(long, env)] + nats_nkey: Option, + + /// Maximum distance permitted in queries. + #[clap(long, env, default_value = "10")] + max_distance: u32, } #[actix_web::main] async fn main() { - let config = Config::init_from_env().expect("could not load config"); + let _ = dotenvy::dotenv(); + + let config = Config::parse(); configure_tracing(&config); - tracing::info!("starting bkbase"); + tracing::info!("starting bkapi"); let tree: tree::Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(tree::Hamming))); @@ -69,18 +93,46 @@ async fn main() { .expect_or_log("could not connect to database"); tracing::debug!("connected to postgres"); - let http_listen = config.http_listen.clone(); - let (sender, receiver) = futures::channel::oneshot::channel(); - tracing::info!("starting to listen for payloads"); + let client = match (config.nats_host.as_deref(), config.nats_nkey.as_deref()) { + (Some(host), None) => Some( + async_nats::connect(host) + .await + .expect_or_log("could not connect to nats with no nkey"), + ), + (Some(host), Some(nkey)) => Some( + async_nats::ConnectOptions::with_nkey(nkey.to_string()) + .connect(host) + .await + .expect_or_log("could not connect to nats with nkey"), + ), + _ => None, + }; + let tree_clone = tree.clone(); let config_clone = config.clone(); - tokio::task::spawn(async { - tree::listen_for_payloads(pool, config_clone, tree_clone, sender) - .await - .expect_or_log("listenting for updates failed"); - }); + if let Some(client) = client.clone() { + tracing::info!("starting to listen for payloads from nats"); + + tokio::task::spawn(async { + tree::listen_for_payloads_nats(config_clone, pool, client, tree_clone, sender) + .await + .unwrap_or_log(); + }); + } else if let Some(subscription) = config.database_subscribe.clone() { + tracing::info!("starting to listen for payloads from postgres"); + + let query = config.database_query.clone(); + + tokio::task::spawn(async move { + tree::listen_for_payloads_db(pool, subscription, query, tree_clone, sender) + .await + .unwrap_or_log(); + }); + } else { + panic!("no listener source available"); + }; tracing::info!("waiting for initial tree to load"); receiver @@ -88,45 +140,17 @@ async fn main() { .expect_or_log("tree loading was dropped before completing"); tracing::info!("initial tree loaded, starting server"); - let tree = Data::new(tree); - let config = Data::new(config); + if let Some(client) = client { + let tree_clone = tree.clone(); + let config_clone = config.clone(); + tokio::task::spawn(async move { + search_nats(client, tree_clone, config_clone) + .await + .unwrap_or_log(); + }); + } - HttpServer::new(move || { - App::new() - .wrap(tracing_actix_web::TracingLogger::default()) - .wrap_fn(|req, srv| { - let path = req.path().to_owned(); - let method = req.method().to_string(); - - let start = std::time::Instant::now(); - let fut = srv.call(req); - - async move { - let res = fut.await?; - let end = std::time::Instant::now().duration_since(start); - - let status_code = res.status().as_u16().to_string(); - - let labels: Vec<&str> = vec![&path, &method, &status_code]; - HTTP_REQUEST_COUNT.with_label_values(&labels).inc(); - HTTP_REQUEST_DURATION - .with_label_values(&labels) - .observe(end.as_secs_f64()); - - Ok(res) - } - }) - .app_data(tree.clone()) - .app_data(config.clone()) - .service(search) - .service(health) - .service(metrics) - }) - .bind(&http_listen) - .expect_or_log("bind failed") - .run() - .await - .expect_or_log("server failed"); + start_server(config, tree).await.unwrap_or_log(); } fn configure_tracing(config: &Config) { @@ -163,6 +187,64 @@ fn configure_tracing(config: &Config) { .expect("tracing could not be configured"); } +async fn start_server(config: Config, tree: tree::Tree) -> Result<(), Error> { + let tree = Data::new(tree); + let config_data = Data::new(config.clone()); + + HttpServer::new(move || { + App::new() + .wrap(tracing_actix_web::TracingLogger::default()) + .wrap_fn(|req, srv| { + let path = req.path().to_owned(); + let method = req.method().to_string(); + + let start = std::time::Instant::now(); + let fut = srv.call(req); + + async move { + let res = fut.await?; + let end = std::time::Instant::now().duration_since(start); + + let status_code = res.status().as_u16().to_string(); + + let labels: Vec<&str> = vec![&path, &method, &status_code]; + HTTP_REQUEST_COUNT.with_label_values(&labels).inc(); + HTTP_REQUEST_DURATION + .with_label_values(&labels) + .observe(end.as_secs_f64()); + + Ok(res) + } + }) + .app_data(tree.clone()) + .app_data(config_data.clone()) + .service(search) + .service(health) + .service(metrics) + }) + .bind(&config.http_listen) + .expect_or_log("bind failed") + .run() + .await + .map_err(Error::Io) +} + +#[get("/health")] +async fn health() -> impl Responder { + "OK" +} + +#[get("/metrics")] +async fn metrics() -> Result { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + + let metric_families = prometheus::gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Ok(HttpResponse::Ok().body(buffer)) +} + #[derive(Debug, serde::Deserialize)] struct Query { hash: i64, @@ -189,22 +271,91 @@ async fn search( query: web::Query, tree: Data, config: Data, -) -> Result { +) -> HttpResponse { let Query { hash, distance } = query.0; - let max_distance = config.max_distance; - - tracing::info!("searching for hash {} with distance {}", hash, distance); - - if matches!(max_distance, Some(max_distance) if distance > max_distance) { - return Ok(HttpResponse::BadRequest().body("distance is greater than max distance")); - } + let distance = distance.clamp(0, config.max_distance); let tree = tree.read().await; + let hashes = search_tree(&tree, hash, distance); + drop(tree); + + let resp = SearchResponse { + hash, + distance, + hashes, + }; + + HttpResponse::Ok().json(resp) +} + +#[derive(serde::Deserialize)] +struct SearchPayload { + hash: i64, + distance: u32, +} + +async fn search_nats( + client: async_nats::Client, + tree: tree::Tree, + config: Config, +) -> Result<(), Error> { + tracing::info!("subscribing to searches"); + + let mut sub = client + .queue_subscribe("bkapi.search".to_string(), "bkapi-search".to_string()) + .await?; + + while let Some(message) = sub.next().await { + tracing::trace!("got search message"); + + let reply = match message.reply { + Some(reply) => reply, + None => { + tracing::warn!("message had no reply subject"); + continue; + } + }; + + let payloads: Vec = + serde_json::from_slice(&message.payload).map_err(Error::Data)?; + + let tree = tree.clone(); + let config = config.clone(); + let client = client.clone(); + + tokio::task::spawn(async move { + let tree = tree.read().await; + + let results: Vec<_> = payloads + .into_iter() + .map(|payload| (payload.hash, payload.distance.clamp(0, config.max_distance))) + .map(|(hash, distance)| search_tree(&tree, hash, distance)) + .collect(); + + drop(tree); + + client + .publish(reply, serde_json::to_vec(&results).unwrap_or_log().into()) + .await + .unwrap_or_log(); + }); + } + + Ok(()) +} + +#[tracing::instrument(skip(tree))] +fn search_tree( + tree: &bk_tree::BKTree, + hash: i64, + distance: u32, +) -> Vec { + tracing::debug!("searching tree"); let duration = TREE_DURATION .with_label_values(&[&distance.to_string()]) .start_timer(); - let matches: Vec = tree + let results: Vec<_> = tree .find(&hash.into(), distance) .into_iter() .map(|item| HashDistance { @@ -214,29 +365,6 @@ async fn search( .collect(); let time = duration.stop_and_record(); - tracing::debug!("found {} items in {} seconds", matches.len(), time); - - let resp = SearchResponse { - hash, - distance, - hashes: matches, - }; - - Ok(HttpResponse::Ok().json(resp)) -} - -#[get("/health")] -async fn health() -> impl Responder { - "OK" -} - -#[get("/metrics")] -async fn metrics() -> Result { - let mut buffer = Vec::new(); - let encoder = TextEncoder::new(); - - let metric_families = prometheus::gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - - Ok(HttpResponse::Ok().body(buffer)) + tracing::info!(time, results = results.len(), "found results"); + results } diff --git a/bkapi/src/tree.rs b/bkapi/src/tree.rs index 530536a..1e7ecb8 100644 --- a/bkapi/src/tree.rs +++ b/bkapi/src/tree.rs @@ -1,11 +1,16 @@ use std::sync::Arc; +use futures::TryStreamExt; use sqlx::{postgres::PgListener, Pool, Postgres, Row}; use tokio::sync::RwLock; use tracing_unwrap::ResultExt; use crate::{Config, Error}; +lazy_static::lazy_static! { + static ref TREE_ADD_DURATION: prometheus::Histogram = prometheus::register_histogram!("bkapi_tree_add_duration_seconds", "Duration to add new item to tree").unwrap(); +} + pub(crate) type Tree = Arc>>; /// A hamming distance metric. @@ -44,13 +49,11 @@ impl From for i64 { /// be lost. async fn create_tree( conn: &Pool, - config: &Config, + query: &str, ) -> Result, Error> { - use futures::TryStreamExt; - tracing::warn!("creating new tree"); let mut tree = bk_tree::BKTree::new(Hamming); - let mut rows = sqlx::query(&config.database_query).fetch(conn); + let mut rows = sqlx::query(query).fetch(conn); let mut count = 0; @@ -59,13 +62,9 @@ async fn create_tree( while let Some(row) = rows.try_next().await.map_err(Error::LoadingRow)? { let node: Node = row.get::(0).into(); - // Avoid checking if each value is unique if we were told that the - // database query only returns unique values. - let timer = crate::TREE_ADD_DURATION.start_timer(); - if config.database_is_unique || tree.find_exact(&node).is_none() { + if tree.find_exact(&node).is_none() { tree.add(node); } - timer.stop_and_record(); count += 1; if count % 250_000 == 0 { @@ -88,56 +87,122 @@ struct Payload { /// /// This will create a new tree to ensure all items are present. It will also /// automatically recreate trees as needed if the database connection is lost. -pub(crate) async fn listen_for_payloads( +pub(crate) async fn listen_for_payloads_db( conn: Pool, - config: Config, + subscription: String, + query: String, tree: Tree, initial: futures::channel::oneshot::Sender<()>, ) -> Result<(), Error> { - let mut listener = PgListener::connect_with(&conn) - .await - .map_err(Error::Listener)?; - listener - .listen(&config.database_subscribe) - .await - .map_err(Error::Listener)?; - - let new_tree = create_tree(&conn, &config).await?; - { - let mut tree = tree.write().await; - *tree = new_tree; - } - - initial - .send(()) - .expect_or_log("nothing listening for initial data"); + let mut initial = Some(initial); loop { - while let Some(notification) = listener.try_recv().await.map_err(Error::Listener)? { - let payload: Payload = - serde_json::from_str(notification.payload()).map_err(Error::Data)?; - tracing::debug!(hash = payload.hash, "evaluating new payload"); + let mut listener = PgListener::connect_with(&conn) + .await + .map_err(Error::Listener)?; + listener + .listen(&subscription) + .await + .map_err(Error::Listener)?; - let node: Node = payload.hash.into(); - - let _timer = crate::TREE_ADD_DURATION.start_timer(); - - let mut tree = tree.write().await; - if tree.find_exact(&node).is_some() { - tracing::trace!("hash already existed in tree"); - continue; - } - - tracing::trace!("hash did not exist, adding to tree"); - tree.add(node); - } - - tracing::error!("disconnected from listener, recreating tree"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - let new_tree = create_tree(&conn, &config).await?; + let new_tree = create_tree(&conn, &query).await?; { let mut tree = tree.write().await; *tree = new_tree; } + + if let Some(initial) = initial.take() { + initial + .send(()) + .expect_or_log("nothing listening for initial data"); + } + + while let Some(notification) = listener.try_recv().await.map_err(Error::Listener)? { + tracing::trace!("got postgres payload"); + process_payload(&tree, notification.payload().as_bytes()).await?; + } + + tracing::error!("disconnected from postgres listener, recreating tree"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } } + +pub(crate) async fn listen_for_payloads_nats( + config: Config, + pool: sqlx::PgPool, + client: async_nats::Client, + tree: Tree, + initial: futures::channel::oneshot::Sender<()>, +) -> Result<(), Error> { + static STREAM_NAME: &str = "bkapi-hashes"; + + let jetstream = async_nats::jetstream::new(client); + let mut initial = Some(initial); + + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: STREAM_NAME.to_string(), + subjects: vec!["bkapi.add".to_string()], + max_age: std::time::Duration::from_secs(60 * 60 * 24), + retention: async_nats::jetstream::stream::RetentionPolicy::Interest, + ..Default::default() + }) + .await?; + + loop { + let consumer = stream + .get_or_create_consumer( + "bkapi-consumer", + async_nats::jetstream::consumer::pull::Config { + ..Default::default() + }, + ) + .await?; + + let new_tree = create_tree(&pool, &config.database_query).await?; + { + let mut tree = tree.write().await; + *tree = new_tree; + } + + if let Some(initial) = initial.take() { + initial + .send(()) + .expect_or_log("nothing listening for initial data"); + } + + let mut messages = consumer.messages().await?; + + while let Ok(Some(message)) = messages.try_next().await { + tracing::trace!("got nats payload"); + message.ack().await?; + process_payload(&tree, &message.payload).await?; + } + + tracing::error!("disconnected from nats listener, recreating tree"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } +} + +async fn process_payload(tree: &Tree, payload: &[u8]) -> Result<(), Error> { + let payload: Payload = serde_json::from_slice(payload).map_err(Error::Data)?; + tracing::trace!("got hash: {}", payload.hash); + + let node: Node = payload.hash.into(); + + let _timer = TREE_ADD_DURATION.start_timer(); + + let is_new_hash = { + let tree = tree.read().await; + tree.find_exact(&node).is_none() + }; + + if is_new_hash { + let mut tree = tree.write().await; + tree.add(node); + } + + tracing::debug!(hash = payload.hash, is_new_hash, "processed incoming hash"); + + Ok(()) +}