Break apart into multiple microservices (#10)

* Update CI.

* Bump versions.

* Fix dependencies.

* Unify tracing and metrics export.

* Create service to hash image.

* Updates for hashing service.

* Fix missing file changes.

* Old changes I don't remember.

* Update dependencies, improve Docker images.

* Use BKApi instead of in-memory tree.

* Include health endpoint with metrics.

* Avoid some unwraps.
This commit is contained in:
Syfaro 2021-08-10 21:28:32 -04:00 committed by GitHub
parent 097a350724
commit 5773cc03f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1469 additions and 940 deletions

View File

@ -1,146 +0,0 @@
---
kind: pipeline
type: docker
name: default
platform:
os: linux
arch: amd64
steps:
- name: Run tests
pull: always
image: rust:1-slim-buster
commands:
- apt-get update -y
- apt-get install -y wget libssl-dev pkg-config
- apt-get install -y libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm
- apt-get install -y python3 python3-pip
- pip3 install cfscrape
- wget -O sccache.tar.gz https://github.com/mozilla/sccache/releases/download/0.2.13/sccache-0.2.13-x86_64-unknown-linux-musl.tar.gz
- tar zxvf sccache.tar.gz
- export RUSTC_WRAPPER=$(pwd)/sccache-0.2.13-x86_64-unknown-linux-musl/sccache
- export SQLX_OFFLINE=true
- cargo build
- cargo test
- $(pwd)/sccache-0.2.13-x86_64-unknown-linux-musl/sccache --show-stats
environment:
AWS_ACCESS_KEY_ID:
from_secret: sccache_s3_access_key
AWS_SECRET_ACCESS_KEY:
from_secret: sccache_s3_secret_key
SCCACHE_BUCKET: sccache
SCCACHE_ENDPOINT:
from_secret: sccache_s3_endpoint
SCCACHE_S3_USE_SSL: true
- name: Build FuzzySearch API
pull: always
image: plugins/docker
settings:
auto_tag: true
dockerfile: fuzzysearch/Dockerfile
password:
from_secret: docker_password
registry: registry.huefox.com
repo: registry.huefox.com/fuzzysearch/api
username:
from_secret: docker_username
when:
branch:
- main
event:
- push
paths:
- fuzzysearch/**
- Cargo.lock
- name: Build FuzzySearch Webhook
pull: always
image: plugins/docker
settings:
auto_tag: true
dockerfile: fuzzysearch-webhook/Dockerfile
password:
from_secret: docker_password
registry: registry.huefox.com
repo: registry.huefox.com/fuzzysearch/webhook
username:
from_secret: docker_username
when:
branch:
- main
event:
- push
paths:
- fuzzysearch-webhook/**
- Cargo.lock
- name: Build Ingester e621
pull: always
image: plugins/docker
settings:
auto_tag: true
dockerfile: fuzzysearch-ingest-e621/Dockerfile
password:
from_secret: docker_password
registry: registry.huefox.com
repo: registry.huefox.com/fuzzysearch/ingest-e621
username:
from_secret: docker_username
when:
branch:
- main
event:
- push
paths:
- fuzzysearch-ingest-e621/**
- Cargo.lock
- name: Build Ingester FurAffinity
pull: always
image: plugins/docker
settings:
auto_tag: true
dockerfile: fuzzysearch-ingest-furaffinity/Dockerfile
password:
from_secret: docker_password
registry: registry.huefox.com
repo: registry.huefox.com/fuzzysearch/ingest-furaffinity
username:
from_secret: docker_username
when:
branch:
- main
event:
- push
paths:
- fuzzysearch-ingest-furaffinity/**
- Cargo.lock
- name: Build Ingester Weasyl
pull: always
image: plugins/docker
settings:
auto_tag: true
dockerfile: fuzzysearch-ingest-weasyl/Dockerfile
password:
from_secret: docker_password
registry: registry.huefox.com
repo: registry.huefox.com/fuzzysearch/ingest-weasyl
username:
from_secret: docker_username
when:
branch:
- main
event:
- push
paths:
- fuzzysearch-ingest-weasyl/**
- Cargo.lock
---
kind: signature
hmac: af0338b214c113b628f362a1bff2b282dece671adc6247e88d11ec7e0c7edc2a
...

146
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,146 @@
stages:
- test
- build
- image
variables:
CARGO_HOME: "$CI_PROJECT_DIR/.cargo"
SQLX_OFFLINE: "true"
# Cache should only be updated once, default to pull only
cache: &global_cache
key:
files:
- Cargo.lock
paths:
- .cargo/
policy: pull
# Run tests on current stable Rust version
test:latest: &base_test
image: rust:1.53-slim-buster
stage: test
cache:
<<: *global_cache
policy: pull-push
before_script:
- apt-get update -y
- apt-get install -y libssl-dev pkg-config libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm python3 python3-pip
script:
# Build, test, and show stats
- cargo build --verbose
- cargo test --verbose
# Same as above, but nightly Rust
test:nightly:
<<: *base_test
image: rustlang/rust:nightly-slim
allow_failure: true
build:api: &base_build
<<: *base_test
stage: build
cache:
<<: *global_cache
policy: pull
needs: ['test:latest']
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch/fuzzysearch
script:
- cargo build --verbose --release --bin fuzzysearch
- mv ./target/release/fuzzysearch ./fuzzysearch/fuzzysearch
build:webhook:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-webhook/fuzzysearch-webhook
script:
- cargo build --verbose --release --bin fuzzysearch-webhook
- mv ./target/release/fuzzysearch-webhook ./fuzzysearch-webhook/fuzzysearch-webhook
build:hash-input:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-hash-input/fuzzysearch-hash-input
script:
- cargo build --verbose --release --bin fuzzysearch-hash-input
- mv ./target/release/fuzzysearch-hash-input ./fuzzysearch-hash-input/fuzzysearch-hash-input
build:ingest-e621:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621
script:
- cargo build --verbose --release --bin fuzzysearch-ingest-e621
- mv ./target/release/fuzzysearch-ingest-e621 ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621
build:ingest-furaffinity:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity
script:
- cargo build --verbose --release --bin fuzzysearch-ingest-furaffinity
- mv ./target/release/fuzzysearch-ingest-furaffinity ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity
build:ingest-weasyl:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
script:
- cargo build --verbose --release --bin fuzzysearch-ingest-weasyl
- mv ./target/release/fuzzysearch-ingest-weasyl ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
images:api: &base_images
stage: image
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
cache: {}
before_script:
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"auth\":\"$(echo -n ${CI_REGISTRY_USER}:${CI_REGISTRY_PASSWORD} | base64)\"}}}" > /kaniko/.docker/config.json
needs: ['build:api']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch/Dockerfile --destination $CI_REGISTRY_IMAGE/api:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/api:latest --cache=true
images:webhook:
<<: *base_images
needs: ['build:webhook']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-webhook/Dockerfile --destination $CI_REGISTRY_IMAGE/webhook:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/webhook:latest --cache=true
images:hash-input:
<<: *base_images
needs: ['build:hash-input']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-hash-input/Dockerfile --destination $CI_REGISTRY_IMAGE/hash-input:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/hash-input:latest --cache=true
images:ingest-e621:
<<: *base_images
needs: ['build:ingest-e621']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-e621/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-e621:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-e621:latest --cache=true
images:ingest-furaffinity:
<<: *base_images
needs: ['build:ingest-furaffinity']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-furaffinity/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-furaffinity:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-furaffinity:latest --cache=true
images:ingest-weasyl:
<<: *base_images
needs: ['build:ingest-weasyl']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-weasyl/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-weasyl:latest --cache=true

1150
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,18 +1,16 @@
[workspace]
members = [
"fuzzysearch",
"fuzzysearch-common",
"fuzzysearch",
"fuzzysearch-hash-input",
"fuzzysearch-webhook",
"fuzzysearch-ingest-e621",
"fuzzysearch-ingest-furaffinity",
"fuzzysearch-ingest-weasyl",
"fuzzysearch-webhook"
]
[profile.dev]
[profile.dev.package."*"]
opt-level = 2
debug = true
[profile.release]
lto = true
codegen-units = 1
opt-level = 3

View File

@ -5,19 +5,25 @@ authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[features]
default = []
default = ["trace"]
video = ["ffmpeg-next", "tempfile"]
queue = ["faktory", "tokio", "serde_json"]
trace = ["opentelemetry", "opentelemetry-jaeger", "tracing-opentelemetry", "opentelemetry-http", "hyper", "prometheus", "tokio", "reqwest"]
[dependencies]
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-log = "0.1"
tokio = { version = "1", features = ["rt"], optional = true }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
base64 = "0.13"
serde_json = { version = "1", optional = true }
base64 = "0.13"
image = "0.23"
img_hash = "3"
@ -25,5 +31,12 @@ ffmpeg-next = { version = "4", optional = true }
tempfile = { version = "3", optional = true }
faktory = { version = "0.11", optional = true }
tokio = { version = "1", features = ["rt"], optional = true }
serde_json = { version = "1", optional = true }
opentelemetry = { version = "0.15.0", features = ["rt-tokio"], optional = true }
opentelemetry-jaeger = { version = "0.14", features = ["tokio"], optional = true }
tracing-opentelemetry = { version = "0.14", optional = true }
opentelemetry-http = { version = "0.4", optional = true }
hyper = { version = "0.14", features = ["server", "http2", "tcp"], optional = true }
prometheus = { version = "0.12", optional = true }
reqwest = { version = "0.11", optional = true }

View File

@ -4,6 +4,9 @@ pub mod types;
#[cfg(feature = "video")]
pub mod video;
#[cfg(feature = "trace")]
pub mod trace;
/// Create an instance of img_hash with project defaults.
pub fn get_hasher() -> img_hash::Hasher<[u8; 8]> {
use img_hash::{HashAlg::Gradient, HasherConfig};

View File

@ -0,0 +1,118 @@
pub fn configure_tracing(service_name: &'static str) {
use opentelemetry::KeyValue;
use tracing_subscriber::layer::SubscriberExt;
tracing_log::LogTracer::init().unwrap();
let env = std::env::var("ENVIRONMENT");
let env = if let Ok(env) = env.as_ref() {
env.as_str()
} else if cfg!(debug_assertions) {
"debug"
} else {
"release"
};
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
.with_service_name(service_name)
.with_tags(vec![
KeyValue::new("environment", env.to_owned()),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
])
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
let subscriber = tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.with_target(true);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
} else {
let subscriber = tracing_subscriber::fmt::layer();
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
tracing::debug!(service_name, "set application tracing service name");
}
async fn metrics(
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response, StatusCode};
match req.uri().path() {
"/health" => Ok(Response::new(Body::from("OK"))),
"/metrics" => {
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(Response::new(Body::from(buffer)))
}
_ => {
let mut not_found = Response::new(Body::default());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
pub async fn serve_metrics() {
use hyper::{
server::Server,
service::{make_service_fn, service_fn},
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect("Missing METRICS_HOST")
.parse()
.expect("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
server.await.expect("Metrics server error");
});
}
pub trait InjectContext {
fn inject_context(self) -> Self;
}
impl InjectContext for reqwest::RequestBuilder {
fn inject_context(self: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let mut headers: reqwest::header::HeaderMap = Default::default();
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut opentelemetry_http::HeaderInjector(&mut headers))
});
self.headers(headers)
}
}

View File

@ -0,0 +1,25 @@
[package]
name = "fuzzysearch-hash-input"
version = "0.1.0"
authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[dependencies]
tracing = "0.1"
anyhow = "1"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tempfile = "3"
image = "0.23"
actix-web = "4.0.0-beta.5"
actix-http = "3.0.0-beta.5"
actix-multipart = "0.4.0-beta.4"
tracing-actix-web = { version = "0.4.0-beta.4", features = ["opentelemetry_0_15"] }
lazy_static = "1"
prometheus = { version = "0.12", features = ["process"] }
fuzzysearch-common = { path = "../fuzzysearch-common" }

View File

@ -0,0 +1,4 @@
FROM debian:buster-slim
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY ./fuzzysearch-hash-input/fuzzysearch-hash-input /bin/fuzzysearch-hash-input
CMD ["/bin/fuzzysearch-hash-input"]

View File

@ -0,0 +1,140 @@
use std::{
convert::TryInto,
io::{BufReader, SeekFrom},
};
use actix_web::{post, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder};
use tempfile::tempfile;
use tokio::{
io::{AsyncSeekExt, AsyncWriteExt},
sync::Semaphore,
};
use tokio_stream::StreamExt;
lazy_static::lazy_static! {
static ref IMAGE_LOADING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_loading_seconds", "Duration to download and save image").unwrap();
static ref IMAGE_DECODING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_decoding_seconds", "Duration to decode image data").unwrap();
static ref IMAGE_HASHING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_hashing_seconds", "Duration to hash image").unwrap();
}
enum ImageResponse {
Hash(i64),
Error(anyhow::Error),
}
impl Responder for ImageResponse {
fn respond_to(self, _req: &HttpRequest) -> HttpResponse {
match self {
ImageResponse::Hash(hash) => HttpResponse::Ok()
.content_type("text/plain")
.body(hash.to_string()),
ImageResponse::Error(error) => HttpResponse::BadRequest()
.content_type("text/plain")
.body(error.to_string()),
}
}
}
#[tracing::instrument(err, skip(field, semaphore))]
async fn process_image(
mut field: actix_multipart::Field,
semaphore: Data<Semaphore>,
) -> anyhow::Result<i64> {
tracing::debug!("creating temp file");
let loading_duration = IMAGE_LOADING_DURATION.start_timer();
let mut file =
tokio::task::spawn_blocking(move || -> anyhow::Result<tokio::fs::File, anyhow::Error> {
let file = tempfile()?;
Ok(tokio::fs::File::from_std(file))
})
.await??;
tracing::debug!("writing contents to temp file");
let mut size = 0;
while let Ok(Some(chunk)) = field.try_next().await {
file.write_all(&chunk).await?;
size += chunk.len();
}
tracing::debug!("file was {} bytes", size);
tracing::debug!("returning file to beginning");
file.seek(SeekFrom::Start(0)).await?;
let file = file.into_std().await;
loading_duration.stop_and_record();
tracing::debug!("getting semaphore permit");
let _permit = semaphore.acquire().await?;
tracing::debug!("decoding and hashing image");
let hash = tokio::task::spawn_blocking(move || -> anyhow::Result<i64, anyhow::Error> {
let decoding_duration = IMAGE_DECODING_DURATION.start_timer();
let reader = BufReader::new(file);
let reader = image::io::Reader::new(reader).with_guessed_format()?;
let im = reader.decode()?;
decoding_duration.stop_and_record();
let hashing_duration = IMAGE_HASHING_DURATION.start_timer();
let image_hash = fuzzysearch_common::get_hasher().hash_image(&im);
let hash: [u8; 8] = image_hash.as_bytes().try_into()?;
let hash = i64::from_be_bytes(hash);
hashing_duration.stop_and_record();
Ok(hash)
})
.await??;
tracing::debug!("calculated image hash: {}", hash);
Ok(hash)
}
#[post("/image")]
async fn post_image(
mut form: actix_multipart::Multipart,
semaphore: Data<Semaphore>,
) -> impl Responder {
while let Ok(Some(field)) = form.try_next().await {
tracing::debug!("got multipart field: {:?}", field);
let content_type = if let Some(content_disposition) = field.content_disposition() {
content_disposition
} else {
continue;
};
if !matches!(content_type.get_name(), Some("image")) {
continue;
}
match process_image(field, semaphore).await {
Ok(hash) => return ImageResponse::Hash(hash),
Err(err) => return ImageResponse::Error(err),
}
}
ImageResponse::Error(anyhow::anyhow!("missing image field"))
}
#[actix_web::main]
async fn main() {
fuzzysearch_common::trace::configure_tracing("fuzzysearch-image");
fuzzysearch_common::trace::serve_metrics().await;
let semaphore = Data::new(Semaphore::new(4));
HttpServer::new(move || {
App::new()
.wrap(tracing_actix_web::TracingLogger::default())
.app_data(semaphore.clone())
.service(post_image)
})
.workers(2)
.bind("0.0.0.0:8090")
.unwrap()
.run()
.await
.unwrap();
}

View File

@ -16,7 +16,6 @@ serde_json = "1"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] }
image = "0.23"
hamming = "0.1.3"
img_hash = "3"
sha2 = "0.9"

View File

@ -1,14 +1,6 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-e621
FROM debian:buster-slim
EXPOSE 8080
ENV METRICS_HOST=0.0.0.0:8080
WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-e621 /bin/fuzzysearch-ingest-e621
COPY ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621 /bin/fuzzysearch-ingest-e621
CMD ["/bin/fuzzysearch-ingest-e621"]

View File

@ -30,9 +30,8 @@ type Auth = (String, Option<String>);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fuzzysearch_common::init_logger();
create_metrics_server().await;
fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-e621");
fuzzysearch_common::trace::serve_metrics().await;
let login = std::env::var("E621_LOGIN").expect_or_log("Missing E621_LOGIN");
let api_key = std::env::var("E621_API_KEY").expect_or_log("Missing E621_API_KEY");
@ -345,41 +344,3 @@ async fn load_image(client: &reqwest::Client, url: &str) -> anyhow::Result<Image
Ok((Some(hash), None, Some(result)))
}
async fn provide_metrics(
_: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response};
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(Response::new(Body::from(buffer)))
}
async fn create_metrics_server() {
use hyper::{
service::{make_service_fn, service_fn},
Server,
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(provide_metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect_or_log("Missing METRICS_HOST")
.parse()
.expect_or_log("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move { server.await.expect_or_log("Metrics server error") });
}

View File

@ -6,11 +6,8 @@ edition = "2018"
[dependencies]
reqwest = "0.11"
postgres = { version = "0.19", features = ["with-chrono-0_4"] }
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7.0" }
r2d2_postgres = " 0.18.0"
r2d2 = "0.8"
tokio-postgres = { version = "0.7.0", features = ["with-chrono-0_4"] }
chrono = "0.4"
hyper = { version = "0.14", features = ["server"] }
prometheus = { version = "0.12", features = ["process"] }
@ -26,4 +23,5 @@ fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
[dependencies.furaffinity-rs]
git = "https://github.com/Syfaro/furaffinity-rs"
branch = "main"
features = ["cloudflare-bypass"]

View File

@ -1,14 +1,7 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config python3 python3-pip
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-furaffinity
FROM debian:buster-slim
RUN apt-get update -y && \
apt-get install -y openssl ca-certificates python3 python3-pip && \
python3 -m pip --no-cache-dir install cfscrape && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-furaffinity /bin/fuzzysearch-ingest-furaffinity
COPY ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity /bin/fuzzysearch-ingest-furaffinity
CMD ["/bin/fuzzysearch-ingest-furaffinity"]

View File

@ -115,51 +115,6 @@ async fn insert_null_submission(client: &Client, id: i32) -> Result<u64, tokio_p
.await
}
async fn request(
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/health") => Ok(hyper::Response::new(hyper::Body::from("OK"))),
(&hyper::Method::GET, "/metrics") => {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(hyper::Response::new(hyper::Body::from(buffer)))
}
_ => {
let mut not_found = hyper::Response::default();
*not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
async fn web() {
use hyper::service::{make_service_fn, service_fn};
let addr: std::net::SocketAddr = std::env::var("HTTP_HOST")
.expect_or_log("Missing HTTP_HOST")
.parse()
.unwrap_or_log();
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request)) });
let server = hyper::Server::bind(&addr).serve(service);
tracing::info!("Listening on http://{}", addr);
server.await.unwrap_or_log();
}
struct RetryHandler {
max_attempts: usize,
}
@ -270,7 +225,8 @@ async fn process_submission(
#[tokio::main]
async fn main() {
fuzzysearch_common::init_logger();
fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-furaffinity");
fuzzysearch_common::trace::serve_metrics().await;
let (cookie_a, cookie_b) = (
std::env::var("FA_A").expect_or_log("Missing FA_A"),
@ -297,8 +253,6 @@ async fn main() {
}
});
tokio::spawn(async move { web().await });
let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn)
.await

View File

@ -1,11 +1,4 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-weasyl
FROM debian:buster-slim
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-weasyl /bin/fuzzysearch-ingest-weasyl
COPY ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl /bin/fuzzysearch-ingest-weasyl
CMD ["/bin/fuzzysearch-ingest-weasyl"]

View File

@ -198,7 +198,8 @@ async fn insert_null(
#[tokio::main]
async fn main() {
fuzzysearch_common::init_logger();
fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-weasyl");
fuzzysearch_common::trace::serve_metrics().await;
let api_key = std::env::var("WEASYL_APIKEY").unwrap_or_log();

View File

@ -1,12 +1,4 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-webhook
FROM debian:buster-slim
WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-webhook /bin/fuzzysearch-webhook
COPY ./fuzzysearch-webhook/fuzzysearch-webhook /bin/fuzzysearch-webhook
CMD ["/bin/fuzzysearch-webhook"]

View File

@ -27,7 +27,7 @@ pub enum WebhookError {
}
fn main() {
fuzzysearch_common::init_logger();
fuzzysearch_common::trace::configure_tracing("fuzzysearch-webhook");
tracing::info!("Starting...");

View File

@ -8,17 +8,18 @@ edition = "2018"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"
tracing-log = "0.1"
prometheus = { version = "0.12", features = ["process"] }
lazy_static = "1"
opentelemetry = { version = "0.13", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
tracing-opentelemetry = "0.12"
opentelemetry-http = "0.2"
opentelemetry = { version = "0.15.0", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.14", features = ["tokio"] }
tracing-opentelemetry = "0.14"
opentelemetry-http = "0.4"
tokio = { version = "1", features = ["full"] }
async-stream = "0.3"
tokio-stream = "0.1"
futures = "0.3"
@ -29,17 +30,16 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
warp = "0.3"
reqwest = "0.11"
reqwest = { version = "0.11", features = ["multipart"] }
hyper = "0.14"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] }
infer = { version = "0.3", default-features = false }
ffmpeg-next = "4"
image = "0.23"
img_hash = "3"
hamming = "0.1"
bk-tree = "0.3"
bkapi-client = { git = "https://github.com/Syfaro/bkapi.git" }
fuzzysearch-common = { path = "../fuzzysearch-common", features = ["video"] }
fuzzysearch-common = { path = "../fuzzysearch-common" }

View File

@ -1,14 +1,6 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm
COPY . .
RUN cargo install --root / --path ./fuzzysearch
FROM debian:buster-slim
EXPOSE 8080 8081
ENV METRICS_HOST=0.0.0.0:8081
WORKDIR /app
RUN apt-get update -y && apt-get install -y --no-install-recommends openssl ca-certificates ffmpeg && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch /bin/fuzzysearch
COPY ./fuzzysearch/fuzzysearch /bin/fuzzysearch
CMD ["/bin/fuzzysearch"]

View File

@ -1,19 +1,19 @@
use crate::types::*;
use crate::{handlers, Pool, Tree};
use crate::{handlers, Pool};
use crate::{types::*, Endpoints};
use std::convert::Infallible;
use tracing_futures::Instrument;
use warp::{Filter, Rejection, Reply};
pub fn search(
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
endpoints: Endpoints,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
search_image(db.clone(), tree.clone())
.or(search_hashes(db.clone(), tree.clone()))
search_image(db.clone(), bkapi.clone(), endpoints)
.or(search_hashes(db.clone(), bkapi.clone()))
.or(search_file(db.clone()))
.or(search_video(db.clone()))
.or(check_handle(db.clone()))
.or(search_image_by_url(db, tree))
.or(search_image_by_url(db, bkapi))
}
pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
@ -34,7 +34,8 @@ pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Reject
pub fn search_image(
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
endpoints: Endpoints,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("image")
.and(warp::header::headers_cloned())
@ -42,65 +43,51 @@ pub fn search_image(
.and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(warp::query::<ImageSearchOpts>())
.and(with_pool(db))
.and(with_tree(tree))
.and(with_bkapi(bkapi))
.and(with_api_key())
.and_then(|headers, form, opts, pool, tree, api_key| {
.and(with_endpoints(endpoints))
.and_then(|headers, form, opts, pool, bkapi, api_key, endpoints| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_image", ?opts);
span.set_parent(with_telem(headers));
span.in_scope(|| {
handlers::search_image(form, opts, pool, tree, api_key).in_current_span()
handlers::search_image(form, opts, pool, bkapi, api_key, endpoints)
.in_current_span()
})
})
}
pub fn search_image_by_url(
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("url")
.and(warp::get())
.and(warp::query::<UrlSearchOpts>())
.and(with_pool(db))
.and(with_tree(tree))
.and(with_bkapi(bkapi))
.and(with_api_key())
.and_then(handlers::search_image_by_url)
}
pub fn search_hashes(
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("hashes")
.and(warp::header::headers_cloned())
.and(warp::get())
.and(warp::query::<HashSearchOpts>())
.and(with_pool(db))
.and(with_tree(tree))
.and(with_bkapi(bkapi))
.and(with_api_key())
.and_then(|headers, opts, db, tree, api_key| {
.and_then(|headers, opts, db, bkapi, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_hashes", ?opts);
span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span())
})
}
pub fn search_video(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("video")
.and(warp::header::headers_cloned())
.and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(with_pool(db))
.and(with_api_key())
.and_then(|headers, form, db, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_video");
span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span())
span.in_scope(|| handlers::search_hashes(opts, db, bkapi, api_key).in_current_span())
})
}
@ -120,8 +107,16 @@ fn with_pool(db: Pool) -> impl Filter<Extract = (Pool,), Error = Infallible> + C
warp::any().map(move || db.clone())
}
fn with_tree(tree: Tree) -> impl Filter<Extract = (Tree,), Error = Infallible> + Clone {
warp::any().map(move || tree.clone())
fn with_bkapi(
bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = (bkapi_client::BKApiClient,), Error = Infallible> + Clone {
warp::any().map(move || bkapi.clone())
}
fn with_endpoints(
endpoints: Endpoints,
) -> impl Filter<Extract = (Endpoints,), Error = Infallible> + Clone {
warp::any().map(move || endpoints.clone())
}
fn with_telem(headers: warp::http::HeaderMap) -> opentelemetry::Context {

View File

@ -1,3 +1,6 @@
use futures::StreamExt;
use futures::TryStreamExt;
use hyper::StatusCode;
use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
use std::convert::TryInto;
@ -7,8 +10,12 @@ use warp::{Rejection, Reply};
use crate::models::image_query;
use crate::types::*;
use crate::{early_return, rate_limit, Pool, Tree};
use fuzzysearch_common::types::{SearchResult, SiteInfo};
use crate::Endpoints;
use crate::{early_return, rate_limit, Pool};
use fuzzysearch_common::{
trace::InjectContext,
types::{SearchResult, SiteInfo},
};
lazy_static! {
static ref IMAGE_HASH_DURATION: Histogram = register_histogram!(
@ -37,6 +44,7 @@ lazy_static! {
enum Error {
Postgres(sqlx::Error),
Reqwest(reqwest::Error),
Warp(warp::Error),
InvalidData,
InvalidImage,
ApiKey,
@ -46,7 +54,7 @@ enum Error {
impl warp::Reply for Error {
fn into_response(self) -> warp::reply::Response {
let msg = match self {
Error::Postgres(_) | Error::Reqwest(_) => ErrorMessage {
Error::Postgres(_) | Error::Reqwest(_) | Error::Warp(_) => ErrorMessage {
code: 500,
message: "Internal server error".to_string(),
},
@ -89,98 +97,89 @@ impl From<reqwest::Error> for Error {
}
}
async fn get_field_bytes(form: warp::multipart::FormData, field: &str) -> bytes::BytesMut {
use bytes::BufMut;
use futures::StreamExt;
let parts: Vec<_> = form.collect().await;
let mut parts = parts
.into_iter()
.map(|part| {
let part = part.unwrap();
(part.name().to_string(), part)
})
.collect::<std::collections::HashMap<_, _>>();
let data = parts.remove(field).unwrap();
data.stream()
.fold(bytes::BytesMut::new(), |mut b, data| {
b.put(data.unwrap());
async move { b }
})
.await
impl From<warp::Error> for Error {
fn from(err: warp::Error) -> Self {
Self::Warp(err)
}
}
#[tracing::instrument(skip(form))]
async fn hash_input(form: warp::multipart::FormData) -> i64 {
let bytes = get_field_bytes(form, "image").await;
#[tracing::instrument(skip(endpoints, form))]
async fn hash_input(
endpoints: &Endpoints,
mut form: warp::multipart::FormData,
) -> Result<i64, Error> {
let mut image_part = None;
let len = bytes.len();
let _timer = IMAGE_HASH_DURATION.start_timer();
let hash = tokio::task::spawn_blocking(move || {
let hasher = fuzzysearch_common::get_hasher();
let image = image::load_from_memory(&bytes).unwrap();
hasher.hash_image(&image)
})
.instrument(span!(tracing::Level::TRACE, "hashing image", len))
.await
.unwrap();
drop(_timer);
let mut buf: [u8; 8] = [0; 8];
buf.copy_from_slice(&hash.as_bytes());
i64::from_be_bytes(buf)
}
#[tracing::instrument(skip(form))]
async fn hash_video(form: warp::multipart::FormData) -> Option<Vec<[u8; 8]>> {
use bytes::Buf;
let bytes = get_field_bytes(form, "video").await;
let _timer = VIDEO_HASH_DURATION.start_timer();
let hashes = tokio::task::spawn_blocking(move || {
if infer::is_video(&bytes) {
fuzzysearch_common::video::extract_video_hashes(bytes.reader()).ok()
} else if infer::image::is_gif(&bytes) {
fuzzysearch_common::video::extract_gif_hashes(bytes.reader()).ok()
} else {
None
tracing::debug!("looking at image parts");
while let Ok(Some(part)) = form.try_next().await {
if part.name() == "image" {
image_part = Some(part);
}
})
.instrument(span!(tracing::Level::TRACE, "hashing video"))
.await
.unwrap();
drop(_timer);
}
hashes
let image_part = image_part.ok_or(Error::InvalidImage)?;
tracing::debug!("found image part, reading data");
let bytes = image_part
.stream()
.fold(bytes::BytesMut::new(), |mut buf, chunk| {
use bytes::BufMut;
buf.put(chunk.unwrap());
async move { buf }
})
.await;
let part = reqwest::multipart::Part::bytes(bytes.to_vec());
let form = reqwest::multipart::Form::new().part("image", part);
tracing::debug!("sending image to hash input service");
let client = reqwest::Client::new();
let resp = client
.post(&endpoints.hash_input)
.inject_context()
.multipart(form)
.send()
.await?;
tracing::debug!("got response");
if resp.status() != StatusCode::OK {
return Err(Error::InvalidImage);
}
let hash: i64 = resp
.text()
.await?
.parse()
.map_err(|_err| Error::InvalidImage)?;
Ok(hash)
}
pub async fn search_image(
form: warp::multipart::FormData,
opts: ImageSearchOpts,
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
api_key: String,
endpoints: Endpoints,
) -> Result<Box<dyn Reply>, Rejection> {
let image_remaining = rate_limit!(&api_key, &db, image_limit, "image");
let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash");
let num = hash_input(form).await;
let num = early_return!(hash_input(&endpoints, form).await);
let mut items = {
if opts.search_type == Some(ImageSearchType::Force) {
image_query(db.clone(), tree.clone(), vec![num], 10)
image_query(db.clone(), bkapi.clone(), vec![num], 10)
.await
.unwrap()
} else {
let results = image_query(db.clone(), tree.clone(), vec![num], 0)
let results = image_query(db.clone(), bkapi.clone(), vec![num], 0)
.await
.unwrap();
if results.is_empty() && opts.search_type != Some(ImageSearchType::Exact) {
image_query(db.clone(), tree.clone(), vec![num], 10)
image_query(db.clone(), bkapi.clone(), vec![num], 10)
.await
.unwrap()
} else {
@ -220,7 +219,7 @@ pub async fn search_image(
pub async fn search_hashes(
opts: HashSearchOpts,
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
api_key: String,
) -> Result<Box<dyn Reply>, Rejection> {
let pool = db.clone();
@ -239,7 +238,7 @@ pub async fn search_hashes(
let image_remaining = rate_limit!(&api_key, &db, image_limit, "image", hashes.len() as i16);
let results =
early_return!(image_query(pool, tree, hashes.clone(), opts.distance.unwrap_or(10),).await);
early_return!(image_query(pool, bkapi, hashes.clone(), opts.distance.unwrap_or(10)).await);
let resp = warp::http::Response::builder()
.header("x-rate-limit-total-image", image_remaining.1.to_string())
@ -385,16 +384,6 @@ pub async fn search_file(
Ok(Box::new(resp))
}
pub async fn search_video(
form: warp::multipart::FormData,
_db: Pool,
_api_key: String,
) -> Result<impl Reply, Rejection> {
let hashes = hash_video(form).await;
Ok(warp::reply::json(&hashes))
}
pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result<Box<dyn Reply>, Rejection> {
let exists = if let Some(handle) = opts.twitter {
let result = sqlx::query_scalar!("SELECT exists(SELECT 1 FROM twitter_user WHERE lower(data->>'screen_name') = lower($1))", handle)
@ -413,7 +402,7 @@ pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result<Box<dyn Reply>,
pub async fn search_image_by_url(
opts: UrlSearchOpts,
db: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
api_key: String,
) -> Result<Box<dyn Reply>, Rejection> {
use bytes::BufMut;
@ -470,7 +459,7 @@ pub async fn search_image_by_url(
let hash: [u8; 8] = hash.as_bytes().try_into().unwrap();
let num = i64::from_be_bytes(hash);
let results = image_query(db.clone(), tree.clone(), vec![num], 3)
let results = image_query(db.clone(), bkapi.clone(), vec![num], 3)
.await
.unwrap();

View File

@ -1,7 +1,5 @@
#![recursion_limit = "256"]
use std::sync::Arc;
use tokio::sync::RwLock;
use warp::Filter;
mod filters;
@ -10,37 +8,18 @@ mod models;
mod types;
mod utils;
type Tree = Arc<RwLock<bk_tree::BKTree<Node, Hamming>>>;
type Pool = sqlx::PgPool;
#[derive(Debug)]
pub struct Node(pub [u8; 8]);
impl Node {
pub fn new(hash: i64) -> Self {
Self(hash.to_be_bytes())
}
pub fn query(hash: [u8; 8]) -> Self {
Self(hash)
}
pub fn num(&self) -> i64 {
i64::from_be_bytes(self.0)
}
}
pub struct Hamming;
impl bk_tree::Metric<Node> for Hamming {
fn distance(&self, a: &Node, b: &Node) -> u64 {
hamming::distance_fast(&a.0, &b.0).unwrap()
}
#[derive(Clone)]
pub struct Endpoints {
pub hash_input: String,
pub bkapi: String,
}
#[tokio::main]
async fn main() {
configure_tracing();
fuzzysearch_common::trace::configure_tracing("fuzzysearch");
fuzzysearch_common::trace::serve_metrics().await;
ffmpeg_next::init().expect("Unable to initialize ffmpeg");
@ -50,11 +29,12 @@ async fn main() {
.await
.expect("Unable to create Postgres pool");
serve_metrics().await;
let endpoints = Endpoints {
hash_input: std::env::var("ENDPOINT_HASH_INPUT").expect("Missing ENDPOINT_HASH_INPUT"),
bkapi: std::env::var("ENDPOINT_BKAPI").expect("Missing ENDPOINT_BKAPI"),
};
let tree: Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(Hamming)));
load_updates(db_pool.clone(), tree.clone()).await;
let bkapi = bkapi_client::BKApiClient::new(&endpoints.bkapi);
let log = warp::log("fuzzysearch");
let cors = warp::cors()
@ -64,7 +44,7 @@ async fn main() {
let options = warp::options().map(|| "");
let api = options.or(filters::search(db_pool, tree));
let api = options.or(filters::search(db_pool, bkapi, endpoints));
let routes = api
.or(warp::path::end()
.map(|| warp::redirect(warp::http::Uri::from_static("https://fuzzysearch.net"))))
@ -74,173 +54,3 @@ async fn main() {
warp::serve(routes).run(([0, 0, 0, 0], 8080)).await;
}
fn configure_tracing() {
use opentelemetry::KeyValue;
use tracing_subscriber::layer::SubscriberExt;
let env = std::env::var("ENVIRONMENT");
let env = if let Ok(env) = env.as_ref() {
env.as_str()
} else if cfg!(debug_assertions) {
"debug"
} else {
"release"
};
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
.with_service_name("fuzzysearch")
.with_tags(vec![
KeyValue::new("environment", env.to_owned()),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
])
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
let subscriber = tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.with_target(true);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
} else {
let subscriber = tracing_subscriber::fmt::layer();
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
}
async fn metrics(
_: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response};
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(Response::new(Body::from(buffer)))
}
async fn serve_metrics() {
use hyper::{
service::{make_service_fn, service_fn},
Server,
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect("Missing METRICS_HOST")
.parse()
.expect("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
server.await.expect("Metrics server error");
});
}
#[derive(serde::Deserialize)]
struct HashRow {
hash: i64,
}
async fn create_tree(conn: &Pool) -> bk_tree::BKTree<Node, Hamming> {
use futures::TryStreamExt;
let mut tree = bk_tree::BKTree::new(Hamming);
let mut rows = sqlx::query!(
"SELECT hash_int hash FROM submission WHERE hash_int IS NOT NULL
UNION ALL
SELECT hash FROM e621 WHERE hash IS NOT NULL
UNION ALL
SELECT hash FROM tweet_media WHERE hash IS NOT NULL
UNION ALL
SELECT hash FROM weasyl WHERE hash IS NOT NULL"
)
.fetch(conn);
let mut count = 0;
while let Some(row) = rows.try_next().await.expect("Unable to get row") {
if let Some(hash) = row.hash {
let node = Node::new(hash);
if tree.find_exact(&node).is_none() {
tree.add(node);
}
count += 1;
if count % 250_000 == 0 {
tracing::debug!(count, "Made progress in loading tree rows");
}
}
}
tracing::info!(count, "Completed loading rows for tree");
tree
}
async fn load_updates(conn: Pool, tree: Tree) {
let mut listener = sqlx::postgres::PgListener::connect_with(&conn)
.await
.unwrap();
listener.listen("fuzzysearch_hash_added").await.unwrap();
let new_tree = create_tree(&conn).await;
let mut lock = tree.write().await;
*lock = new_tree;
drop(lock);
tokio::spawn(async move {
loop {
while let Some(notification) = listener
.try_recv()
.await
.expect("Unable to recv notification")
{
let payload: HashRow = serde_json::from_str(notification.payload()).unwrap();
tracing::debug!(hash = payload.hash, "Adding new hash to tree");
let lock = tree.read().await;
if lock.find_exact(&Node::new(payload.hash)).is_some() {
continue;
}
drop(lock);
let mut lock = tree.write().await;
lock.add(Node(payload.hash.to_be_bytes()));
drop(lock);
}
tracing::error!("Lost connection to Postgres, recreating tree");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
let new_tree = create_tree(&conn).await;
let mut lock = tree.write().await;
*lock = new_tree;
drop(lock);
tracing::info!("Replaced tree");
}
});
}

View File

@ -2,15 +2,10 @@ use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};
use crate::types::*;
use crate::{Pool, Tree};
use crate::Pool;
use fuzzysearch_common::types::{SearchResult, SiteInfo};
lazy_static! {
static ref IMAGE_TREE_DURATION: Histogram = register_histogram!(
"fuzzysearch_api_image_tree_seconds",
"Duration to search for hashes in tree"
)
.unwrap();
static ref IMAGE_QUERY_DURATION: Histogram = register_histogram!(
"fuzzysearch_api_image_query_seconds",
"Duration to perform a single image lookup query"
@ -51,28 +46,30 @@ struct HashSearch {
distance: u64,
}
#[tracing::instrument(skip(pool, tree))]
#[tracing::instrument(skip(pool, bkapi))]
pub async fn image_query(
pool: Pool,
tree: Tree,
bkapi: bkapi_client::BKApiClient,
hashes: Vec<i64>,
distance: i64,
) -> Result<Vec<SearchResult>, sqlx::Error> {
let timer = IMAGE_TREE_DURATION.start_timer();
let lock = tree.read().await;
let found_hashes: Vec<HashSearch> = hashes
.iter()
.flat_map(|hash| {
lock.find(&crate::Node::new(*hash), distance as u64)
.map(|(dist, node)| HashSearch {
searched_hash: *hash,
found_hash: node.num(),
distance: dist,
let found_hashes: Vec<HashSearch> = bkapi
.search_many(&hashes, distance as u64)
.await
.unwrap()
.into_iter()
.flat_map(|results| {
results
.hashes
.iter()
.map(|hash| HashSearch {
searched_hash: results.hash,
found_hash: hash.hash,
distance: hash.distance,
})
.collect::<Vec<_>>()
})
.collect();
timer.stop_and_record();
let timer = IMAGE_QUERY_DURATION.start_timer();
let matches = sqlx::query!(