Merge branch 'main'

This commit is contained in:
Syfaro 2021-08-10 21:33:38 -04:00
commit 56e4e96d7a
25 changed files with 1315 additions and 829 deletions

View File

@ -4,59 +4,32 @@ stages:
- image - image
variables: variables:
CARGO_HOME: /root/.cargo CARGO_HOME: "$CI_PROJECT_DIR/.cargo"
SCCACHE_VERSION: v0.2.15
SCCACHE_S3_USE_SSL: "true"
SCCACHE_BUCKET: "sccache"
SQLX_OFFLINE: "true" SQLX_OFFLINE: "true"
# Cache should only be updated once, default to pull only # Cache should only be updated once, default to pull only
cache: &global_cache cache: &global_cache
key:
files:
- Cargo.lock
paths: paths:
- .cargo/ - .cargo/
policy: pull policy: pull
# Run tests on current stable Rust version # Run tests on current stable Rust version
test:latest: &base_test test:latest: &base_test
image: rust:1.51-slim-buster image: rust:1.53-slim-buster
stage: test stage: test
cache: cache:
<<: *global_cache <<: *global_cache
policy: pull-push policy: pull-push
before_script: before_script:
# Use proxy for apt, install wget to download sccache and other deps
- export http_proxy=$DEBIAN_PROXY
- apt-get update -y - apt-get update -y
- apt-get install -y wget libssl-dev pkg-config libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm python3 python3-pip - apt-get install -y libssl-dev pkg-config libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm python3 python3-pip
- unset http_proxy
# Download and extract sccache, ensuring it's executable
- wget -q -O /tmp/sccache.tar.gz https://github.com/mozilla/sccache/releases/download/$SCCACHE_VERSION/sccache-$SCCACHE_VERSION-x86_64-unknown-linux-musl.tar.gz
- tar zxvf /tmp/sccache.tar.gz --strip-components=1 -C /tmp
- export RUSTC_WRAPPER=/tmp/sccache
- chmod a+x $RUSTC_WRAPPER
# Move sccache config into place
- mkdir -p $HOME/.config/sccache
- mv $SCCACHE_CONFIG $HOME/.config/sccache/config
# Prepare cargo data from cache
- rm -rf /root/.cargo || true
- mv .cargo /root/.cargo || true
# Prepare cargo-cache for cleaning data later
- cargo install cargo-cache --no-default-features --features ci-autoclean
script: script:
# Build, test, and show stats # Build, test, and show stats
- cargo build --verbose - cargo build --verbose
- cargo test --verbose - cargo test --verbose
- $RUSTC_WRAPPER --show-stats
# Clean cargo data, move back into place for caching
- $CARGO_HOME/bin/cargo-cache
- rm -rf .cargo || true
- mv /root/.cargo .cargo || true
# Same as above, but nightly Rust # Same as above, but nightly Rust
test:nightly: test:nightly:
@ -77,7 +50,6 @@ build:api: &base_build
- ./fuzzysearch/fuzzysearch - ./fuzzysearch/fuzzysearch
script: script:
- cargo build --verbose --release --bin fuzzysearch - cargo build --verbose --release --bin fuzzysearch
- $RUSTC_WRAPPER --show-stats
- mv ./target/release/fuzzysearch ./fuzzysearch/fuzzysearch - mv ./target/release/fuzzysearch ./fuzzysearch/fuzzysearch
build:webhook: build:webhook:
@ -88,9 +60,18 @@ build:webhook:
- ./fuzzysearch-webhook/fuzzysearch-webhook - ./fuzzysearch-webhook/fuzzysearch-webhook
script: script:
- cargo build --verbose --release --bin fuzzysearch-webhook - cargo build --verbose --release --bin fuzzysearch-webhook
- $RUSTC_WRAPPER --show-stats
- mv ./target/release/fuzzysearch-webhook ./fuzzysearch-webhook/fuzzysearch-webhook - mv ./target/release/fuzzysearch-webhook ./fuzzysearch-webhook/fuzzysearch-webhook
build:hash-input:
<<: *base_build
artifacts:
expire_in: 1 day
paths:
- ./fuzzysearch-hash-input/fuzzysearch-hash-input
script:
- cargo build --verbose --release --bin fuzzysearch-hash-input
- mv ./target/release/fuzzysearch-hash-input ./fuzzysearch-hash-input/fuzzysearch-hash-input
build:ingest-e621: build:ingest-e621:
<<: *base_build <<: *base_build
artifacts: artifacts:
@ -99,7 +80,6 @@ build:ingest-e621:
- ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621 - ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621
script: script:
- cargo build --verbose --release --bin fuzzysearch-ingest-e621 - cargo build --verbose --release --bin fuzzysearch-ingest-e621
- $RUSTC_WRAPPER --show-stats
- mv ./target/release/fuzzysearch-ingest-e621 ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621 - mv ./target/release/fuzzysearch-ingest-e621 ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621
build:ingest-furaffinity: build:ingest-furaffinity:
@ -110,7 +90,6 @@ build:ingest-furaffinity:
- ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity - ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity
script: script:
- cargo build --verbose --release --bin fuzzysearch-ingest-furaffinity - cargo build --verbose --release --bin fuzzysearch-ingest-furaffinity
- $RUSTC_WRAPPER --show-stats
- mv ./target/release/fuzzysearch-ingest-furaffinity ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity - mv ./target/release/fuzzysearch-ingest-furaffinity ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity
build:ingest-weasyl: build:ingest-weasyl:
@ -121,54 +100,47 @@ build:ingest-weasyl:
- ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl - ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
script: script:
- cargo build --verbose --release --bin fuzzysearch-ingest-weasyl - cargo build --verbose --release --bin fuzzysearch-ingest-weasyl
- $RUSTC_WRAPPER --show-stats
- mv ./target/release/fuzzysearch-ingest-weasyl ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl - mv ./target/release/fuzzysearch-ingest-weasyl ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
images:api: &base_images images:api: &base_images
stage: image stage: image
image: docker image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
cache: {} cache: {}
before_script: before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY - mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"auth\":\"$(echo -n ${CI_REGISTRY_USER}:${CI_REGISTRY_PASSWORD} | base64)\"}}}" > /kaniko/.docker/config.json
needs: ['build:api'] needs: ['build:api']
script: script:
- docker pull $CI_REGISTRY_IMAGE/api:latest || true - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch/Dockerfile --destination $CI_REGISTRY_IMAGE/api:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/api:latest --cache=true
- docker build --build-arg http_proxy=$DEBIAN_PROXY --cache-from $CI_REGISTRY_IMAGE/api:latest --tag $CI_REGISTRY_IMAGE/api:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE/api:latest -f fuzzysearch/Dockerfile .
- docker push $CI_REGISTRY_IMAGE/api:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE/api:latest
images:webhook: images:webhook:
<<: *base_images <<: *base_images
needs: ['build:webhook'] needs: ['build:webhook']
script: script:
- docker pull $CI_REGISTRY_IMAGE/webhook:latest || true - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-webhook/Dockerfile --destination $CI_REGISTRY_IMAGE/webhook:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/webhook:latest --cache=true
- docker build --build-arg http_proxy=$DEBIAN_PROXY --cache-from $CI_REGISTRY_IMAGE/webhook:latest --tag $CI_REGISTRY_IMAGE/webhook:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE/webhook:latest -f fuzzysearch-webhook/Dockerfile .
- docker push $CI_REGISTRY_IMAGE/webhook:$CI_COMMIT_SHA images:hash-input:
- docker push $CI_REGISTRY_IMAGE/webhook:latest <<: *base_images
needs: ['build:hash-input']
script:
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-hash-input/Dockerfile --destination $CI_REGISTRY_IMAGE/hash-input:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/hash-input:latest --cache=true
images:ingest-e621: images:ingest-e621:
<<: *base_images <<: *base_images
needs: ['build:ingest-e621'] needs: ['build:ingest-e621']
script: script:
- docker pull $CI_REGISTRY_IMAGE/ingest-e621:latest || true - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-e621/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-e621:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-e621:latest --cache=true
- docker build --build-arg http_proxy=$DEBIAN_PROXY --cache-from $CI_REGISTRY_IMAGE/ingest-e621:latest --tag $CI_REGISTRY_IMAGE/ingest-e621:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE/ingest-e621:latest -f fuzzysearch-ingest-e621/Dockerfile .
- docker push $CI_REGISTRY_IMAGE/ingest-e621:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE/ingest-e621:latest
images:ingest-furaffinity: images:ingest-furaffinity:
<<: *base_images <<: *base_images
needs: ['build:ingest-furaffinity'] needs: ['build:ingest-furaffinity']
script: script:
- docker pull $CI_REGISTRY_IMAGE/ingest-furaffinity:latest || true - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-furaffinity/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-furaffinity:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-furaffinity:latest --cache=true
- docker build --build-arg http_proxy=$DEBIAN_PROXY --cache-from $CI_REGISTRY_IMAGE/ingest-furaffinity:latest --tag $CI_REGISTRY_IMAGE/ingest-furaffinity:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE/ingest-furaffinity:latest -f fuzzysearch-ingest-furaffinity/Dockerfile .
- docker push $CI_REGISTRY_IMAGE/ingest-furaffinity:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE/ingest-furaffinity:latest
images:ingest-weasyl: images:ingest-weasyl:
<<: *base_images <<: *base_images
needs: ['build:ingest-weasyl'] needs: ['build:ingest-weasyl']
script: script:
- docker pull $CI_REGISTRY_IMAGE/ingest-weasyl:latest || true - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-weasyl/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-weasyl:latest --cache=true
- docker build --build-arg http_proxy=$DEBIAN_PROXY --cache-from $CI_REGISTRY_IMAGE/ingest-weasyl:latest --tag $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE/ingest-weasyl:latest -f fuzzysearch-ingest-weasyl/Dockerfile .
- docker push $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE/ingest-weasyl:latest

1085
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,18 +1,16 @@
[workspace] [workspace]
members = [ members = [
"fuzzysearch",
"fuzzysearch-common", "fuzzysearch-common",
"fuzzysearch",
"fuzzysearch-hash-input",
"fuzzysearch-webhook",
"fuzzysearch-ingest-e621", "fuzzysearch-ingest-e621",
"fuzzysearch-ingest-furaffinity", "fuzzysearch-ingest-furaffinity",
"fuzzysearch-ingest-weasyl", "fuzzysearch-ingest-weasyl",
"fuzzysearch-webhook"
] ]
[profile.dev] [profile.dev.package."*"]
opt-level = 2 opt-level = 2
debug = true debug = true
[profile.release]
lto = true
codegen-units = 1
opt-level = 3

View File

@ -5,19 +5,25 @@ authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018" edition = "2018"
[features] [features]
default = [] default = ["trace"]
video = ["ffmpeg-next", "tempfile"] video = ["ffmpeg-next", "tempfile"]
queue = ["faktory", "tokio", "serde_json"] queue = ["faktory", "tokio", "serde_json"]
trace = ["opentelemetry", "opentelemetry-jaeger", "tracing-opentelemetry", "opentelemetry-http", "hyper", "prometheus", "tokio", "reqwest"]
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
tracing-log = "0.1"
tokio = { version = "1", features = ["rt"], optional = true }
futures = "0.3"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
base64 = "0.13" serde_json = { version = "1", optional = true }
base64 = "0.13"
image = "0.23" image = "0.23"
img_hash = "3" img_hash = "3"
@ -25,5 +31,12 @@ ffmpeg-next = { version = "4", optional = true }
tempfile = { version = "3", optional = true } tempfile = { version = "3", optional = true }
faktory = { version = "0.11", optional = true } faktory = { version = "0.11", optional = true }
tokio = { version = "1", features = ["rt"], optional = true }
serde_json = { version = "1", optional = true } opentelemetry = { version = "0.15.0", features = ["rt-tokio"], optional = true }
opentelemetry-jaeger = { version = "0.14", features = ["tokio"], optional = true }
tracing-opentelemetry = { version = "0.14", optional = true }
opentelemetry-http = { version = "0.4", optional = true }
hyper = { version = "0.14", features = ["server", "http2", "tcp"], optional = true }
prometheus = { version = "0.12", optional = true }
reqwest = { version = "0.11", optional = true }

View File

@ -4,6 +4,9 @@ pub mod types;
#[cfg(feature = "video")] #[cfg(feature = "video")]
pub mod video; pub mod video;
#[cfg(feature = "trace")]
pub mod trace;
/// Create an instance of img_hash with project defaults. /// Create an instance of img_hash with project defaults.
pub fn get_hasher() -> img_hash::Hasher<[u8; 8]> { pub fn get_hasher() -> img_hash::Hasher<[u8; 8]> {
use img_hash::{HashAlg::Gradient, HasherConfig}; use img_hash::{HashAlg::Gradient, HasherConfig};

View File

@ -0,0 +1,118 @@
pub fn configure_tracing(service_name: &'static str) {
use opentelemetry::KeyValue;
use tracing_subscriber::layer::SubscriberExt;
tracing_log::LogTracer::init().unwrap();
let env = std::env::var("ENVIRONMENT");
let env = if let Ok(env) = env.as_ref() {
env.as_str()
} else if cfg!(debug_assertions) {
"debug"
} else {
"release"
};
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
.with_service_name(service_name)
.with_tags(vec![
KeyValue::new("environment", env.to_owned()),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
])
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
let subscriber = tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.with_target(true);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
} else {
let subscriber = tracing_subscriber::fmt::layer();
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
tracing::debug!(service_name, "set application tracing service name");
}
async fn metrics(
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response, StatusCode};
match req.uri().path() {
"/health" => Ok(Response::new(Body::from("OK"))),
"/metrics" => {
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(Response::new(Body::from(buffer)))
}
_ => {
let mut not_found = Response::new(Body::default());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
pub async fn serve_metrics() {
use hyper::{
server::Server,
service::{make_service_fn, service_fn},
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect("Missing METRICS_HOST")
.parse()
.expect("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
server.await.expect("Metrics server error");
});
}
pub trait InjectContext {
fn inject_context(self) -> Self;
}
impl InjectContext for reqwest::RequestBuilder {
fn inject_context(self: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let mut headers: reqwest::header::HeaderMap = Default::default();
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut opentelemetry_http::HeaderInjector(&mut headers))
});
self.headers(headers)
}
}

View File

@ -0,0 +1,25 @@
[package]
name = "fuzzysearch-hash-input"
version = "0.1.0"
authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[dependencies]
tracing = "0.1"
anyhow = "1"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tempfile = "3"
image = "0.23"
actix-web = "4.0.0-beta.5"
actix-http = "3.0.0-beta.5"
actix-multipart = "0.4.0-beta.4"
tracing-actix-web = { version = "0.4.0-beta.4", features = ["opentelemetry_0_15"] }
lazy_static = "1"
prometheus = { version = "0.12", features = ["process"] }
fuzzysearch-common = { path = "../fuzzysearch-common" }

View File

@ -0,0 +1,4 @@
FROM debian:buster-slim
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY ./fuzzysearch-hash-input/fuzzysearch-hash-input /bin/fuzzysearch-hash-input
CMD ["/bin/fuzzysearch-hash-input"]

View File

@ -0,0 +1,140 @@
use std::{
convert::TryInto,
io::{BufReader, SeekFrom},
};
use actix_web::{post, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder};
use tempfile::tempfile;
use tokio::{
io::{AsyncSeekExt, AsyncWriteExt},
sync::Semaphore,
};
use tokio_stream::StreamExt;
lazy_static::lazy_static! {
static ref IMAGE_LOADING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_loading_seconds", "Duration to download and save image").unwrap();
static ref IMAGE_DECODING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_decoding_seconds", "Duration to decode image data").unwrap();
static ref IMAGE_HASHING_DURATION: prometheus::Histogram =
prometheus::register_histogram!("fuzzysearch_image_image_hashing_seconds", "Duration to hash image").unwrap();
}
enum ImageResponse {
Hash(i64),
Error(anyhow::Error),
}
impl Responder for ImageResponse {
fn respond_to(self, _req: &HttpRequest) -> HttpResponse {
match self {
ImageResponse::Hash(hash) => HttpResponse::Ok()
.content_type("text/plain")
.body(hash.to_string()),
ImageResponse::Error(error) => HttpResponse::BadRequest()
.content_type("text/plain")
.body(error.to_string()),
}
}
}
#[tracing::instrument(err, skip(field, semaphore))]
async fn process_image(
mut field: actix_multipart::Field,
semaphore: Data<Semaphore>,
) -> anyhow::Result<i64> {
tracing::debug!("creating temp file");
let loading_duration = IMAGE_LOADING_DURATION.start_timer();
let mut file =
tokio::task::spawn_blocking(move || -> anyhow::Result<tokio::fs::File, anyhow::Error> {
let file = tempfile()?;
Ok(tokio::fs::File::from_std(file))
})
.await??;
tracing::debug!("writing contents to temp file");
let mut size = 0;
while let Ok(Some(chunk)) = field.try_next().await {
file.write_all(&chunk).await?;
size += chunk.len();
}
tracing::debug!("file was {} bytes", size);
tracing::debug!("returning file to beginning");
file.seek(SeekFrom::Start(0)).await?;
let file = file.into_std().await;
loading_duration.stop_and_record();
tracing::debug!("getting semaphore permit");
let _permit = semaphore.acquire().await?;
tracing::debug!("decoding and hashing image");
let hash = tokio::task::spawn_blocking(move || -> anyhow::Result<i64, anyhow::Error> {
let decoding_duration = IMAGE_DECODING_DURATION.start_timer();
let reader = BufReader::new(file);
let reader = image::io::Reader::new(reader).with_guessed_format()?;
let im = reader.decode()?;
decoding_duration.stop_and_record();
let hashing_duration = IMAGE_HASHING_DURATION.start_timer();
let image_hash = fuzzysearch_common::get_hasher().hash_image(&im);
let hash: [u8; 8] = image_hash.as_bytes().try_into()?;
let hash = i64::from_be_bytes(hash);
hashing_duration.stop_and_record();
Ok(hash)
})
.await??;
tracing::debug!("calculated image hash: {}", hash);
Ok(hash)
}
#[post("/image")]
async fn post_image(
mut form: actix_multipart::Multipart,
semaphore: Data<Semaphore>,
) -> impl Responder {
while let Ok(Some(field)) = form.try_next().await {
tracing::debug!("got multipart field: {:?}", field);
let content_type = if let Some(content_disposition) = field.content_disposition() {
content_disposition
} else {
continue;
};
if !matches!(content_type.get_name(), Some("image")) {
continue;
}
match process_image(field, semaphore).await {
Ok(hash) => return ImageResponse::Hash(hash),
Err(err) => return ImageResponse::Error(err),
}
}
ImageResponse::Error(anyhow::anyhow!("missing image field"))
}
#[actix_web::main]
async fn main() {
fuzzysearch_common::trace::configure_tracing("fuzzysearch-image");
fuzzysearch_common::trace::serve_metrics().await;
let semaphore = Data::new(Semaphore::new(4));
HttpServer::new(move || {
App::new()
.wrap(tracing_actix_web::TracingLogger::default())
.app_data(semaphore.clone())
.service(post_image)
})
.workers(2)
.bind("0.0.0.0:8090")
.unwrap()
.run()
.await
.unwrap();
}

View File

@ -16,7 +16,6 @@ serde_json = "1"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] } sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] }
image = "0.23" image = "0.23"
hamming = "0.1.3"
img_hash = "3" img_hash = "3"
sha2 = "0.9" sha2 = "0.9"

View File

@ -1,14 +1,6 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-e621
FROM debian:buster-slim FROM debian:buster-slim
EXPOSE 8080 EXPOSE 8080
ENV METRICS_HOST=0.0.0.0:8080 ENV METRICS_HOST=0.0.0.0:8080
WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-e621 /bin/fuzzysearch-ingest-e621 COPY ./fuzzysearch-ingest-e621/fuzzysearch-ingest-e621 /bin/fuzzysearch-ingest-e621
CMD ["/bin/fuzzysearch-ingest-e621"] CMD ["/bin/fuzzysearch-ingest-e621"]

View File

@ -30,9 +30,8 @@ type Auth = (String, Option<String>);
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
fuzzysearch_common::init_logger(); fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-e621");
fuzzysearch_common::trace::serve_metrics().await;
create_metrics_server().await;
let login = std::env::var("E621_LOGIN").expect_or_log("Missing E621_LOGIN"); let login = std::env::var("E621_LOGIN").expect_or_log("Missing E621_LOGIN");
let api_key = std::env::var("E621_API_KEY").expect_or_log("Missing E621_API_KEY"); let api_key = std::env::var("E621_API_KEY").expect_or_log("Missing E621_API_KEY");
@ -345,41 +344,3 @@ async fn load_image(client: &reqwest::Client, url: &str) -> anyhow::Result<Image
Ok((Some(hash), None, Some(result))) Ok((Some(hash), None, Some(result)))
} }
async fn provide_metrics(
_: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response};
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(Response::new(Body::from(buffer)))
}
async fn create_metrics_server() {
use hyper::{
service::{make_service_fn, service_fn},
Server,
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(provide_metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect_or_log("Missing METRICS_HOST")
.parse()
.expect_or_log("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move { server.await.expect_or_log("Metrics server error") });
}

View File

@ -6,11 +6,8 @@ edition = "2018"
[dependencies] [dependencies]
reqwest = "0.11" reqwest = "0.11"
postgres = { version = "0.19", features = ["with-chrono-0_4"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7.0" } tokio-postgres = { version = "0.7.0", features = ["with-chrono-0_4"] }
r2d2_postgres = " 0.18.0"
r2d2 = "0.8"
chrono = "0.4" chrono = "0.4"
hyper = { version = "0.14", features = ["server"] } hyper = { version = "0.14", features = ["server"] }
prometheus = { version = "0.12", features = ["process"] } prometheus = { version = "0.12", features = ["process"] }
@ -26,4 +23,5 @@ fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
[dependencies.furaffinity-rs] [dependencies.furaffinity-rs]
git = "https://github.com/Syfaro/furaffinity-rs" git = "https://github.com/Syfaro/furaffinity-rs"
branch = "main"
features = ["cloudflare-bypass"] features = ["cloudflare-bypass"]

View File

@ -1,14 +1,7 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config python3 python3-pip
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-furaffinity
FROM debian:buster-slim FROM debian:buster-slim
RUN apt-get update -y && \ RUN apt-get update -y && \
apt-get install -y openssl ca-certificates python3 python3-pip && \ apt-get install -y openssl ca-certificates python3 python3-pip && \
python3 -m pip --no-cache-dir install cfscrape && \ python3 -m pip --no-cache-dir install cfscrape && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-furaffinity /bin/fuzzysearch-ingest-furaffinity COPY ./fuzzysearch-ingest-furaffinity/fuzzysearch-ingest-furaffinity /bin/fuzzysearch-ingest-furaffinity
CMD ["/bin/fuzzysearch-ingest-furaffinity"] CMD ["/bin/fuzzysearch-ingest-furaffinity"]

View File

@ -115,51 +115,6 @@ async fn insert_null_submission(client: &Client, id: i32) -> Result<u64, tokio_p
.await .await
} }
async fn request(
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/health") => Ok(hyper::Response::new(hyper::Body::from("OK"))),
(&hyper::Method::GET, "/metrics") => {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(hyper::Response::new(hyper::Body::from(buffer)))
}
_ => {
let mut not_found = hyper::Response::default();
*not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
async fn web() {
use hyper::service::{make_service_fn, service_fn};
let addr: std::net::SocketAddr = std::env::var("HTTP_HOST")
.expect_or_log("Missing HTTP_HOST")
.parse()
.unwrap_or_log();
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request)) });
let server = hyper::Server::bind(&addr).serve(service);
tracing::info!("Listening on http://{}", addr);
server.await.unwrap_or_log();
}
struct RetryHandler { struct RetryHandler {
max_attempts: usize, max_attempts: usize,
} }
@ -270,7 +225,8 @@ async fn process_submission(
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
fuzzysearch_common::init_logger(); fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-furaffinity");
fuzzysearch_common::trace::serve_metrics().await;
let (cookie_a, cookie_b) = ( let (cookie_a, cookie_b) = (
std::env::var("FA_A").expect_or_log("Missing FA_A"), std::env::var("FA_A").expect_or_log("Missing FA_A"),
@ -297,8 +253,6 @@ async fn main() {
} }
}); });
tokio::spawn(async move { web().await });
let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL"); let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn) let faktory = FaktoryClient::connect(faktory_dsn)
.await .await

View File

@ -1,11 +1,4 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-ingest-weasyl
FROM debian:buster-slim FROM debian:buster-slim
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-ingest-weasyl /bin/fuzzysearch-ingest-weasyl COPY ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl /bin/fuzzysearch-ingest-weasyl
CMD ["/bin/fuzzysearch-ingest-weasyl"] CMD ["/bin/fuzzysearch-ingest-weasyl"]

View File

@ -198,7 +198,8 @@ async fn insert_null(
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
fuzzysearch_common::init_logger(); fuzzysearch_common::trace::configure_tracing("fuzzysearch-ingest-weasyl");
fuzzysearch_common::trace::serve_metrics().await;
let api_key = std::env::var("WEASYL_APIKEY").unwrap_or_log(); let api_key = std::env::var("WEASYL_APIKEY").unwrap_or_log();

View File

@ -1,12 +1,4 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path ./fuzzysearch-webhook
FROM debian:buster-slim FROM debian:buster-slim
WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch-webhook /bin/fuzzysearch-webhook COPY ./fuzzysearch-webhook/fuzzysearch-webhook /bin/fuzzysearch-webhook
CMD ["/bin/fuzzysearch-webhook"] CMD ["/bin/fuzzysearch-webhook"]

View File

@ -27,7 +27,7 @@ pub enum WebhookError {
} }
fn main() { fn main() {
fuzzysearch_common::init_logger(); fuzzysearch_common::trace::configure_tracing("fuzzysearch-webhook");
tracing::info!("Starting..."); tracing::info!("Starting...");

View File

@ -8,17 +8,18 @@ edition = "2018"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
tracing-futures = "0.2" tracing-futures = "0.2"
tracing-log = "0.1"
prometheus = { version = "0.12", features = ["process"] } prometheus = { version = "0.12", features = ["process"] }
lazy_static = "1" lazy_static = "1"
opentelemetry = { version = "0.14.0", features = ["rt-tokio"] } opentelemetry = { version = "0.15.0", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.13", features = ["tokio"] } opentelemetry-jaeger = { version = "0.14", features = ["tokio"] }
tracing-opentelemetry = "0.12" tracing-opentelemetry = "0.14"
opentelemetry-http = "0.3" opentelemetry-http = "0.4"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
async-stream = "0.3" tokio-stream = "0.1"
futures = "0.3" futures = "0.3"
@ -29,17 +30,16 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
warp = "0.3" warp = "0.3"
reqwest = "0.11" reqwest = { version = "0.11", features = ["multipart"] }
hyper = "0.14" hyper = "0.14"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] } sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] }
infer = { version = "0.4", default-features = false }
ffmpeg-next = "4" ffmpeg-next = "4"
image = "0.23" image = "0.23"
img_hash = "3" img_hash = "3"
hamming = "0.1" hamming = "0.1"
bk-tree = "0.3" bkapi-client = { git = "https://github.com/Syfaro/bkapi.git" }
fuzzysearch-common = { path = "../fuzzysearch-common", features = ["video"] } fuzzysearch-common = { path = "../fuzzysearch-common" }

View File

@ -1,14 +1,6 @@
FROM rust:1-slim-buster AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config libavcodec-dev libavformat-dev libavutil-dev libavdevice-dev clang llvm
COPY . .
RUN cargo install --root / --path ./fuzzysearch
FROM debian:buster-slim FROM debian:buster-slim
EXPOSE 8080 8081 EXPOSE 8080 8081
ENV METRICS_HOST=0.0.0.0:8081 ENV METRICS_HOST=0.0.0.0:8081
WORKDIR /app
RUN apt-get update -y && apt-get install -y --no-install-recommends openssl ca-certificates ffmpeg && rm -rf /var/lib/apt/lists/* RUN apt-get update -y && apt-get install -y --no-install-recommends openssl ca-certificates ffmpeg && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch /bin/fuzzysearch COPY ./fuzzysearch/fuzzysearch /bin/fuzzysearch
CMD ["/bin/fuzzysearch"] CMD ["/bin/fuzzysearch"]

View File

@ -1,19 +1,19 @@
use crate::types::*; use crate::{handlers, Pool};
use crate::{handlers, Pool, Tree}; use crate::{types::*, Endpoints};
use std::convert::Infallible; use std::convert::Infallible;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
pub fn search( pub fn search(
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
endpoints: Endpoints,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
search_image(db.clone(), tree.clone()) search_image(db.clone(), bkapi.clone(), endpoints)
.or(search_hashes(db.clone(), tree.clone())) .or(search_hashes(db.clone(), bkapi.clone()))
.or(search_file(db.clone())) .or(search_file(db.clone()))
.or(search_video(db.clone()))
.or(check_handle(db.clone())) .or(check_handle(db.clone()))
.or(search_image_by_url(db, tree)) .or(search_image_by_url(db, bkapi))
} }
pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
@ -34,7 +34,8 @@ pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Reject
pub fn search_image( pub fn search_image(
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
endpoints: Endpoints,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("image") warp::path("image")
.and(warp::header::headers_cloned()) .and(warp::header::headers_cloned())
@ -42,65 +43,51 @@ pub fn search_image(
.and(warp::multipart::form().max_length(1024 * 1024 * 10)) .and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(warp::query::<ImageSearchOpts>()) .and(warp::query::<ImageSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_bkapi(bkapi))
.and(with_api_key()) .and(with_api_key())
.and_then(|headers, form, opts, pool, tree, api_key| { .and(with_endpoints(endpoints))
.and_then(|headers, form, opts, pool, bkapi, api_key, endpoints| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_image", ?opts); let span = tracing::info_span!("search_image", ?opts);
span.set_parent(with_telem(headers)); span.set_parent(with_telem(headers));
span.in_scope(|| { span.in_scope(|| {
handlers::search_image(form, opts, pool, tree, api_key).in_current_span() handlers::search_image(form, opts, pool, bkapi, api_key, endpoints)
.in_current_span()
}) })
}) })
} }
pub fn search_image_by_url( pub fn search_image_by_url(
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("url") warp::path("url")
.and(warp::get()) .and(warp::get())
.and(warp::query::<UrlSearchOpts>()) .and(warp::query::<UrlSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_bkapi(bkapi))
.and(with_api_key()) .and(with_api_key())
.and_then(handlers::search_image_by_url) .and_then(handlers::search_image_by_url)
} }
pub fn search_hashes( pub fn search_hashes(
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("hashes") warp::path("hashes")
.and(warp::header::headers_cloned()) .and(warp::header::headers_cloned())
.and(warp::get()) .and(warp::get())
.and(warp::query::<HashSearchOpts>()) .and(warp::query::<HashSearchOpts>())
.and(with_pool(db)) .and(with_pool(db))
.and(with_tree(tree)) .and(with_bkapi(bkapi))
.and(with_api_key()) .and(with_api_key())
.and_then(|headers, opts, db, tree, api_key| { .and_then(|headers, opts, db, bkapi, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_hashes", ?opts); let span = tracing::info_span!("search_hashes", ?opts);
span.set_parent(with_telem(headers)); span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_hashes(opts, db, tree, api_key).in_current_span()) span.in_scope(|| handlers::search_hashes(opts, db, bkapi, api_key).in_current_span())
})
}
pub fn search_video(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("video")
.and(warp::header::headers_cloned())
.and(warp::post())
.and(warp::multipart::form().max_length(1024 * 1024 * 10))
.and(with_pool(db))
.and(with_api_key())
.and_then(|headers, form, db, api_key| {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!("search_video");
span.set_parent(with_telem(headers));
span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span())
}) })
} }
@ -120,8 +107,16 @@ fn with_pool(db: Pool) -> impl Filter<Extract = (Pool,), Error = Infallible> + C
warp::any().map(move || db.clone()) warp::any().map(move || db.clone())
} }
fn with_tree(tree: Tree) -> impl Filter<Extract = (Tree,), Error = Infallible> + Clone { fn with_bkapi(
warp::any().map(move || tree.clone()) bkapi: bkapi_client::BKApiClient,
) -> impl Filter<Extract = (bkapi_client::BKApiClient,), Error = Infallible> + Clone {
warp::any().map(move || bkapi.clone())
}
fn with_endpoints(
endpoints: Endpoints,
) -> impl Filter<Extract = (Endpoints,), Error = Infallible> + Clone {
warp::any().map(move || endpoints.clone())
} }
fn with_telem(headers: warp::http::HeaderMap) -> opentelemetry::Context { fn with_telem(headers: warp::http::HeaderMap) -> opentelemetry::Context {

View File

@ -1,3 +1,6 @@
use futures::StreamExt;
use futures::TryStreamExt;
use hyper::StatusCode;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter}; use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
use std::convert::TryInto; use std::convert::TryInto;
@ -7,8 +10,12 @@ use warp::{Rejection, Reply};
use crate::models::image_query; use crate::models::image_query;
use crate::types::*; use crate::types::*;
use crate::{early_return, rate_limit, Pool, Tree}; use crate::Endpoints;
use fuzzysearch_common::types::{SearchResult, SiteInfo}; use crate::{early_return, rate_limit, Pool};
use fuzzysearch_common::{
trace::InjectContext,
types::{SearchResult, SiteInfo},
};
lazy_static! { lazy_static! {
static ref IMAGE_HASH_DURATION: Histogram = register_histogram!( static ref IMAGE_HASH_DURATION: Histogram = register_histogram!(
@ -37,6 +44,7 @@ lazy_static! {
enum Error { enum Error {
Postgres(sqlx::Error), Postgres(sqlx::Error),
Reqwest(reqwest::Error), Reqwest(reqwest::Error),
Warp(warp::Error),
InvalidData, InvalidData,
InvalidImage, InvalidImage,
ApiKey, ApiKey,
@ -46,7 +54,7 @@ enum Error {
impl warp::Reply for Error { impl warp::Reply for Error {
fn into_response(self) -> warp::reply::Response { fn into_response(self) -> warp::reply::Response {
let msg = match self { let msg = match self {
Error::Postgres(_) | Error::Reqwest(_) => ErrorMessage { Error::Postgres(_) | Error::Reqwest(_) | Error::Warp(_) => ErrorMessage {
code: 500, code: 500,
message: "Internal server error".to_string(), message: "Internal server error".to_string(),
}, },
@ -89,98 +97,89 @@ impl From<reqwest::Error> for Error {
} }
} }
async fn get_field_bytes(form: warp::multipart::FormData, field: &str) -> bytes::BytesMut { impl From<warp::Error> for Error {
fn from(err: warp::Error) -> Self {
Self::Warp(err)
}
}
#[tracing::instrument(skip(endpoints, form))]
async fn hash_input(
endpoints: &Endpoints,
mut form: warp::multipart::FormData,
) -> Result<i64, Error> {
let mut image_part = None;
tracing::debug!("looking at image parts");
while let Ok(Some(part)) = form.try_next().await {
if part.name() == "image" {
image_part = Some(part);
}
}
let image_part = image_part.ok_or(Error::InvalidImage)?;
tracing::debug!("found image part, reading data");
let bytes = image_part
.stream()
.fold(bytes::BytesMut::new(), |mut buf, chunk| {
use bytes::BufMut; use bytes::BufMut;
use futures::StreamExt;
let parts: Vec<_> = form.collect().await; buf.put(chunk.unwrap());
let mut parts = parts async move { buf }
.into_iter()
.map(|part| {
let part = part.unwrap();
(part.name().to_string(), part)
}) })
.collect::<std::collections::HashMap<_, _>>(); .await;
let data = parts.remove(field).unwrap(); let part = reqwest::multipart::Part::bytes(bytes.to_vec());
data.stream() let form = reqwest::multipart::Form::new().part("image", part);
.fold(bytes::BytesMut::new(), |mut b, data| {
b.put(data.unwrap()); tracing::debug!("sending image to hash input service");
async move { b } let client = reqwest::Client::new();
}) let resp = client
.await .post(&endpoints.hash_input)
.inject_context()
.multipart(form)
.send()
.await?;
tracing::debug!("got response");
if resp.status() != StatusCode::OK {
return Err(Error::InvalidImage);
} }
#[tracing::instrument(skip(form))] let hash: i64 = resp
async fn hash_input(form: warp::multipart::FormData) -> i64 { .text()
let bytes = get_field_bytes(form, "image").await; .await?
.parse()
.map_err(|_err| Error::InvalidImage)?;
let len = bytes.len(); Ok(hash)
let _timer = IMAGE_HASH_DURATION.start_timer();
let hash = tokio::task::spawn_blocking(move || {
let hasher = fuzzysearch_common::get_hasher();
let image = image::load_from_memory(&bytes).unwrap();
hasher.hash_image(&image)
})
.instrument(span!(tracing::Level::TRACE, "hashing image", len))
.await
.unwrap();
drop(_timer);
let mut buf: [u8; 8] = [0; 8];
buf.copy_from_slice(&hash.as_bytes());
i64::from_be_bytes(buf)
}
#[tracing::instrument(skip(form))]
async fn hash_video(form: warp::multipart::FormData) -> Option<Vec<[u8; 8]>> {
use bytes::Buf;
let bytes = get_field_bytes(form, "video").await;
let _timer = VIDEO_HASH_DURATION.start_timer();
let hashes = tokio::task::spawn_blocking(move || {
if infer::is_video(&bytes) {
fuzzysearch_common::video::extract_video_hashes(bytes.reader()).ok()
} else if infer::image::is_gif(&bytes) {
fuzzysearch_common::video::extract_gif_hashes(bytes.reader()).ok()
} else {
None
}
})
.instrument(span!(tracing::Level::TRACE, "hashing video"))
.await
.unwrap();
drop(_timer);
hashes
} }
pub async fn search_image( pub async fn search_image(
form: warp::multipart::FormData, form: warp::multipart::FormData,
opts: ImageSearchOpts, opts: ImageSearchOpts,
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
api_key: String, api_key: String,
endpoints: Endpoints,
) -> Result<Box<dyn Reply>, Rejection> { ) -> Result<Box<dyn Reply>, Rejection> {
let image_remaining = rate_limit!(&api_key, &db, image_limit, "image"); let image_remaining = rate_limit!(&api_key, &db, image_limit, "image");
let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash"); let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash");
let num = hash_input(form).await; let num = early_return!(hash_input(&endpoints, form).await);
let mut items = { let mut items = {
if opts.search_type == Some(ImageSearchType::Force) { if opts.search_type == Some(ImageSearchType::Force) {
image_query(db.clone(), tree.clone(), vec![num], 10) image_query(db.clone(), bkapi.clone(), vec![num], 10)
.await .await
.unwrap() .unwrap()
} else { } else {
let results = image_query(db.clone(), tree.clone(), vec![num], 0) let results = image_query(db.clone(), bkapi.clone(), vec![num], 0)
.await .await
.unwrap(); .unwrap();
if results.is_empty() && opts.search_type != Some(ImageSearchType::Exact) { if results.is_empty() && opts.search_type != Some(ImageSearchType::Exact) {
image_query(db.clone(), tree.clone(), vec![num], 10) image_query(db.clone(), bkapi.clone(), vec![num], 10)
.await .await
.unwrap() .unwrap()
} else { } else {
@ -220,7 +219,7 @@ pub async fn search_image(
pub async fn search_hashes( pub async fn search_hashes(
opts: HashSearchOpts, opts: HashSearchOpts,
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
api_key: String, api_key: String,
) -> Result<Box<dyn Reply>, Rejection> { ) -> Result<Box<dyn Reply>, Rejection> {
let pool = db.clone(); let pool = db.clone();
@ -239,7 +238,7 @@ pub async fn search_hashes(
let image_remaining = rate_limit!(&api_key, &db, image_limit, "image", hashes.len() as i16); let image_remaining = rate_limit!(&api_key, &db, image_limit, "image", hashes.len() as i16);
let results = let results =
early_return!(image_query(pool, tree, hashes.clone(), opts.distance.unwrap_or(10),).await); early_return!(image_query(pool, bkapi, hashes.clone(), opts.distance.unwrap_or(10)).await);
let resp = warp::http::Response::builder() let resp = warp::http::Response::builder()
.header("x-rate-limit-total-image", image_remaining.1.to_string()) .header("x-rate-limit-total-image", image_remaining.1.to_string())
@ -385,16 +384,6 @@ pub async fn search_file(
Ok(Box::new(resp)) Ok(Box::new(resp))
} }
pub async fn search_video(
form: warp::multipart::FormData,
_db: Pool,
_api_key: String,
) -> Result<impl Reply, Rejection> {
let hashes = hash_video(form).await;
Ok(warp::reply::json(&hashes))
}
pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result<Box<dyn Reply>, Rejection> { pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result<Box<dyn Reply>, Rejection> {
let exists = if let Some(handle) = opts.twitter { let exists = if let Some(handle) = opts.twitter {
let result = sqlx::query_scalar!("SELECT exists(SELECT 1 FROM twitter_user WHERE lower(data->>'screen_name') = lower($1))", handle) let result = sqlx::query_scalar!("SELECT exists(SELECT 1 FROM twitter_user WHERE lower(data->>'screen_name') = lower($1))", handle)
@ -413,7 +402,7 @@ pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result<Box<dyn Reply>,
pub async fn search_image_by_url( pub async fn search_image_by_url(
opts: UrlSearchOpts, opts: UrlSearchOpts,
db: Pool, db: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
api_key: String, api_key: String,
) -> Result<Box<dyn Reply>, Rejection> { ) -> Result<Box<dyn Reply>, Rejection> {
use bytes::BufMut; use bytes::BufMut;
@ -470,7 +459,7 @@ pub async fn search_image_by_url(
let hash: [u8; 8] = hash.as_bytes().try_into().unwrap(); let hash: [u8; 8] = hash.as_bytes().try_into().unwrap();
let num = i64::from_be_bytes(hash); let num = i64::from_be_bytes(hash);
let results = image_query(db.clone(), tree.clone(), vec![num], 3) let results = image_query(db.clone(), bkapi.clone(), vec![num], 3)
.await .await
.unwrap(); .unwrap();

View File

@ -1,7 +1,5 @@
#![recursion_limit = "256"] #![recursion_limit = "256"]
use std::sync::Arc;
use tokio::sync::RwLock;
use warp::Filter; use warp::Filter;
mod filters; mod filters;
@ -10,37 +8,18 @@ mod models;
mod types; mod types;
mod utils; mod utils;
type Tree = Arc<RwLock<bk_tree::BKTree<Node, Hamming>>>;
type Pool = sqlx::PgPool; type Pool = sqlx::PgPool;
#[derive(Debug)] #[derive(Clone)]
pub struct Node(pub [u8; 8]); pub struct Endpoints {
pub hash_input: String,
impl Node { pub bkapi: String,
pub fn new(hash: i64) -> Self {
Self(hash.to_be_bytes())
}
pub fn query(hash: [u8; 8]) -> Self {
Self(hash)
}
pub fn num(&self) -> i64 {
i64::from_be_bytes(self.0)
}
}
pub struct Hamming;
impl bk_tree::Metric<Node> for Hamming {
fn distance(&self, a: &Node, b: &Node) -> u64 {
hamming::distance_fast(&a.0, &b.0).unwrap()
}
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
configure_tracing(); fuzzysearch_common::trace::configure_tracing("fuzzysearch");
fuzzysearch_common::trace::serve_metrics().await;
ffmpeg_next::init().expect("Unable to initialize ffmpeg"); ffmpeg_next::init().expect("Unable to initialize ffmpeg");
@ -50,11 +29,12 @@ async fn main() {
.await .await
.expect("Unable to create Postgres pool"); .expect("Unable to create Postgres pool");
serve_metrics().await; let endpoints = Endpoints {
hash_input: std::env::var("ENDPOINT_HASH_INPUT").expect("Missing ENDPOINT_HASH_INPUT"),
bkapi: std::env::var("ENDPOINT_BKAPI").expect("Missing ENDPOINT_BKAPI"),
};
let tree: Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(Hamming))); let bkapi = bkapi_client::BKApiClient::new(&endpoints.bkapi);
load_updates(db_pool.clone(), tree.clone()).await;
let log = warp::log("fuzzysearch"); let log = warp::log("fuzzysearch");
let cors = warp::cors() let cors = warp::cors()
@ -64,7 +44,7 @@ async fn main() {
let options = warp::options().map(|| ""); let options = warp::options().map(|| "");
let api = options.or(filters::search(db_pool, tree)); let api = options.or(filters::search(db_pool, bkapi, endpoints));
let routes = api let routes = api
.or(warp::path::end() .or(warp::path::end()
.map(|| warp::redirect(warp::http::Uri::from_static("https://fuzzysearch.net")))) .map(|| warp::redirect(warp::http::Uri::from_static("https://fuzzysearch.net"))))
@ -74,173 +54,3 @@ async fn main() {
warp::serve(routes).run(([0, 0, 0, 0], 8080)).await; warp::serve(routes).run(([0, 0, 0, 0], 8080)).await;
} }
fn configure_tracing() {
use opentelemetry::KeyValue;
use tracing_subscriber::layer::SubscriberExt;
let env = std::env::var("ENVIRONMENT");
let env = if let Ok(env) = env.as_ref() {
env.as_str()
} else if cfg!(debug_assertions) {
"debug"
} else {
"release"
};
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
.with_service_name("fuzzysearch")
.with_tags(vec![
KeyValue::new("environment", env.to_owned()),
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
])
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
let subscriber = tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.with_target(true);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
} else {
let subscriber = tracing_subscriber::fmt::layer();
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(trace)
.with(subscriber);
tracing::subscriber::set_global_default(subscriber).unwrap();
}
}
async fn metrics(
_: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
use hyper::{Body, Response};
use prometheus::{Encoder, TextEncoder};
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(Response::new(Body::from(buffer)))
}
async fn serve_metrics() {
use hyper::{
service::{make_service_fn, service_fn},
Server,
};
use std::convert::Infallible;
use std::net::SocketAddr;
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect("Missing METRICS_HOST")
.parse()
.expect("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
server.await.expect("Metrics server error");
});
}
#[derive(serde::Deserialize)]
struct HashRow {
hash: i64,
}
async fn create_tree(conn: &Pool) -> bk_tree::BKTree<Node, Hamming> {
use futures::TryStreamExt;
let mut tree = bk_tree::BKTree::new(Hamming);
let mut rows = sqlx::query!(
"SELECT hash_int hash FROM submission WHERE hash_int IS NOT NULL
UNION ALL
SELECT hash FROM e621 WHERE hash IS NOT NULL
UNION ALL
SELECT hash FROM tweet_media WHERE hash IS NOT NULL
UNION ALL
SELECT hash FROM weasyl WHERE hash IS NOT NULL"
)
.fetch(conn);
let mut count = 0;
while let Some(row) = rows.try_next().await.expect("Unable to get row") {
if let Some(hash) = row.hash {
let node = Node::new(hash);
if tree.find_exact(&node).is_none() {
tree.add(node);
}
count += 1;
if count % 250_000 == 0 {
tracing::debug!(count, "Made progress in loading tree rows");
}
}
}
tracing::info!(count, "Completed loading rows for tree");
tree
}
async fn load_updates(conn: Pool, tree: Tree) {
let mut listener = sqlx::postgres::PgListener::connect_with(&conn)
.await
.unwrap();
listener.listen("fuzzysearch_hash_added").await.unwrap();
let new_tree = create_tree(&conn).await;
let mut lock = tree.write().await;
*lock = new_tree;
drop(lock);
tokio::spawn(async move {
loop {
while let Some(notification) = listener
.try_recv()
.await
.expect("Unable to recv notification")
{
let payload: HashRow = serde_json::from_str(notification.payload()).unwrap();
tracing::debug!(hash = payload.hash, "Adding new hash to tree");
let lock = tree.read().await;
if lock.find_exact(&Node::new(payload.hash)).is_some() {
continue;
}
drop(lock);
let mut lock = tree.write().await;
lock.add(Node(payload.hash.to_be_bytes()));
drop(lock);
}
tracing::error!("Lost connection to Postgres, recreating tree");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
let new_tree = create_tree(&conn).await;
let mut lock = tree.write().await;
*lock = new_tree;
drop(lock);
tracing::info!("Replaced tree");
}
});
}

View File

@ -2,15 +2,10 @@ use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram}; use prometheus::{register_histogram, Histogram};
use crate::types::*; use crate::types::*;
use crate::{Pool, Tree}; use crate::Pool;
use fuzzysearch_common::types::{SearchResult, SiteInfo}; use fuzzysearch_common::types::{SearchResult, SiteInfo};
lazy_static! { lazy_static! {
static ref IMAGE_TREE_DURATION: Histogram = register_histogram!(
"fuzzysearch_api_image_tree_seconds",
"Duration to search for hashes in tree"
)
.unwrap();
static ref IMAGE_QUERY_DURATION: Histogram = register_histogram!( static ref IMAGE_QUERY_DURATION: Histogram = register_histogram!(
"fuzzysearch_api_image_query_seconds", "fuzzysearch_api_image_query_seconds",
"Duration to perform a single image lookup query" "Duration to perform a single image lookup query"
@ -51,28 +46,30 @@ struct HashSearch {
distance: u64, distance: u64,
} }
#[tracing::instrument(skip(pool, tree))] #[tracing::instrument(skip(pool, bkapi))]
pub async fn image_query( pub async fn image_query(
pool: Pool, pool: Pool,
tree: Tree, bkapi: bkapi_client::BKApiClient,
hashes: Vec<i64>, hashes: Vec<i64>,
distance: i64, distance: i64,
) -> Result<Vec<SearchResult>, sqlx::Error> { ) -> Result<Vec<SearchResult>, sqlx::Error> {
let timer = IMAGE_TREE_DURATION.start_timer(); let found_hashes: Vec<HashSearch> = bkapi
let lock = tree.read().await; .search_many(&hashes, distance as u64)
let found_hashes: Vec<HashSearch> = hashes .await
.unwrap()
.into_iter()
.flat_map(|results| {
results
.hashes
.iter() .iter()
.flat_map(|hash| { .map(|hash| HashSearch {
lock.find(&crate::Node::new(*hash), distance as u64) searched_hash: results.hash,
.map(|(dist, node)| HashSearch { found_hash: hash.hash,
searched_hash: *hash, distance: hash.distance,
found_hash: node.num(),
distance: dist,
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
}) })
.collect(); .collect();
timer.stop_and_record();
let timer = IMAGE_QUERY_DURATION.start_timer(); let timer = IMAGE_QUERY_DURATION.start_timer();
let matches = sqlx::query!( let matches = sqlx::query!(