Add Prometheus metrics.

This commit is contained in:
Syfaro 2020-09-06 13:40:00 -04:00
parent 8972e2bede
commit b6b21c4ee6
3 changed files with 1060 additions and 890 deletions

1872
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,9 @@ tokio-postgres = { version = "0.5.0" }
r2d2_postgres = " 0.16.0" r2d2_postgres = " 0.16.0"
r2d2 = "0.8" r2d2 = "0.8"
chrono = "0.4" chrono = "0.4"
hyper = "0.13"
prometheus = { version = "0.10", features = ["process"] }
lazy_static = "1"
[dependencies.furaffinity-rs] [dependencies.furaffinity-rs]
git = "https://github.com/Syfaro/furaffinity-rs" git = "https://github.com/Syfaro/furaffinity-rs"

View File

@ -1,5 +1,14 @@
use lazy_static::lazy_static;
use tokio_postgres::Client; use tokio_postgres::Client;
lazy_static! {
static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!(
"fuzzysearch_watcher_fa_processing_seconds",
"Duration to process a submission"
)
.unwrap();
}
async fn lookup_tag(client: &Client, tag: &str) -> i32 { async fn lookup_tag(client: &Client, tag: &str) -> i32 {
if let Some(row) = client if let Some(row) = client
.query("SELECT id FROM tag WHERE name = $1", &[&tag]) .query("SELECT id FROM tag WHERE name = $1", &[&tag])
@ -62,10 +71,10 @@ async fn ids_to_check(client: &Client, max: i32) -> Vec<i32> {
} }
async fn insert_submission( async fn insert_submission(
mut client: &Client, client: &Client,
sub: &furaffinity_rs::Submission, sub: &furaffinity_rs::Submission,
) -> Result<(), postgres::Error> { ) -> Result<(), postgres::Error> {
let artist_id = lookup_artist(&mut client, &sub.artist).await; let artist_id = lookup_artist(&client, &sub.artist).await;
let mut tag_ids = Vec::with_capacity(sub.tags.len()); let mut tag_ids = Vec::with_capacity(sub.tags.len());
for tag in &sub.tags { for tag in &sub.tags {
tag_ids.push(lookup_tag(&client, &tag).await); tag_ids.push(lookup_tag(&client, &tag).await);
@ -78,9 +87,9 @@ async fn insert_submission(
&sub.id, &artist_id, &url, &sub.filename, &hash, &sub.rating.serialize(), &sub.posted_at, &sub.description, &sub.hash_num, &sub.id, &artist_id, &url, &sub.filename, &hash, &sub.rating.serialize(), &sub.posted_at, &sub.description, &sub.hash_num,
]).await?; ]).await?;
let stmt = client.prepare( let stmt = client
"INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", .prepare("INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING")
).await?; .await?;
for tag_id in tag_ids { for tag_id in tag_ids {
client.execute(&stmt, &[&tag_id, &sub.id]).await?; client.execute(&stmt, &[&tag_id, &sub.id]).await?;
@ -90,7 +99,49 @@ async fn insert_submission(
} }
async fn insert_null_submission(client: &Client, id: i32) -> Result<u64, postgres::Error> { async fn insert_null_submission(client: &Client, id: i32) -> Result<u64, postgres::Error> {
client.execute("INSERT INTO SUBMISSION (id) VALUES ($1)", &[&id]).await client
.execute("INSERT INTO SUBMISSION (id) VALUES ($1)", &[&id])
.await
}
async fn request(
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/health") => Ok(hyper::Response::new(hyper::Body::from("OK"))),
(&hyper::Method::GET, "/metrics") => {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(hyper::Response::new(hyper::Body::from(buffer)))
}
_ => {
let mut not_found = hyper::Response::default();
*not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
async fn web() {
use hyper::service::{make_service_fn, service_fn};
let addr = ([127, 0, 0, 1], 3000).into();
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request)) });
let server = hyper::Server::bind(&addr).serve(service);
println!("Listening on http://{}", addr);
server.await.unwrap();
} }
#[tokio::main] #[tokio::main]
@ -106,7 +157,9 @@ async fn main() {
let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn"); let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");
let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls).await.unwrap(); let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
.await
.unwrap();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = connection.await { if let Err(e) = connection.await {
@ -114,6 +167,8 @@ async fn main() {
} }
}); });
tokio::spawn(async move { web().await });
println!("Started"); println!("Started");
'main: loop { 'main: loop {
@ -125,10 +180,13 @@ async fn main() {
if !has_submission(&client, id).await { if !has_submission(&client, id).await {
println!("loading submission {}", id); println!("loading submission {}", id);
let timer = SUBMISSION_DURATION.start_timer();
let sub = match fa.get_submission(id).await { let sub = match fa.get_submission(id).await {
Ok(sub) => sub, Ok(sub) => sub,
Err(e) => { Err(e) => {
println!("got error: {:?}, retry {}", e.message, e.retry); println!("got error: {:?}, retry {}", e.message, e.retry);
timer.stop_and_discard();
if e.retry { if e.retry {
tokio::time::delay_for(std::time::Duration::from_secs(attempt + 1)) tokio::time::delay_for(std::time::Duration::from_secs(attempt + 1))
.await; .await;
@ -144,6 +202,7 @@ async fn main() {
Some(sub) => sub, Some(sub) => sub,
None => { None => {
println!("did not exist"); println!("did not exist");
timer.stop_and_discard();
insert_null_submission(&client, id).await.unwrap(); insert_null_submission(&client, id).await.unwrap();
break 'attempt; break 'attempt;
} }
@ -157,6 +216,8 @@ async fn main() {
} }
}; };
timer.stop_and_record();
insert_submission(&client, &sub).await.unwrap(); insert_submission(&client, &sub).await.unwrap();
break 'attempt; break 'attempt;