mirror of
https://github.com/Syfaro/fuzzysearch.git
synced 2024-11-23 15:22:31 +00:00
Significant refactoring.
This commit is contained in:
parent
5e76d4ecf2
commit
cc8a88fe67
@ -1,12 +1,11 @@
|
|||||||
SELECT
|
SELECT
|
||||||
api_key.id,
|
api_key.id,
|
||||||
|
api_key.user_id,
|
||||||
api_key.name_limit,
|
api_key.name_limit,
|
||||||
api_key.image_limit,
|
api_key.image_limit,
|
||||||
api_key.hash_limit,
|
api_key.hash_limit,
|
||||||
api_key.name,
|
api_key.name
|
||||||
account.email owner_email
|
|
||||||
FROM
|
FROM
|
||||||
api_key
|
api_key
|
||||||
JOIN account ON account.id = api_key.user_id
|
|
||||||
WHERE
|
WHERE
|
||||||
api_key.key = $1
|
api_key.key = $1
|
||||||
|
47
fuzzysearch-api/src/api/auth.rs
Normal file
47
fuzzysearch-api/src/api/auth.rs
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
use poem::Request;
|
||||||
|
use poem_openapi::{auth::ApiKey, SecurityScheme};
|
||||||
|
|
||||||
|
use crate::Pool;
|
||||||
|
|
||||||
|
/// Simple authentication using a static API key. Must be manually requested.
|
||||||
|
#[derive(SecurityScheme)]
|
||||||
|
#[oai(
|
||||||
|
type = "api_key",
|
||||||
|
key_name = "X-Api-Key",
|
||||||
|
in = "header",
|
||||||
|
checker = "api_checker"
|
||||||
|
)]
|
||||||
|
pub(crate) struct ApiKeyAuthorization(pub(crate) UserApiKey);
|
||||||
|
|
||||||
|
pub(crate) struct UserApiKey {
|
||||||
|
pub(crate) id: i32,
|
||||||
|
pub(crate) name: Option<String>,
|
||||||
|
pub(crate) user_id: i32,
|
||||||
|
pub(crate) name_limit: i16,
|
||||||
|
pub(crate) image_limit: i16,
|
||||||
|
pub(crate) hash_limit: i16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(req, api_key))]
|
||||||
|
async fn api_checker(req: &Request, api_key: ApiKey) -> Option<UserApiKey> {
|
||||||
|
let pool: &Pool = req.data().unwrap();
|
||||||
|
|
||||||
|
let user_api_key = sqlx::query_file_as!(UserApiKey, "queries/lookup_api_key.sql", api_key.key)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
if let Some(user_api_key) = user_api_key.as_ref() {
|
||||||
|
tracing::debug!(
|
||||||
|
api_key_id = user_api_key.id,
|
||||||
|
app_name = user_api_key.name.as_deref().unwrap_or("unknown"),
|
||||||
|
owner_id = %user_api_key.user_id,
|
||||||
|
"found valid api key"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::warn!("request had invalid api key: {}", api_key.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
user_api_key
|
||||||
|
}
|
282
fuzzysearch-api/src/api/mod.rs
Normal file
282
fuzzysearch-api/src/api/mod.rs
Normal file
@ -0,0 +1,282 @@
|
|||||||
|
use bkapi_client::BKApiClient;
|
||||||
|
use bytes::BufMut;
|
||||||
|
use poem_openapi::{
|
||||||
|
param::{Path, Query},
|
||||||
|
payload::{Json, Response},
|
||||||
|
types::ToJSON,
|
||||||
|
ApiResponse, Object,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
hash_input, lookup_hashes, rate_limit, Endpoints, FurAffinityFile, HashLookupResult,
|
||||||
|
ImageSearchPayload, ImageSearchResult, ImageSearchType, KnownServiceName, Pool,
|
||||||
|
ResponseRateLimitHeaders,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod auth;
|
||||||
|
|
||||||
|
pub(crate) use auth::ApiKeyAuthorization;
|
||||||
|
|
||||||
|
#[derive(Object)]
|
||||||
|
pub(crate) struct RateLimitResponse {
|
||||||
|
bucket: String,
|
||||||
|
retry_after: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Object)]
|
||||||
|
pub(crate) struct BadRequestResponse {
|
||||||
|
message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(ApiResponse)]
|
||||||
|
pub(crate) enum RateLimitedResponse<T, E = BadRequestResponse>
|
||||||
|
where
|
||||||
|
T: ToJSON,
|
||||||
|
E: ToJSON,
|
||||||
|
{
|
||||||
|
/// The request was successful.
|
||||||
|
#[oai(status = 200)]
|
||||||
|
Available(Json<T>),
|
||||||
|
|
||||||
|
/// The request was not valid. A description can be found within the
|
||||||
|
/// resulting message.
|
||||||
|
#[oai(status = 400)]
|
||||||
|
BadRequest(Json<E>),
|
||||||
|
|
||||||
|
/// The API key has exhaused the maximum permissible requests for the
|
||||||
|
/// current time window. The `retry_after` field contains the number of
|
||||||
|
/// seconds before a request is likely to succeed.
|
||||||
|
#[oai(status = 429)]
|
||||||
|
Limited(Json<RateLimitResponse>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E> RateLimitedResponse<T, E>
|
||||||
|
where
|
||||||
|
T: ToJSON,
|
||||||
|
E: ToJSON,
|
||||||
|
{
|
||||||
|
pub(crate) fn available(json: T) -> Response<Self> {
|
||||||
|
Self::Available(Json(json)).response()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn limited<S: ToString>(bucket: S, retry_after: i32) -> Response<Self> {
|
||||||
|
Self::Limited(Json(RateLimitResponse {
|
||||||
|
bucket: bucket.to_string(),
|
||||||
|
retry_after,
|
||||||
|
}))
|
||||||
|
.response()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response(self) -> Response<Self> {
|
||||||
|
Response::new(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RateLimitedResponse<T, BadRequestResponse>
|
||||||
|
where
|
||||||
|
T: ToJSON,
|
||||||
|
{
|
||||||
|
pub(crate) fn bad_request<M: ToString>(message: M) -> Response<Self> {
|
||||||
|
Self::BadRequest(Json(BadRequestResponse {
|
||||||
|
message: message.to_string(),
|
||||||
|
}))
|
||||||
|
.response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, bkapi, auth, hashes, distance), fields(hashes = %hashes.0, distance = ?distance.0))]
|
||||||
|
pub(crate) async fn hashes(
|
||||||
|
pool: &Pool,
|
||||||
|
bkapi: &BKApiClient,
|
||||||
|
auth: ApiKeyAuthorization,
|
||||||
|
hashes: Query<String>,
|
||||||
|
distance: Query<Option<u64>>,
|
||||||
|
) -> poem::Result<Response<RateLimitedResponse<Vec<HashLookupResult>>>> {
|
||||||
|
let hashes: Vec<i64> = hashes
|
||||||
|
.0
|
||||||
|
.split(',')
|
||||||
|
.take(10)
|
||||||
|
.filter_map(|hash| hash.parse().ok())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if hashes.is_empty() {
|
||||||
|
return Ok(RateLimitedResponse::bad_request("hashes must be provided"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let image_remaining = rate_limit!(auth, pool, image_limit, "image", hashes.len() as i16);
|
||||||
|
|
||||||
|
let results = lookup_hashes(pool, bkapi, &hashes, distance.unwrap_or(3)).await?;
|
||||||
|
|
||||||
|
let resp =
|
||||||
|
RateLimitedResponse::available(results).inject_rate_limit_headers("image", image_remaining);
|
||||||
|
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, bkapi, client, endpoints, auth, search_type, payload))]
|
||||||
|
pub(crate) async fn image(
|
||||||
|
pool: &Pool,
|
||||||
|
bkapi: &BKApiClient,
|
||||||
|
client: &reqwest::Client,
|
||||||
|
endpoints: &Endpoints,
|
||||||
|
auth: ApiKeyAuthorization,
|
||||||
|
search_type: Query<Option<ImageSearchType>>,
|
||||||
|
payload: ImageSearchPayload,
|
||||||
|
) -> poem::Result<Response<RateLimitedResponse<ImageSearchResult>>> {
|
||||||
|
let image_remaining = rate_limit!(auth, pool, image_limit, "image");
|
||||||
|
let hash_remaining = rate_limit!(auth, pool, hash_limit, "hash");
|
||||||
|
|
||||||
|
let stream = tokio_util::io::ReaderStream::new(payload.image.into_async_read());
|
||||||
|
let body = reqwest::Body::wrap_stream(stream);
|
||||||
|
|
||||||
|
let hash = hash_input(client, &endpoints.hash_input, body).await?;
|
||||||
|
|
||||||
|
let search_type = search_type.0.unwrap_or(ImageSearchType::Close);
|
||||||
|
let hashes = vec![hash];
|
||||||
|
|
||||||
|
let mut results = {
|
||||||
|
if search_type == ImageSearchType::Force {
|
||||||
|
tracing::debug!("search type is force, starting with distance of 10");
|
||||||
|
lookup_hashes(pool, bkapi, &hashes, 10).await?
|
||||||
|
} else {
|
||||||
|
tracing::debug!("close or exact search type, starting with distance of 0");
|
||||||
|
let results = lookup_hashes(pool, bkapi, &hashes, 0).await?;
|
||||||
|
|
||||||
|
if results.is_empty() && search_type != ImageSearchType::Exact {
|
||||||
|
tracing::debug!("results were empty and type is not force, expanding search");
|
||||||
|
lookup_hashes(pool, bkapi, &hashes, 10).await?
|
||||||
|
} else {
|
||||||
|
tracing::debug!("results were not empty or search type was force, ending search");
|
||||||
|
results
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::info!("search ended with {} results", results.len());
|
||||||
|
|
||||||
|
results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
|
||||||
|
|
||||||
|
let resp = RateLimitedResponse::available(ImageSearchResult {
|
||||||
|
hash,
|
||||||
|
matches: results,
|
||||||
|
})
|
||||||
|
.header("x-image-hash", hash)
|
||||||
|
.inject_rate_limit_headers("image", image_remaining)
|
||||||
|
.inject_rate_limit_headers("hash", hash_remaining);
|
||||||
|
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, bkapi, client, endpoints, auth, url, distance), fields(url = %url.0, distance = ?distance.0))]
|
||||||
|
pub(crate) async fn url(
|
||||||
|
pool: &Pool,
|
||||||
|
bkapi: &BKApiClient,
|
||||||
|
client: &reqwest::Client,
|
||||||
|
endpoints: &Endpoints,
|
||||||
|
auth: ApiKeyAuthorization,
|
||||||
|
url: Query<String>,
|
||||||
|
distance: Query<Option<u64>>,
|
||||||
|
) -> poem::Result<Response<RateLimitedResponse<ImageSearchResult>>> {
|
||||||
|
let image_remaining = rate_limit!(auth, pool, image_limit, "image");
|
||||||
|
let hash_remaining = rate_limit!(auth, pool, hash_limit, "hash");
|
||||||
|
|
||||||
|
let mut resp = client
|
||||||
|
.get(&url.0)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(crate::Error::from)?;
|
||||||
|
|
||||||
|
let distance = distance.unwrap_or(3);
|
||||||
|
|
||||||
|
let content_length = resp
|
||||||
|
.headers()
|
||||||
|
.get("content-length")
|
||||||
|
.and_then(|len| {
|
||||||
|
String::from_utf8_lossy(len.as_bytes())
|
||||||
|
.parse::<usize>()
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
if content_length > 10_000_000 {
|
||||||
|
return Ok(RateLimitedResponse::bad_request(format!(
|
||||||
|
"image too large: {} bytes",
|
||||||
|
content_length
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buf = bytes::BytesMut::with_capacity(content_length);
|
||||||
|
|
||||||
|
while let Some(chunk) = resp.chunk().await.map_err(crate::Error::from)? {
|
||||||
|
if buf.len() + chunk.len() > 10_000_000 {
|
||||||
|
return Ok(RateLimitedResponse::bad_request(format!(
|
||||||
|
"image too large: {}+ bytes",
|
||||||
|
buf.len() + chunk.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.put(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = reqwest::Body::from(buf.to_vec());
|
||||||
|
let hash = hash_input(client, &endpoints.hash_input, body).await?;
|
||||||
|
|
||||||
|
let results = lookup_hashes(pool, bkapi, &[hash], distance).await?;
|
||||||
|
|
||||||
|
let resp = RateLimitedResponse::available(ImageSearchResult {
|
||||||
|
hash,
|
||||||
|
matches: results,
|
||||||
|
})
|
||||||
|
.header("x-image-hash", hash)
|
||||||
|
.inject_rate_limit_headers("image", image_remaining)
|
||||||
|
.inject_rate_limit_headers("hash", hash_remaining);
|
||||||
|
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, auth, file_id), fields(file_id = %file_id.0))]
|
||||||
|
pub(crate) async fn furaffinity_data(
|
||||||
|
pool: &Pool,
|
||||||
|
auth: ApiKeyAuthorization,
|
||||||
|
file_id: Query<i32>,
|
||||||
|
) -> poem::Result<Response<RateLimitedResponse<Vec<FurAffinityFile>>>> {
|
||||||
|
let file_remaining = rate_limit!(auth, pool, image_limit, "file");
|
||||||
|
|
||||||
|
let matches = sqlx::query_file!("queries/lookup_furaffinity_file_id.sql", file_id.0)
|
||||||
|
.map(|row| FurAffinityFile {
|
||||||
|
id: row.id,
|
||||||
|
url: row.url,
|
||||||
|
filename: row.filename,
|
||||||
|
file_id: row.file_id,
|
||||||
|
rating: row.rating.and_then(|rating| rating.parse().ok()),
|
||||||
|
posted_at: row.posted_at,
|
||||||
|
artist: Some(row.artist),
|
||||||
|
hash: row.hash,
|
||||||
|
})
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(crate::Error::from)?;
|
||||||
|
|
||||||
|
let resp =
|
||||||
|
RateLimitedResponse::available(matches).inject_rate_limit_headers("file", file_remaining);
|
||||||
|
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, service, handle), fields(service = %service.0, handle = %handle.0))]
|
||||||
|
pub(crate) async fn known_service(
|
||||||
|
pool: &Pool,
|
||||||
|
service: Path<KnownServiceName>,
|
||||||
|
handle: Query<String>,
|
||||||
|
) -> poem::Result<Json<bool>> {
|
||||||
|
let handle_exists = match service.0 {
|
||||||
|
KnownServiceName::Twitter => {
|
||||||
|
sqlx::query_file_scalar!("queries/handle_twitter.sql", handle.0)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
.map_err(poem::error::InternalServerError)?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Json(handle_exists))
|
||||||
|
}
|
@ -1,17 +1,18 @@
|
|||||||
use std::{borrow::Cow, str::FromStr};
|
use std::{borrow::Cow, fmt::Display, str::FromStr};
|
||||||
|
|
||||||
|
use api::ApiKeyAuthorization;
|
||||||
use bkapi_client::BKApiClient;
|
use bkapi_client::BKApiClient;
|
||||||
use bytes::BufMut;
|
|
||||||
use hyper::StatusCode;
|
use hyper::StatusCode;
|
||||||
use poem::{error::ResponseError, listener::TcpListener, web::Data, EndpointExt, Request, Route};
|
use poem::{error::ResponseError, listener::TcpListener, web::Data, EndpointExt, Route};
|
||||||
use poem_openapi::{
|
use poem_openapi::{
|
||||||
auth::ApiKey,
|
|
||||||
param::{Path, Query},
|
param::{Path, Query},
|
||||||
payload::{Json, Response},
|
payload::{Json, Response},
|
||||||
types::multipart::Upload,
|
types::multipart::Upload,
|
||||||
Multipart, Object, OneOf, OpenApi, OpenApiService, SecurityScheme,
|
Multipart, Object, OneOf, OpenApi, OpenApiService,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
mod api;
|
||||||
|
|
||||||
type Pool = sqlx::PgPool;
|
type Pool = sqlx::PgPool;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -22,41 +23,20 @@ pub struct Endpoints {
|
|||||||
|
|
||||||
struct Api;
|
struct Api;
|
||||||
|
|
||||||
/// Simple authentication using a static API key. Must be manually requested.
|
|
||||||
#[derive(SecurityScheme)]
|
|
||||||
#[oai(
|
|
||||||
type = "api_key",
|
|
||||||
key_name = "X-Api-Key",
|
|
||||||
in = "header",
|
|
||||||
checker = "api_checker"
|
|
||||||
)]
|
|
||||||
struct ApiKeyAuthorization(UserApiKey);
|
|
||||||
|
|
||||||
struct UserApiKey {
|
|
||||||
id: i32,
|
|
||||||
name: Option<String>,
|
|
||||||
owner_email: String,
|
|
||||||
name_limit: i16,
|
|
||||||
image_limit: i16,
|
|
||||||
hash_limit: i16,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn api_checker(req: &Request, api_key: ApiKey) -> Option<UserApiKey> {
|
|
||||||
let pool: &Pool = req.data().unwrap();
|
|
||||||
|
|
||||||
sqlx::query_file_as!(UserApiKey, "queries/lookup_api_key.sql", api_key.key)
|
|
||||||
.fetch_optional(pool)
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(poem_openapi::Enum, Debug, PartialEq)]
|
#[derive(poem_openapi::Enum, Debug, PartialEq)]
|
||||||
#[oai(rename_all = "snake_case")]
|
#[oai(rename_all = "snake_case")]
|
||||||
enum KnownServiceName {
|
enum KnownServiceName {
|
||||||
Twitter,
|
Twitter,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Display for KnownServiceName {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Twitter => write!(f, "Twitter"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(poem_openapi::Enum, Debug)]
|
#[derive(poem_openapi::Enum, Debug)]
|
||||||
#[oai(rename_all = "lowercase")]
|
#[oai(rename_all = "lowercase")]
|
||||||
enum Rating {
|
enum Rating {
|
||||||
@ -139,7 +119,10 @@ enum Error {
|
|||||||
|
|
||||||
impl ResponseError for Error {
|
impl ResponseError for Error {
|
||||||
fn status(&self) -> hyper::StatusCode {
|
fn status(&self) -> hyper::StatusCode {
|
||||||
hyper::StatusCode::INTERNAL_SERVER_ERROR
|
match self {
|
||||||
|
Self::BadRequest(_) => hyper::StatusCode::BAD_REQUEST,
|
||||||
|
_ => hyper::StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,18 +146,6 @@ impl ResponseError for BadRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
#[error("rate limited")]
|
|
||||||
struct RateLimited {
|
|
||||||
bucket: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ResponseError for RateLimited {
|
|
||||||
fn status(&self) -> hyper::StatusCode {
|
|
||||||
hyper::StatusCode::TOO_MANY_REQUESTS
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The status of an API key's rate limit.
|
/// The status of an API key's rate limit.
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum RateLimit {
|
pub enum RateLimit {
|
||||||
@ -215,28 +186,28 @@ async fn update_rate_limit(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
macro_rules! rate_limit {
|
macro_rules! rate_limit {
|
||||||
($api_key:expr, $db:expr, $limit:tt, $group:expr) => {
|
($api_key:expr, $db:expr, $limit:tt, $group:expr) => {
|
||||||
rate_limit!($api_key, $db, $limit, $group, 1)
|
rate_limit!($api_key, $db, $limit, $group, 1)
|
||||||
};
|
};
|
||||||
|
|
||||||
($api_key:expr, $db:expr, $limit:tt, $group:expr, $incr_by:expr) => {{
|
($api_key:expr, $db:expr, $limit:tt, $group:expr, $incr_by:expr) => {{
|
||||||
let rate_limit = update_rate_limit($db, $api_key.0.id, $api_key.0.$limit, $group, $incr_by)
|
let rate_limit =
|
||||||
.await
|
crate::update_rate_limit($db, $api_key.0.id, $api_key.0.$limit, $group, $incr_by)
|
||||||
.map_err(Error::from)?;
|
.await
|
||||||
|
.map_err(crate::Error::from)?;
|
||||||
|
|
||||||
match rate_limit {
|
match rate_limit {
|
||||||
RateLimit::Limited => {
|
crate::RateLimit::Limited => {
|
||||||
return Err(RateLimited {
|
return Ok(crate::api::RateLimitedResponse::limited($group, 60))
|
||||||
bucket: $group.to_string(),
|
|
||||||
}
|
|
||||||
.into())
|
|
||||||
}
|
}
|
||||||
RateLimit::Available(count) => count,
|
crate::RateLimit::Available(count) => count,
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(err, skip(pool, bkapi))]
|
||||||
async fn lookup_hashes(
|
async fn lookup_hashes(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
bkapi: &BKApiClient,
|
bkapi: &BKApiClient,
|
||||||
@ -247,6 +218,8 @@ async fn lookup_hashes(
|
|||||||
return Err(BadRequest::with_message(format!("distance too large: {}", distance)).into());
|
return Err(BadRequest::with_message(format!("distance too large: {}", distance)).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::info!("looking up {} hashes", hashes.len());
|
||||||
|
|
||||||
let index_hashes: Vec<_> = bkapi
|
let index_hashes: Vec<_> = bkapi
|
||||||
.search_many(hashes, distance)
|
.search_many(hashes, distance)
|
||||||
.await?
|
.await?
|
||||||
@ -262,6 +235,15 @@ async fn lookup_hashes(
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
tracing::info!("found {} results in bkapi index", index_hashes.len());
|
||||||
|
tracing::trace!(
|
||||||
|
"bkapi matches: {:?}",
|
||||||
|
index_hashes
|
||||||
|
.iter()
|
||||||
|
.map(|hash| hash.found_hash)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
let data = serde_json::to_value(index_hashes)?;
|
let data = serde_json::to_value(index_hashes)?;
|
||||||
|
|
||||||
let results = sqlx::query_file!("queries/lookup_hashes.sql", data)
|
let results = sqlx::query_file!("queries/lookup_hashes.sql", data)
|
||||||
@ -294,6 +276,9 @@ async fn lookup_hashes(
|
|||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
tracing::info!("found {} matches from database", results.len());
|
||||||
|
tracing::trace!("database matches: {:?}", results);
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,6 +287,7 @@ struct ImageSearchPayload {
|
|||||||
image: Upload,
|
image: Upload,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(client, hash_input_endpoint, image))]
|
||||||
async fn hash_input(
|
async fn hash_input(
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
hash_input_endpoint: &str,
|
hash_input_endpoint: &str,
|
||||||
@ -310,6 +296,8 @@ async fn hash_input(
|
|||||||
let part = reqwest::multipart::Part::stream(image);
|
let part = reqwest::multipart::Part::stream(image);
|
||||||
let form = reqwest::multipart::Form::new().part("image", part);
|
let form = reqwest::multipart::Form::new().part("image", part);
|
||||||
|
|
||||||
|
tracing::info!("sending image for hashing");
|
||||||
|
|
||||||
let resp = client
|
let resp = client
|
||||||
.post(hash_input_endpoint)
|
.post(hash_input_endpoint)
|
||||||
.multipart(form)
|
.multipart(form)
|
||||||
@ -317,12 +305,21 @@ async fn hash_input(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if resp.status() != StatusCode::OK {
|
if resp.status() != StatusCode::OK {
|
||||||
|
tracing::warn!("got wrong status code: {}", resp.status());
|
||||||
return Err(BadRequest::with_message("invalid image").into());
|
return Err(BadRequest::with_message("invalid image").into());
|
||||||
}
|
}
|
||||||
|
|
||||||
match resp.text().await?.parse() {
|
let text = resp.text().await?;
|
||||||
Ok(hash) => Ok(hash),
|
|
||||||
Err(_err) => Err(BadRequest::with_message("invalid image").into()),
|
match text.parse() {
|
||||||
|
Ok(hash) => {
|
||||||
|
tracing::debug!("image had hash {}", hash);
|
||||||
|
Ok(hash)
|
||||||
|
}
|
||||||
|
Err(_err) => {
|
||||||
|
tracing::warn!("got invalid data: {}", text);
|
||||||
|
Err(BadRequest::with_message("invalid image").into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -352,11 +349,38 @@ struct FurAffinityFile {
|
|||||||
hash: Option<i64>,
|
hash: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait ResponseRateLimitHeaders
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
fn inject_rate_limit_headers(self, name: &'static str, remaining: (i16, i16)) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ResponseRateLimitHeaders for poem_openapi::payload::Response<T> {
|
||||||
|
fn inject_rate_limit_headers(self, name: &'static str, remaining: (i16, i16)) -> Self {
|
||||||
|
self.header(&format!("x-rate-limit-total-{}", name), remaining.1)
|
||||||
|
.header(&format!("x-rate-limit-remaining-{}", name), remaining.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// LimitsResponse
|
||||||
|
///
|
||||||
|
/// The allowed number of requests per minute for an API key.
|
||||||
|
#[derive(Object, Debug)]
|
||||||
|
struct LimitsResponse {
|
||||||
|
/// The number of name lookups.
|
||||||
|
name: i16,
|
||||||
|
/// The number of hash lookups.
|
||||||
|
image: i16,
|
||||||
|
/// The number of image hashes.
|
||||||
|
hash: i16,
|
||||||
|
}
|
||||||
|
|
||||||
#[OpenApi]
|
#[OpenApi]
|
||||||
impl Api {
|
impl Api {
|
||||||
/// Lookup images by hash
|
/// Lookup images by hash
|
||||||
///
|
///
|
||||||
/// Perform a lookup for up to 10 given hashes.
|
/// Perform a lookup using up to 10 hashes.
|
||||||
#[oai(path = "/hashes", method = "get")]
|
#[oai(path = "/hashes", method = "get")]
|
||||||
async fn hashes(
|
async fn hashes(
|
||||||
&self,
|
&self,
|
||||||
@ -365,27 +389,8 @@ impl Api {
|
|||||||
auth: ApiKeyAuthorization,
|
auth: ApiKeyAuthorization,
|
||||||
hashes: Query<String>,
|
hashes: Query<String>,
|
||||||
distance: Query<Option<u64>>,
|
distance: Query<Option<u64>>,
|
||||||
) -> poem::Result<Response<Json<Vec<HashLookupResult>>>> {
|
) -> poem::Result<Response<api::RateLimitedResponse<Vec<HashLookupResult>>>> {
|
||||||
let hashes: Vec<i64> = hashes
|
api::hashes(pool.0, bkapi.0, auth, hashes, distance).await
|
||||||
.0
|
|
||||||
.split(',')
|
|
||||||
.take(10)
|
|
||||||
.filter_map(|hash| hash.parse().ok())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let image_remaining = rate_limit!(auth, pool.0, image_limit, "image", hashes.len() as i16);
|
|
||||||
|
|
||||||
if hashes.is_empty() {
|
|
||||||
return Err(BadRequest::with_message("hashes must be provided").into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let results = lookup_hashes(&pool, &bkapi, &hashes, distance.unwrap_or(3)).await?;
|
|
||||||
|
|
||||||
let resp = Response::new(Json(results))
|
|
||||||
.header("x-rate-limit-total-image", image_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-image", image_remaining.0);
|
|
||||||
|
|
||||||
Ok(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lookup images by image
|
/// Lookup images by image
|
||||||
@ -401,45 +406,17 @@ impl Api {
|
|||||||
auth: ApiKeyAuthorization,
|
auth: ApiKeyAuthorization,
|
||||||
search_type: Query<Option<ImageSearchType>>,
|
search_type: Query<Option<ImageSearchType>>,
|
||||||
payload: ImageSearchPayload,
|
payload: ImageSearchPayload,
|
||||||
) -> poem::Result<Response<Json<ImageSearchResult>>> {
|
) -> poem::Result<Response<api::RateLimitedResponse<ImageSearchResult>>> {
|
||||||
let image_remaining = rate_limit!(auth, pool.0, image_limit, "image");
|
api::image(
|
||||||
let hash_remaining = rate_limit!(auth, pool.0, hash_limit, "hash");
|
pool.0,
|
||||||
|
bkapi.0,
|
||||||
let stream = tokio_util::io::ReaderStream::new(payload.image.into_async_read());
|
client.0,
|
||||||
let body = reqwest::Body::wrap_stream(stream);
|
endpoints.0,
|
||||||
|
auth,
|
||||||
let hash = hash_input(&client, &endpoints.hash_input, body).await?;
|
search_type,
|
||||||
|
payload,
|
||||||
let search_type = search_type.0.unwrap_or(ImageSearchType::Close);
|
)
|
||||||
let hashes = vec![hash];
|
.await
|
||||||
|
|
||||||
let mut results = {
|
|
||||||
if search_type == ImageSearchType::Force {
|
|
||||||
lookup_hashes(pool.0, bkapi.0, &hashes, 10).await?
|
|
||||||
} else {
|
|
||||||
let results = lookup_hashes(pool.0, bkapi.0, &hashes, 0).await?;
|
|
||||||
|
|
||||||
if results.is_empty() && search_type != ImageSearchType::Exact {
|
|
||||||
lookup_hashes(pool.0, bkapi.0, &hashes, 10).await?
|
|
||||||
} else {
|
|
||||||
results
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
|
|
||||||
|
|
||||||
let resp = Response::new(Json(ImageSearchResult {
|
|
||||||
hash,
|
|
||||||
matches: results,
|
|
||||||
}))
|
|
||||||
.header("x-image-hash", hash)
|
|
||||||
.header("x-rate-limit-total-image", image_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-image", image_remaining.0)
|
|
||||||
.header("x-rate-limit-total-hash", hash_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-hash", hash_remaining.0);
|
|
||||||
|
|
||||||
Ok(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lookup images by image URL
|
/// Lookup images by image URL
|
||||||
@ -455,62 +432,8 @@ impl Api {
|
|||||||
auth: ApiKeyAuthorization,
|
auth: ApiKeyAuthorization,
|
||||||
url: Query<String>,
|
url: Query<String>,
|
||||||
distance: Query<Option<u64>>,
|
distance: Query<Option<u64>>,
|
||||||
) -> poem::Result<Response<Json<ImageSearchResult>>> {
|
) -> poem::Result<Response<api::RateLimitedResponse<ImageSearchResult>>> {
|
||||||
let image_remaining = rate_limit!(auth, pool.0, image_limit, "image");
|
api::url(pool.0, bkapi.0, client.0, endpoints.0, auth, url, distance).await
|
||||||
let hash_remaining = rate_limit!(auth, pool.0, hash_limit, "hash");
|
|
||||||
|
|
||||||
let mut resp = client.get(&url.0).send().await.map_err(Error::from)?;
|
|
||||||
|
|
||||||
let distance = distance.unwrap_or(3);
|
|
||||||
|
|
||||||
let content_length = resp
|
|
||||||
.headers()
|
|
||||||
.get("content-length")
|
|
||||||
.and_then(|len| {
|
|
||||||
String::from_utf8_lossy(len.as_bytes())
|
|
||||||
.parse::<usize>()
|
|
||||||
.ok()
|
|
||||||
})
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
if content_length > 10_000_000 {
|
|
||||||
return Err(BadRequest::with_message(format!(
|
|
||||||
"image too large: {} bytes",
|
|
||||||
content_length
|
|
||||||
))
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buf = bytes::BytesMut::with_capacity(content_length);
|
|
||||||
|
|
||||||
while let Some(chunk) = resp.chunk().await.map_err(Error::from)? {
|
|
||||||
if buf.len() + chunk.len() > 10_000_000 {
|
|
||||||
return Err(BadRequest::with_message(format!(
|
|
||||||
"image too large: {} bytes",
|
|
||||||
content_length
|
|
||||||
))
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
buf.put(chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
let body = reqwest::Body::from(buf.to_vec());
|
|
||||||
let hash = hash_input(&client, &endpoints.hash_input, body).await?;
|
|
||||||
|
|
||||||
let results = lookup_hashes(pool.0, bkapi.0, &[hash], distance).await?;
|
|
||||||
|
|
||||||
let resp = Response::new(Json(ImageSearchResult {
|
|
||||||
hash,
|
|
||||||
matches: results,
|
|
||||||
}))
|
|
||||||
.header("x-image-hash", hash)
|
|
||||||
.header("x-rate-limit-total-image", image_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-image", image_remaining.0)
|
|
||||||
.header("x-rate-limit-total-hash", hash_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-hash", hash_remaining.0);
|
|
||||||
|
|
||||||
Ok(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lookup FurAffinity submission by File ID
|
/// Lookup FurAffinity submission by File ID
|
||||||
@ -520,29 +443,21 @@ impl Api {
|
|||||||
pool: Data<&Pool>,
|
pool: Data<&Pool>,
|
||||||
auth: ApiKeyAuthorization,
|
auth: ApiKeyAuthorization,
|
||||||
file_id: Query<i32>,
|
file_id: Query<i32>,
|
||||||
) -> poem::Result<Response<Json<Vec<FurAffinityFile>>>> {
|
) -> poem::Result<Response<api::RateLimitedResponse<Vec<FurAffinityFile>>>> {
|
||||||
let file_remaining = rate_limit!(auth, pool.0, image_limit, "file");
|
api::furaffinity_data(pool.0, auth, file_id).await
|
||||||
|
}
|
||||||
|
|
||||||
let matches = sqlx::query_file!("queries/lookup_furaffinity_file_id.sql", file_id.0)
|
/// Check API key limits
|
||||||
.map(|row| FurAffinityFile {
|
///
|
||||||
id: row.id,
|
/// Determine the number of allowed requests per minute for the current
|
||||||
url: row.url,
|
/// API token.
|
||||||
filename: row.filename,
|
#[oai(path = "/limits", method = "get")]
|
||||||
file_id: row.file_id,
|
async fn limits(&self, auth: ApiKeyAuthorization) -> Json<LimitsResponse> {
|
||||||
rating: row.rating.and_then(|rating| rating.parse().ok()),
|
Json(LimitsResponse {
|
||||||
posted_at: row.posted_at,
|
name: auth.0.name_limit,
|
||||||
artist: Some(row.artist),
|
image: auth.0.image_limit,
|
||||||
hash: row.hash,
|
hash: auth.0.hash_limit,
|
||||||
})
|
})
|
||||||
.fetch_all(pool.0)
|
|
||||||
.await
|
|
||||||
.map_err(Error::from)?;
|
|
||||||
|
|
||||||
let resp = Response::new(Json(matches))
|
|
||||||
.header("x-rate-limit-total-file", file_remaining.1)
|
|
||||||
.header("x-rate-limit-remaining-file", file_remaining.0);
|
|
||||||
|
|
||||||
Ok(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if a handle is known for a given service
|
/// Check if a handle is known for a given service
|
||||||
@ -556,16 +471,7 @@ impl Api {
|
|||||||
service: Path<KnownServiceName>,
|
service: Path<KnownServiceName>,
|
||||||
handle: Query<String>,
|
handle: Query<String>,
|
||||||
) -> poem::Result<Json<bool>> {
|
) -> poem::Result<Json<bool>> {
|
||||||
let handle_exists = match service.0 {
|
api::known_service(pool.0, service, handle).await
|
||||||
KnownServiceName::Twitter => {
|
|
||||||
sqlx::query_file_scalar!("queries/handle_twitter.sql", handle.0)
|
|
||||||
.fetch_one(pool.0)
|
|
||||||
.await
|
|
||||||
.map_err(poem::error::InternalServerError)?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Json(handle_exists))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -607,6 +513,9 @@ async fn main() {
|
|||||||
.data(reqwest::Client::new())
|
.data(reqwest::Client::new())
|
||||||
.with(poem::middleware::Tracing)
|
.with(poem::middleware::Tracing)
|
||||||
.with(poem::middleware::OpenTelemetryMetrics::new())
|
.with(poem::middleware::OpenTelemetryMetrics::new())
|
||||||
|
.with(poem::middleware::OpenTelemetryTracing::new(
|
||||||
|
fuzzysearch_common::trace::get_tracer("fuzzysearch-api"),
|
||||||
|
))
|
||||||
.with(cors);
|
.with(cors);
|
||||||
|
|
||||||
poem::Server::new(TcpListener::bind("0.0.0.0:8080"))
|
poem::Server::new(TcpListener::bind("0.0.0.0:8080"))
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
pub fn configure_tracing(service_name: &'static str) {
|
use opentelemetry::KeyValue;
|
||||||
use opentelemetry::KeyValue;
|
|
||||||
use tracing_subscriber::layer::SubscriberExt;
|
|
||||||
|
|
||||||
tracing_log::LogTracer::init().unwrap();
|
|
||||||
|
|
||||||
|
pub fn get_tracer(service_name: &'static str) -> opentelemetry::sdk::trace::Tracer {
|
||||||
let env = std::env::var("ENVIRONMENT");
|
let env = std::env::var("ENVIRONMENT");
|
||||||
let env = if let Ok(env) = env.as_ref() {
|
let env = if let Ok(env) = env.as_ref() {
|
||||||
env.as_str()
|
env.as_str()
|
||||||
@ -13,9 +10,7 @@ pub fn configure_tracing(service_name: &'static str) {
|
|||||||
"release"
|
"release"
|
||||||
};
|
};
|
||||||
|
|
||||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
opentelemetry_jaeger::new_pipeline()
|
||||||
|
|
||||||
let tracer = opentelemetry_jaeger::new_pipeline()
|
|
||||||
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
|
.with_agent_endpoint(std::env::var("JAEGER_COLLECTOR").expect("Missing JAEGER_COLLECTOR"))
|
||||||
.with_service_name(service_name)
|
.with_service_name(service_name)
|
||||||
.with_tags(vec![
|
.with_tags(vec![
|
||||||
@ -23,7 +18,17 @@ pub fn configure_tracing(service_name: &'static str) {
|
|||||||
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
|
KeyValue::new("version", env!("CARGO_PKG_VERSION")),
|
||||||
])
|
])
|
||||||
.install_batch(opentelemetry::runtime::Tokio)
|
.install_batch(opentelemetry::runtime::Tokio)
|
||||||
.unwrap();
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn configure_tracing(service_name: &'static str) {
|
||||||
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
|
|
||||||
|
tracing_log::LogTracer::init().unwrap();
|
||||||
|
|
||||||
|
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||||
|
|
||||||
|
let tracer = get_tracer(service_name);
|
||||||
|
|
||||||
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
|
let trace = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
|
let env_filter = tracing_subscriber::EnvFilter::from_default_env();
|
||||||
|
Loading…
Reference in New Issue
Block a user