More logging updates.

This commit is contained in:
Syfaro 2021-04-21 21:46:10 -04:00
parent 32fbfe1d00
commit 47703694d1
11 changed files with 65 additions and 77 deletions

23
Cargo.lock generated
View File

@ -131,9 +131,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.56" version = "0.3.57"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d117600f438b1707d4e4ae15d3595657288f8235a0eb593e80ecc98ab34e1bc" checksum = "78ed203b9ba68b242c62b3fb7480f589dd49829be1edb3fe8fc8b4ffda2dcb8d"
dependencies = [ dependencies = [
"addr2line", "addr2line",
"cfg-if 1.0.0", "cfg-if 1.0.0",
@ -923,6 +923,7 @@ dependencies = [
"tempfile", "tempfile",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber",
] ]
[[package]] [[package]]
@ -944,7 +945,7 @@ dependencies = [
"sqlx", "sqlx",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-unwrap",
] ]
[[package]] [[package]]
@ -969,7 +970,6 @@ dependencies = [
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",
"tracing", "tracing",
"tracing-subscriber",
"tracing-unwrap", "tracing-unwrap",
] ]
@ -988,7 +988,7 @@ dependencies = [
"sqlx", "sqlx",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-unwrap",
] ]
[[package]] [[package]]
@ -1004,7 +1004,6 @@ dependencies = [
"serde_json", "serde_json",
"thiserror", "thiserror",
"tracing", "tracing",
"tracing-subscriber",
"tracing-unwrap", "tracing-unwrap",
] ]
@ -1257,15 +1256,15 @@ checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437"
[[package]] [[package]]
name = "httpdate" name = "httpdate"
version = "0.3.2" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" checksum = "05842d0d43232b23ccb7060ecb0f0626922c21f30012e97b767b30afd4a5d4b9"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.5" version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf09f61b52cfcf4c00de50df88ae423d6c02354e385a86341133b5338630ad1" checksum = "5f006b8784cfb01fe7aa9c46f5f5cd4cf5c85a8c612a0653ec97642979062665"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
@ -2947,9 +2946,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.69" version = "1.0.70"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48fe99c6bd8b1cc636890bcc071842de909d902c81ac7dab53ba33c421ab8ffb" checksum = "b9505f307c872bab8eb46f77ae357c8eba1fdacead58ee5a850116b1d7f82883"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -13,6 +13,7 @@ queue = ["faktory", "tokio", "serde_json"]
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
base64 = "0.13" base64 = "0.13"

View File

@ -14,3 +14,16 @@ pub fn get_hasher() -> img_hash::Hasher<[u8; 8]> {
.preproc_dct() .preproc_dct()
.to_hasher() .to_hasher()
} }
/// Initialize the logger. This should only be called by the running binary.
pub fn init_logger() {
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) {
tracing_subscriber::fmt::Subscriber::builder()
.json()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.init();
} else {
tracing_subscriber::fmt::init();
}
}

View File

@ -21,7 +21,7 @@ img_hash = "3"
sha2 = "0.9" sha2 = "0.9"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-unwrap = "0.9"
anyhow = "1" anyhow = "1"

View File

@ -2,6 +2,7 @@ use anyhow::Context;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge}; use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge};
use sqlx::Connection; use sqlx::Connection;
use tracing_unwrap::ResultExt;
use fuzzysearch_common::faktory::FaktoryClient; use fuzzysearch_common::faktory::FaktoryClient;
@ -12,50 +13,44 @@ lazy_static! {
"fuzzysearch_watcher_e621_submission_backlog", "fuzzysearch_watcher_e621_submission_backlog",
"Number of submissions behind the latest ID" "Number of submissions behind the latest ID"
) )
.unwrap(); .unwrap_or_log();
static ref INDEX_DURATION: Histogram = register_histogram!( static ref INDEX_DURATION: Histogram = register_histogram!(
"fuzzysearch_watcher_e621_index_duration", "fuzzysearch_watcher_e621_index_duration",
"Duration to load an index of submissions" "Duration to load an index of submissions"
) )
.unwrap(); .unwrap_or_log();
static ref SUBMISSION_DURATION: Histogram = register_histogram!( static ref SUBMISSION_DURATION: Histogram = register_histogram!(
"fuzzysearch_watcher_e621_submission_duration", "fuzzysearch_watcher_e621_submission_duration",
"Duration to ingest a submission" "Duration to ingest a submission"
) )
.unwrap(); .unwrap_or_log();
} }
type Auth = (String, Option<String>); type Auth = (String, Option<String>);
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) { fuzzysearch_common::init_logger();
tracing_subscriber::fmt::Subscriber::builder()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.init();
} else {
tracing_subscriber::fmt::init();
}
create_metrics_server().await; create_metrics_server().await;
let login = std::env::var("E621_LOGIN").expect("Missing E621_LOGIN"); let login = std::env::var("E621_LOGIN").expect_or_log("Missing E621_LOGIN");
let api_key = std::env::var("E621_API_KEY").expect("Missing E621_API_KEY"); let api_key = std::env::var("E621_API_KEY").expect_or_log("Missing E621_API_KEY");
let auth = (login, Some(api_key)); let auth = (login, Some(api_key));
let client = reqwest::ClientBuilder::default() let client = reqwest::ClientBuilder::default()
.user_agent(USER_AGENT) .user_agent(USER_AGENT)
.build()?; .build()?;
let mut conn = let mut conn = sqlx::PgConnection::connect(
sqlx::PgConnection::connect(&std::env::var("DATABASE_URL").expect("Missing DATABASE_URL")) &std::env::var("DATABASE_URL").expect_or_log("Missing DATABASE_URL"),
.await?; )
.await?;
let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL"); let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn) let faktory = FaktoryClient::connect(faktory_dsn)
.await .await
.expect("Unable to connect to Faktory"); .expect_or_log("Unable to connect to Faktory");
let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621") let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621")
.fetch_one(&mut conn) .fetch_one(&mut conn)
@ -79,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
let _hist = INDEX_DURATION.start_timer(); let _hist = INDEX_DURATION.start_timer();
let lid = get_latest_id(&client, &auth) let lid = get_latest_id(&client, &auth)
.await .await
.expect("Unable to get latest ID"); .expect_or_log("Unable to get latest ID");
drop(_hist); drop(_hist);
latest_id = Some(lid); latest_id = Some(lid);
@ -100,7 +95,7 @@ async fn main() -> anyhow::Result<()> {
min_id = match post_ids.iter().max() { min_id = match post_ids.iter().max() {
Some(id) => *id, Some(id) => *id,
None => { None => {
tracing::warn!("Found no new posts, sleeping"); tracing::info!("Found no new posts, sleeping");
tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await; tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await;
continue; continue;
} }
@ -359,7 +354,9 @@ async fn provide_metrics(
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let metric_families = prometheus::gather(); let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap(); encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(Response::new(Body::from(buffer))) Ok(Response::new(Body::from(buffer)))
} }
@ -376,11 +373,11 @@ async fn create_metrics_server() {
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(provide_metrics)) }); make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(provide_metrics)) });
let addr: SocketAddr = std::env::var("METRICS_HOST") let addr: SocketAddr = std::env::var("METRICS_HOST")
.expect("Missing METRICS_HOST") .expect_or_log("Missing METRICS_HOST")
.parse() .parse()
.expect("Invalid METRICS_HOST"); .expect_or_log("Invalid METRICS_HOST");
let server = Server::bind(&addr).serve(make_svc); let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move { server.await.expect("Metrics server error") }); tokio::spawn(async move { server.await.expect_or_log("Metrics server error") });
} }

View File

@ -17,7 +17,6 @@ prometheus = { version = "0.12", features = ["process"] }
lazy_static = "1" lazy_static = "1"
futures-retry = "0.6" futures-retry = "0.6"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2"
tracing-unwrap = "0.9" tracing-unwrap = "0.9"
faktory = "0.11" faktory = "0.11"
anyhow = "1" anyhow = "1"

View File

@ -270,14 +270,7 @@ async fn process_submission(
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) { fuzzysearch_common::init_logger();
tracing_subscriber::fmt::Subscriber::builder()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.init();
} else {
tracing_subscriber::fmt::init();
}
let (cookie_a, cookie_b) = ( let (cookie_a, cookie_b) = (
std::env::var("FA_A").expect_or_log("Missing FA_A"), std::env::var("FA_A").expect_or_log("Missing FA_A"),

View File

@ -8,7 +8,7 @@ edition = "2018"
anyhow = "1" anyhow = "1"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-unwrap = "0.9"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use tracing_unwrap::{OptionExt, ResultExt};
use fuzzysearch_common::faktory::FaktoryClient; use fuzzysearch_common::faktory::FaktoryClient;
@ -130,7 +131,7 @@ async fn process_submission(
tracing::debug!("Processing submission"); tracing::debug!("Processing submission");
let data = client let data = client
.get(&sub.media.submission.first().unwrap().url) .get(&sub.media.submission.first().unwrap_or_log().url)
.send() .send()
.await? .await?
.bytes() .bytes()
@ -158,7 +159,7 @@ async fn process_submission(
site: fuzzysearch_common::types::Site::Weasyl, site: fuzzysearch_common::types::Site::Weasyl,
site_id: sub.id, site_id: sub.id,
artist: sub.owner_login.clone(), artist: sub.owner_login.clone(),
file_url: sub.media.submission.first().unwrap().url.clone(), file_url: sub.media.submission.first().unwrap_or_log().url.clone(),
file_sha256: Some(result.to_vec()), file_sha256: Some(result.to_vec()),
hash: num.map(|hash| hash.to_be_bytes()), hash: num.map(|hash| hash.to_be_bytes()),
}) })
@ -195,39 +196,32 @@ async fn insert_null(
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) { fuzzysearch_common::init_logger();
tracing_subscriber::fmt::Subscriber::builder()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.init();
} else {
tracing_subscriber::fmt::init();
}
let api_key = std::env::var("WEASYL_APIKEY").unwrap(); let api_key = std::env::var("WEASYL_APIKEY").unwrap_or_log();
let pool = sqlx::postgres::PgPoolOptions::new() let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(2) .max_connections(2)
.connect(&std::env::var("DATABASE_URL").unwrap()) .connect(&std::env::var("DATABASE_URL").unwrap_or_log())
.await .await
.unwrap(); .unwrap_or_log();
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL"); let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn) let faktory = FaktoryClient::connect(faktory_dsn)
.await .await
.expect("Unable to connect to Faktory"); .expect_or_log("Unable to connect to Faktory");
loop { loop {
let min = sqlx::query!("SELECT max(id) id FROM weasyl") let min = sqlx::query!("SELECT max(id) id FROM weasyl")
.fetch_one(&pool) .fetch_one(&pool)
.await .await
.unwrap() .unwrap_or_log()
.id .id
.unwrap_or_default(); .unwrap_or_default();
let max = load_frontpage(&client, &api_key).await.unwrap(); let max = load_frontpage(&client, &api_key).await.unwrap_or_log();
tracing::info!(min, max, "Calculated range of submissions to check"); tracing::info!(min, max, "Calculated range of submissions to check");
@ -235,16 +229,16 @@ async fn main() {
let row: Option<_> = sqlx::query!("SELECT id FROM weasyl WHERE id = $1", id) let row: Option<_> = sqlx::query!("SELECT id FROM weasyl WHERE id = $1", id)
.fetch_optional(&pool) .fetch_optional(&pool)
.await .await
.unwrap(); .unwrap_or_log();
if row.is_some() { if row.is_some() {
continue; continue;
} }
match load_submission(&client, &api_key, id).await.unwrap() { match load_submission(&client, &api_key, id).await.unwrap_or_log() {
(Some(sub), json) => process_submission(&pool, &client, &faktory, json, sub) (Some(sub), json) => process_submission(&pool, &client, &faktory, json, sub)
.await .await
.unwrap(), .unwrap_or_log(),
(None, body) => insert_null(&pool, body, id).await.unwrap(), (None, body) => insert_null(&pool, body, id).await.unwrap_or_log(),
} }
} }

View File

@ -6,7 +6,6 @@ edition = "2018"
[dependencies] [dependencies]
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2"
tracing-unwrap = "0.9" tracing-unwrap = "0.9"
thiserror = "1" thiserror = "1"

View File

@ -25,14 +25,7 @@ pub enum WebhookError {
} }
fn main() { fn main() {
if matches!(std::env::var("LOG_FMT").as_deref(), Ok("json")) { fuzzysearch_common::init_logger();
tracing_subscriber::fmt::Subscriber::builder()
.json()
.with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc3339())
.init();
} else {
tracing_subscriber::fmt::init();
}
tracing::info!("Starting..."); tracing::info!("Starting...");