mirror of
https://github.com/Syfaro/bkapi.git
synced 2024-11-21 14:34:08 +00:00
Move client nats behind feature.
This commit is contained in:
parent
2448e9a8fa
commit
fcd698e033
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
|||||||
/target
|
/target
|
||||||
.env
|
.env
|
||||||
|
.vscode/
|
||||||
|
@ -5,20 +5,19 @@ authors = ["Syfaro <syfaro@huefox.com>"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
|
[features]
|
||||||
|
nats = ["async-nats"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
async-nats = { version = "0.27", optional = true }
|
||||||
|
futures = "0.3"
|
||||||
|
opentelemetry = "0.18"
|
||||||
|
opentelemetry-http = "0.7"
|
||||||
|
reqwest = { version = "0.11", features = ["json"] }
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-opentelemetry = "0.18"
|
tracing-opentelemetry = "0.18"
|
||||||
|
|
||||||
opentelemetry = "0.18"
|
|
||||||
opentelemetry-http = "0.7"
|
|
||||||
|
|
||||||
futures = "0.3"
|
|
||||||
|
|
||||||
reqwest = { version = "0.11", features = ["json"] }
|
|
||||||
async-nats = "0.27"
|
|
||||||
|
|
||||||
serde = { version = "1", features = ["derive"] }
|
|
||||||
serde_json = "1"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
@ -7,6 +7,11 @@
|
|||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
mod nats;
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
pub use nats::{BKApiNatsClient, HashDistance};
|
||||||
|
|
||||||
/// A search result, containing the searched information and all of the results.
|
/// A search result, containing the searched information and all of the results.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct SearchResults {
|
pub struct SearchResults {
|
||||||
@ -110,79 +115,6 @@ impl InjectContext for reqwest::RequestBuilder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The BKApi client, operating over NATS instead of HTTP.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct BKApiNatsClient {
|
|
||||||
client: async_nats::Client,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A hash and distance.
|
|
||||||
#[derive(serde::Serialize, serde::Deserialize)]
|
|
||||||
pub struct HashDistance {
|
|
||||||
/// Hash to search.
|
|
||||||
pub hash: i64,
|
|
||||||
/// Maximum distance from hash to include in results.
|
|
||||||
pub distance: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BKApiNatsClient {
|
|
||||||
const NATS_SUBJECT: &str = "bkapi.search";
|
|
||||||
|
|
||||||
/// Create a new client with a given NATS client.
|
|
||||||
pub fn new(client: async_nats::Client) -> Self {
|
|
||||||
Self { client }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Search for a single hash.
|
|
||||||
pub async fn search(
|
|
||||||
&self,
|
|
||||||
hash: i64,
|
|
||||||
distance: i32,
|
|
||||||
) -> Result<SearchResults, async_nats::Error> {
|
|
||||||
let hashes = [HashDistance {
|
|
||||||
hash,
|
|
||||||
distance: distance as u32,
|
|
||||||
}];
|
|
||||||
|
|
||||||
self.search_many(&hashes)
|
|
||||||
.await
|
|
||||||
.map(|mut results| results.remove(0))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Search many hashes at once.
|
|
||||||
pub async fn search_many(
|
|
||||||
&self,
|
|
||||||
hashes: &[HashDistance],
|
|
||||||
) -> Result<Vec<SearchResults>, async_nats::Error> {
|
|
||||||
let payload = serde_json::to_vec(hashes).unwrap();
|
|
||||||
|
|
||||||
let message = self
|
|
||||||
.client
|
|
||||||
.request(Self::NATS_SUBJECT.to_string(), payload.into())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let results: Vec<Vec<HashDistance>> = serde_json::from_slice(&message.payload).unwrap();
|
|
||||||
|
|
||||||
let results = results
|
|
||||||
.into_iter()
|
|
||||||
.zip(hashes)
|
|
||||||
.map(|(results, search)| SearchResults {
|
|
||||||
hash: search.hash,
|
|
||||||
distance: search.distance as u64,
|
|
||||||
hashes: results
|
|
||||||
.into_iter()
|
|
||||||
.map(|result| SearchResult {
|
|
||||||
hash: result.hash,
|
|
||||||
distance: result.distance as u64,
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Ok(results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
fn get_test_endpoint() -> String {
|
fn get_test_endpoint() -> String {
|
||||||
|
74
bkapi-client/src/nats.rs
Normal file
74
bkapi-client/src/nats.rs
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
use crate::{SearchResult, SearchResults};
|
||||||
|
|
||||||
|
/// The BKApi client, operating over NATS instead of HTTP.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct BKApiNatsClient {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A hash and distance.
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct HashDistance {
|
||||||
|
/// Hash to search.
|
||||||
|
pub hash: i64,
|
||||||
|
/// Maximum distance from hash to include in results.
|
||||||
|
pub distance: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BKApiNatsClient {
|
||||||
|
const NATS_SUBJECT: &str = "bkapi.search";
|
||||||
|
|
||||||
|
/// Create a new client with a given NATS client.
|
||||||
|
pub fn new(client: async_nats::Client) -> Self {
|
||||||
|
Self { client }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Search for a single hash.
|
||||||
|
pub async fn search(
|
||||||
|
&self,
|
||||||
|
hash: i64,
|
||||||
|
distance: i32,
|
||||||
|
) -> Result<SearchResults, async_nats::Error> {
|
||||||
|
let hashes = [HashDistance {
|
||||||
|
hash,
|
||||||
|
distance: distance as u32,
|
||||||
|
}];
|
||||||
|
|
||||||
|
self.search_many(&hashes)
|
||||||
|
.await
|
||||||
|
.map(|mut results| results.remove(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Search many hashes at once.
|
||||||
|
pub async fn search_many(
|
||||||
|
&self,
|
||||||
|
hashes: &[HashDistance],
|
||||||
|
) -> Result<Vec<SearchResults>, async_nats::Error> {
|
||||||
|
let payload = serde_json::to_vec(hashes).unwrap();
|
||||||
|
|
||||||
|
let message = self
|
||||||
|
.client
|
||||||
|
.request(Self::NATS_SUBJECT.to_string(), payload.into())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let results: Vec<Vec<HashDistance>> = serde_json::from_slice(&message.payload).unwrap();
|
||||||
|
|
||||||
|
let results = results
|
||||||
|
.into_iter()
|
||||||
|
.zip(hashes)
|
||||||
|
.map(|(results, search)| SearchResults {
|
||||||
|
hash: search.hash,
|
||||||
|
distance: search.distance as u64,
|
||||||
|
hashes: results
|
||||||
|
.into_iter()
|
||||||
|
.map(|result| SearchResult {
|
||||||
|
hash: result.hash,
|
||||||
|
distance: result.distance as u64,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user