Add Prometheus for request and process monitoring (#4)

* Add Prometheus metrics.

* Set default METRICS_HOST and EXPOSE in Dockerfile.
This commit is contained in:
Syfaro 2021-02-17 17:41:55 -05:00 committed by GitHub
parent 908cda8ce9
commit c345c51a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 164 additions and 6 deletions

51
Cargo.lock generated
View File

@ -410,6 +410,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "flate2"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0"
dependencies = [
"cfg-if 1.0.0",
"crc32fast",
"libc",
"miniz_oxide 0.4.3",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -555,8 +567,10 @@ dependencies = [
"hyper", "hyper",
"image", "image",
"img_hash", "img_hash",
"lazy_static",
"opentelemetry", "opentelemetry",
"opentelemetry-jaeger", "opentelemetry-jaeger",
"prometheus 0.11.0",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@ -1027,9 +1041,9 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.7.7" version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7" checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -1228,7 +1242,7 @@ dependencies = [
"lazy_static", "lazy_static",
"percent-encoding", "percent-encoding",
"pin-project 0.4.27", "pin-project 0.4.27",
"prometheus", "prometheus 0.7.0",
"rand 0.7.3", "rand 0.7.3",
] ]
@ -1379,6 +1393,20 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "procfs"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab8809e0c18450a2db0f236d2a44ec0b4c1412d0eb936233579f0990faa5d5cd"
dependencies = [
"bitflags",
"byteorder",
"flate2",
"hex",
"lazy_static",
"libc",
]
[[package]] [[package]]
name = "prometheus" name = "prometheus"
version = "0.7.0" version = "0.7.0"
@ -1393,6 +1421,23 @@ dependencies = [
"spin", "spin",
] ]
[[package]]
name = "prometheus"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8425533e7122f0c3cc7a37e6244b16ad3a2cc32ae7ac6276e2a75da0d9c200d"
dependencies = [
"cfg-if 1.0.0",
"fnv",
"lazy_static",
"libc",
"parking_lot",
"procfs",
"protobuf",
"regex",
"thiserror",
]
[[package]] [[package]]
name = "protobuf" name = "protobuf"
version = "2.22.0" version = "2.22.0"

View File

@ -9,6 +9,9 @@ tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
tracing-futures = "0.2" tracing-futures = "0.2"
prometheus = { version = "0.11", features = ["process"] }
lazy_static = "1"
opentelemetry = "0.6" opentelemetry = "0.6"
opentelemetry-jaeger = "0.5" opentelemetry-jaeger = "0.5"
tracing-opentelemetry = "0.5" tracing-opentelemetry = "0.5"

View File

@ -6,7 +6,8 @@ COPY . .
RUN cargo install --root / --path . RUN cargo install --root / --path .
FROM debian:buster-slim FROM debian:buster-slim
EXPOSE 8080 EXPOSE 8080 8081
ENV METRICS_HOST=0.0.0.0:8081
WORKDIR /app WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/fuzzysearch /bin/fuzzysearch COPY --from=builder /bin/fuzzysearch /bin/fuzzysearch

View File

@ -1,11 +1,31 @@
use crate::models::{image_query, image_query_sync}; use crate::models::{image_query, image_query_sync};
use crate::types::*; use crate::types::*;
use crate::{early_return, rate_limit, Pool, Tree}; use crate::{early_return, rate_limit, Pool, Tree};
use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
use std::convert::TryInto; use std::convert::TryInto;
use tracing::{span, warn}; use tracing::{span, warn};
use tracing_futures::Instrument; use tracing_futures::Instrument;
use warp::{Rejection, Reply}; use warp::{Rejection, Reply};
// Prometheus metrics for the HTTP API handlers. Registration happens on
// first access (lazy_static defers initialization); `.unwrap()` only
// panics if a metric with the same name was already registered, which
// would be a programming error.
lazy_static! {
    // Wall-clock time spent hashing one image (upload or URL download path).
    static ref IMAGE_HASH_DURATION: Histogram = register_histogram!(
        "fuzzysearch_api_image_hash_seconds",
        "Duration to perform an image hash operation"
    )
    .unwrap();
    // Wall-clock time spent downloading an image from a caller-provided URL.
    static ref IMAGE_URL_DOWNLOAD_DURATION: Histogram = register_histogram!(
        "fuzzysearch_api_image_url_download_seconds",
        "Duration to download an image from a provided URL"
    )
    .unwrap();
    // Count of warp rejections that reached the fallback rejection handler.
    static ref UNHANDLED_REJECTIONS: IntCounter = register_int_counter!(
        "fuzzysearch_api_unhandled_rejections_count",
        "Number of unhandled HTTP rejections"
    )
    .unwrap();
}
#[derive(Debug)] #[derive(Debug)]
enum Error { enum Error {
Postgres(sqlx::Error), Postgres(sqlx::Error),
@ -87,6 +107,7 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas
let len = bytes.len(); let len = bytes.len();
let _timer = IMAGE_HASH_DURATION.start_timer();
let hash = tokio::task::spawn_blocking(move || { let hash = tokio::task::spawn_blocking(move || {
let hasher = crate::get_hasher(); let hasher = crate::get_hasher();
let image = image::load_from_memory(&bytes).unwrap(); let image = image::load_from_memory(&bytes).unwrap();
@ -95,6 +116,7 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas
.instrument(span!(tracing::Level::TRACE, "hashing image", len)) .instrument(span!(tracing::Level::TRACE, "hashing image", len))
.await .await
.unwrap(); .unwrap();
drop(_timer);
let mut buf: [u8; 8] = [0; 8]; let mut buf: [u8; 8] = [0; 8];
buf.copy_from_slice(&hash.as_bytes()); buf.copy_from_slice(&hash.as_bytes());
@ -396,6 +418,8 @@ pub async fn search_image_by_url(
let image_remaining = rate_limit!(&api_key, &db, image_limit, "image"); let image_remaining = rate_limit!(&api_key, &db, image_limit, "image");
let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash"); let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash");
let _timer = IMAGE_URL_DOWNLOAD_DURATION.start_timer();
let mut resp = match reqwest::get(&url).await { let mut resp = match reqwest::get(&url).await {
Ok(resp) => resp, Ok(resp) => resp,
Err(_err) => return Ok(Box::new(Error::InvalidImage)), Err(_err) => return Ok(Box::new(Error::InvalidImage)),
@ -425,6 +449,9 @@ pub async fn search_image_by_url(
buf.put(chunk); buf.put(chunk);
} }
drop(_timer);
let _timer = IMAGE_HASH_DURATION.start_timer();
let hash = tokio::task::spawn_blocking(move || { let hash = tokio::task::spawn_blocking(move || {
let hasher = crate::get_hasher(); let hasher = crate::get_hasher();
let image = image::load_from_memory(&buf).unwrap(); let image = image::load_from_memory(&buf).unwrap();
@ -433,6 +460,7 @@ pub async fn search_image_by_url(
.instrument(span!(tracing::Level::TRACE, "hashing image")) .instrument(span!(tracing::Level::TRACE, "hashing image"))
.await .await
.unwrap(); .unwrap();
drop(_timer);
let hash: [u8; 8] = hash.as_bytes().try_into().unwrap(); let hash: [u8; 8] = hash.as_bytes().try_into().unwrap();
let num = i64::from_be_bytes(hash); let num = i64::from_be_bytes(hash);
@ -461,6 +489,8 @@ pub async fn search_image_by_url(
pub async fn handle_rejection(err: Rejection) -> Result<Box<dyn Reply>, std::convert::Infallible> { pub async fn handle_rejection(err: Rejection) -> Result<Box<dyn Reply>, std::convert::Infallible> {
warn!("had rejection"); warn!("had rejection");
UNHANDLED_REJECTIONS.inc();
let (code, message) = if err.is_not_found() { let (code, message) = if err.is_not_found() {
( (
warp::http::StatusCode::NOT_FOUND, warp::http::StatusCode::NOT_FOUND,

View File

@ -43,6 +43,8 @@ async fn main() {
.await .await
.expect("Unable to create Postgres pool"); .expect("Unable to create Postgres pool");
serve_metrics().await;
let tree: Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(Hamming))); let tree: Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(Hamming)));
load_updates(db_pool.clone(), tree.clone()).await; load_updates(db_pool.clone(), tree.clone()).await;
@ -119,6 +121,43 @@ fn configure_tracing() {
registry.init(); registry.init();
} }
/// HTTP handler that renders every metric registered with the default
/// Prometheus registry in the text exposition format.
///
/// The incoming request is ignored entirely; any request to the metrics
/// listener yields the full metrics dump. Encoding into an in-memory
/// buffer cannot fail for the text encoder, hence the `unwrap`.
async fn metrics(
    _: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
    use hyper::{Body, Response};
    use prometheus::{Encoder, TextEncoder};

    let families = prometheus::gather();

    let mut body = Vec::new();
    TextEncoder::new().encode(&families, &mut body).unwrap();

    Ok(Response::new(Body::from(body)))
}
/// Starts a background Hyper server that serves Prometheus metrics.
///
/// The bind address comes from the `METRICS_HOST` environment variable
/// (e.g. `0.0.0.0:8081`); the process panics immediately if it is
/// missing or unparseable. Binding happens before the task is spawned,
/// so a bind failure also panics here rather than in the background
/// task. The spawned server runs for the lifetime of the process.
async fn serve_metrics() {
    use hyper::{
        service::{make_service_fn, service_fn},
        Server,
    };
    use std::convert::Infallible;
    use std::net::SocketAddr;

    let addr: SocketAddr = std::env::var("METRICS_HOST")
        .expect("Missing METRICS_HOST")
        .parse()
        .expect("Invalid METRICS_HOST");

    let service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });

    let server = Server::bind(&addr).serve(service);

    tokio::spawn(async move {
        server.await.expect("Metrics server error");
    });
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct HashRow { struct HashRow {
id: i32, id: i32,

View File

@ -1,7 +1,22 @@
use crate::types::*; use crate::types::*;
use crate::{Pool, Tree}; use crate::{Pool, Tree};
use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};
use tracing_futures::Instrument; use tracing_futures::Instrument;
// Prometheus histograms for the model/query layer, registered with the
// default registry on first access.
lazy_static! {
    // Time to search the in-memory BK-tree for hashes within the
    // requested distance of one query hash.
    static ref IMAGE_LOOKUP_DURATION: Histogram = register_histogram!(
        "fuzzysearch_api_image_lookup_seconds",
        "Duration to perform an image lookup"
    )
    .unwrap();
    // Time for a single Postgres row fetch for one matched hash.
    static ref IMAGE_QUERY_DURATION: Histogram = register_histogram!(
        "fuzzysearch_api_image_query_seconds",
        "Duration to perform a single image lookup query"
    )
    .unwrap();
}
#[tracing::instrument(skip(db))] #[tracing::instrument(skip(db))]
pub async fn lookup_api_key(key: &str, db: &sqlx::PgPool) -> Option<ApiKey> { pub async fn lookup_api_key(key: &str, db: &sqlx::PgPool) -> Option<ApiKey> {
sqlx::query_as!( sqlx::query_as!(
@ -62,6 +77,8 @@ pub fn image_query_sync(
for query_hash in hashes { for query_hash in hashes {
let mut seen = std::collections::HashSet::new(); let mut seen = std::collections::HashSet::new();
let _timer = IMAGE_LOOKUP_DURATION.start_timer();
let node = crate::Node::query(query_hash.to_be_bytes()); let node = crate::Node::query(query_hash.to_be_bytes());
let lock = tree.read().await; let lock = tree.read().await;
let items = lock.find(&node, distance as u64); let items = lock.find(&node, distance as u64);
@ -72,6 +89,8 @@ pub fn image_query_sync(
} }
seen.insert(item.id); seen.insert(item.id);
let _timer = IMAGE_QUERY_DURATION.start_timer();
let row = sqlx::query!("SELECT let row = sqlx::query!("SELECT
hashes.id, hashes.id,
hashes.hash, hashes.hash,

View File

@ -1,4 +1,15 @@
use crate::types::*; use crate::types::*;
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};
lazy_static! {
    // Counter of rate-limit decisions, labelled by outcome. The
    // `rate_limit!` macro below increments it with status "allowed" or
    // "limited"; it is `pub` because the macro expands in caller crates'
    // modules and must reach it via `crate::utils::RATE_LIMIT_STATUS`.
    pub static ref RATE_LIMIT_STATUS: IntCounterVec = register_int_counter_vec!(
        "fuzzysearch_api_rate_limit_count",
        "Number of allowed and rate limited requests",
        &["status"]
    )
    .unwrap();
}
#[macro_export] #[macro_export]
macro_rules! rate_limit { macro_rules! rate_limit {
@ -26,8 +37,18 @@ macro_rules! rate_limit {
}; };
match rate_limit { match rate_limit {
crate::types::RateLimit::Limited => return Ok(Box::new(Error::RateLimit)), crate::types::RateLimit::Limited => {
crate::types::RateLimit::Available(count) => count, crate::utils::RATE_LIMIT_STATUS
.with_label_values(&["limited"])
.inc();
return Ok(Box::new(Error::RateLimit));
}
crate::types::RateLimit::Available(count) => {
crate::utils::RATE_LIMIT_STATUS
.with_label_values(&["allowed"])
.inc();
count
}
} }
}}; }};
} }