Improve FurAffinity ingester (#5)

* Improve FurAffinity retry logic, add timeout.

* Only build images when pushing to master.

* Use tracing instead of directly printing messages.

* Make case consistent on messages.

* Use tracing instead of panics directly.

* Record users online.

* Extract submission handling to new function.
This commit is contained in:
Syfaro 2021-02-27 22:26:53 -05:00 committed by GitHub
parent 0e0b7e0ecb
commit 17a4d39a75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 193 deletions

View File

@ -47,8 +47,11 @@ steps:
when:
branch:
- master
event:
- push
paths:
- fuzzysearch/**
- Cargo.lock
- name: Build Ingester e621
image: plugins/docker
@ -64,8 +67,11 @@ steps:
when:
branch:
- master
event:
- push
paths:
- fuzzysearch-ingest-e621/**
- Cargo.lock
- name: Build Ingester FurAffinity
image: plugins/docker
@ -81,8 +87,11 @@ steps:
when:
branch:
- master
event:
- push
paths:
- fuzzysearch-ingest-furaffinity/**
- Cargo.lock
- name: Build Ingester Weasyl
image: plugins/docker
@ -98,11 +107,14 @@ steps:
when:
branch:
- master
event:
- push
paths:
- fuzzysearch-ingest-weasyl/**
- Cargo.lock
---
kind: signature
hmac: 82ee084d72d5ef16c8fb5b756ea1cce82691d3510305b0665c9c618aa9fb4d18
hmac: 6c87d24325f6646ff0fe06eaf62e94cf53508cdf73dc303607e7ec74a8b4486e
...

204
Cargo.lock generated
View File

@ -4,9 +4,9 @@ version = 3
[[package]]
name = "adler"
version = "0.2.3"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler32"
@ -368,14 +368,13 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
version = "0.9.2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d60ab4a8dba064f2fbb5aa270c28da5cf4bbd0e72dae1140a6b0353a779dbe00"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
"lazy_static",
"loom",
"memoffset",
"scopeguard",
]
@ -392,14 +391,13 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bae8f328835f8f5a6ceb6a7842a7f2d0c03692adb5c889347235d59194731fe3"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
"lazy_static",
"loom",
]
[[package]]
@ -579,7 +577,7 @@ dependencies = [
"cfg-if 1.0.0",
"crc32fast",
"libc",
"miniz_oxide 0.4.3",
"miniz_oxide 0.4.4",
]
[[package]]
@ -622,7 +620,7 @@ checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "furaffinity-rs"
version = "0.1.0"
source = "git+https://github.com/Syfaro/furaffinity-rs#5c058bce332a700379e9f224680313c6cf9d1ee4"
source = "git+https://github.com/Syfaro/furaffinity-rs#238d0b4516f3a11008284c31f991ba063926bb5b"
dependencies = [
"cfscrape",
"chrono",
@ -648,9 +646,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150"
checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1"
dependencies = [
"futures-channel",
"futures-core",
@ -663,9 +661,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846"
checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939"
dependencies = [
"futures-core",
"futures-sink",
@ -673,15 +671,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65"
checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94"
[[package]]
name = "futures-executor"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9"
checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1"
dependencies = [
"futures-core",
"futures-task",
@ -690,15 +688,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59"
[[package]]
name = "futures-macro"
version = "0.3.12"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7"
dependencies = [
"proc-macro-hack",
"proc-macro2",
@ -707,25 +705,33 @@ dependencies = [
]
[[package]]
name = "futures-sink"
version = "0.3.12"
name = "futures-retry"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6"
[[package]]
name = "futures-task"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86"
checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f"
dependencies = [
"once_cell",
"futures",
"pin-project-lite",
"tokio",
]
[[package]]
name = "futures-util"
version = "0.3.12"
name = "futures-sink"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3"
[[package]]
name = "futures-task"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80"
[[package]]
name = "futures-util"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1"
dependencies = [
"futures-channel",
"futures-core",
@ -814,6 +820,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"furaffinity-rs",
"futures-retry",
"hyper",
"lazy_static",
"postgres",
@ -823,6 +830,9 @@ dependencies = [
"reqwest",
"tokio",
"tokio-postgres",
"tracing",
"tracing-subscriber",
"tracing-unwrap",
]
[[package]]
@ -850,19 +860,6 @@ dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc"
dependencies = [
"cc",
"libc",
"log",
"rustc_version",
"winapi",
]
[[package]]
name = "generic-array"
version = "0.12.3"
@ -942,9 +939,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "h2"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b67e66362108efccd8ac053abafc8b7a8d86a37e6e48fc4f6f7485eb5e9e6a5"
checksum = "d832b01df74254fe364568d6ddc294443f61cbec82816b60904303af87efae78"
dependencies = [
"bytes",
"fnv",
@ -957,7 +954,6 @@ dependencies = [
"tokio",
"tokio-util",
"tracing",
"tracing-futures",
]
[[package]]
@ -1278,9 +1274,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.47"
version = "0.3.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cfb73131c35423a367daf8cbd24100af0d077668c8c2943f0e7dd775fef0f65"
checksum = "dc9f84f9b115ce7843d60706df1422a916680bfdfcbdb0447c5614ff9d7e4d78"
dependencies = [
"wasm-bindgen",
]
@ -1344,17 +1340,6 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "loom"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d44c73b4636e497b4917eb21c33539efa3816741a2d3ff26c6316f1b529481a4"
dependencies = [
"cfg-if 1.0.0",
"generator",
"scoped-tls",
]
[[package]]
name = "mac"
version = "0.1.1"
@ -1458,9 +1443,9 @@ dependencies = [
[[package]]
name = "miniz_oxide"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f2d26ec3309788e423cfbf68ad1800f061638098d76a83681af979dc4eda19d"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
@ -1468,9 +1453,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95"
checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a"
dependencies = [
"libc",
"log",
@ -1632,9 +1617,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.5.2"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
checksum = "10acf907b94fc1b1a152d08ef97e7759650268cf986bf127f387e602b02c7e5a"
[[package]]
name = "opaque-debug"
@ -2305,15 +2290,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]]
name = "rustdct"
version = "0.4.0"
@ -2403,9 +2379,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.0.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1759c2e3c8580017a484a7ac56d3abc5a6c1feadf88db2f3633f12ae4268c69"
checksum = "2dfd318104249865096c8da1dfabf09ddbb6d0330ea176812a62ec75e40c4166"
dependencies = [
"bitflags",
"core-foundation",
@ -2416,9 +2392,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.0.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f99b9d5e26d2a71633cc4f2ebae7cc9f874044e0c351a27e17892d76dce5678b"
checksum = "dee48cdde5ed250b0d3252818f646e174ab414036edb884dde62d80a3ac6082d"
dependencies = [
"core-foundation-sys",
"libc",
@ -2444,21 +2420,6 @@ dependencies = [
"thin-slice",
]
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.123"
@ -2481,9 +2442,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.62"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486"
checksum = "43535db9747a4ba938c0ce0a98cc631a46ebf943c9e1d604e091df6007620bf6"
dependencies = [
"indexmap",
"itoa",
@ -2879,7 +2840,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a53f4706d65497df0c4349241deddf35f84cee19c87ed86ea8ca590f4464437"
dependencies = [
"jpeg-decoder",
"miniz_oxide 0.4.3",
"miniz_oxide 0.4.4",
"weezl",
]
@ -3018,9 +2979,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.24"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f77d3842f76ca899ff2dbcf231c5c65813dea431301d6eb686279c15c4464f12"
checksum = "01ebdc2bb4498ab1ab5f5b73c5803825e60199229ccba0698170e3be0e7f959f"
dependencies = [
"cfg-if 1.0.0",
"log",
@ -3116,6 +3077,15 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-unwrap"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7908a5f1475756bd7f81d6fe7e3607f13b33562c3ec8f9fb7502df790350f74a"
dependencies = [
"tracing",
]
[[package]]
name = "transpose"
version = "0.1.0"
@ -3313,9 +3283,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.70"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55c0f7123de74f0dab9b7d00fd614e7b19349cd1e2f5252bbe9b1754b59433be"
checksum = "7ee1280240b7c461d6a0071313e08f34a60b0365f14260362e5a2b17d1d31aa7"
dependencies = [
"cfg-if 1.0.0",
"serde",
@ -3325,9 +3295,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.70"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bc45447f0d4573f3d65720f636bbcc3dd6ce920ed704670118650bcd47764c7"
checksum = "5b7d8b6942b8bb3a9b0e73fc79b98095a27de6fa247615e59d096754a3bc2aa8"
dependencies = [
"bumpalo",
"lazy_static",
@ -3340,9 +3310,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.20"
version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3de431a2910c86679c34283a33f66f4e4abd7e0aec27b6669060148872aadf94"
checksum = "8e67a5806118af01f0d9045915676b22aaebecf4178ae7021bc171dab0b897ab"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
@ -3352,9 +3322,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.70"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b8853882eef39593ad4174dd26fc9865a64e84026d223f63bb2c42affcbba2c"
checksum = "e5ac38da8ef716661f0f36c0d8320b89028efe10c7c0afde65baffb496ce0d3b"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -3362,9 +3332,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.70"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4133b5e7f2a531fa413b3a1695e925038a05a71cf67e87dafa295cb645a01385"
checksum = "cc053ec74d454df287b9374ee8abb36ffd5acb95ba87da3ba5b7d3fe20eb401e"
dependencies = [
"proc-macro2",
"quote",
@ -3375,15 +3345,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.70"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd4945e4943ae02d15c13962b38a5b1e81eadd4b71214eee75af64a4d6a4fd64"
checksum = "7d6f8ec44822dd71f5f221a5847fb34acd9060535c1211b70a05844c0f6383b1"
[[package]]
name = "web-sys"
version = "0.3.47"
version = "0.3.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c40dc691fc48003eba817c38da7113c15698142da971298003cac3ef175680b3"
checksum = "ec600b26223b2948cedfde2a0aa6756dcf1fef616f43d7b3097aaf53a6c4d92b"
dependencies = [
"js-sys",
"wasm-bindgen",

View File

@ -15,6 +15,10 @@ chrono = "0.4"
hyper = { version = "0.14", features = ["server"] }
prometheus = { version = "0.11", features = ["process"] }
lazy_static = "1"
futures-retry = "0.6"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-unwrap = "0.9"
[dependencies.furaffinity-rs]
git = "https://github.com/Syfaro/furaffinity-rs"

View File

@ -1,19 +1,26 @@
use lazy_static::lazy_static;
use tokio_postgres::Client;
use tracing_unwrap::{OptionExt, ResultExt};
lazy_static! {
static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!(
"fuzzysearch_watcher_fa_processing_seconds",
"Duration to process a submission"
)
.unwrap();
.unwrap_or_log();
static ref USERS_ONLINE: prometheus::IntGaugeVec = prometheus::register_int_gauge_vec!(
"fuzzysearch_watcher_fa_users_online_count",
"Number of users online for each category",
&["group"]
)
.unwrap_or_log();
}
async fn lookup_tag(client: &Client, tag: &str) -> i32 {
if let Some(row) = client
.query("SELECT id FROM tag WHERE name = $1", &[&tag])
.await
.unwrap()
.unwrap_or_log()
.into_iter()
.next()
{
@ -23,10 +30,10 @@ async fn lookup_tag(client: &Client, tag: &str) -> i32 {
client
.query("INSERT INTO tag (name) VALUES ($1) RETURNING id", &[&tag])
.await
.unwrap()
.unwrap_or_log()
.into_iter()
.next()
.unwrap()
.unwrap_or_log()
.get("id")
}
@ -34,7 +41,7 @@ async fn lookup_artist(client: &Client, artist: &str) -> i32 {
if let Some(row) = client
.query("SELECT id FROM artist WHERE name = $1", &[&artist])
.await
.unwrap()
.unwrap_or_log()
.into_iter()
.next()
{
@ -47,10 +54,10 @@ async fn lookup_artist(client: &Client, artist: &str) -> i32 {
&[&artist],
)
.await
.unwrap()
.unwrap_or_log()
.into_iter()
.next()
.unwrap()
.unwrap_or_log()
.get("id")
}
@ -58,14 +65,14 @@ async fn has_submission(client: &Client, id: i32) -> bool {
client
.query("SELECT id FROM submission WHERE id = $1", &[&id])
.await
.expect("unable to run query")
.unwrap_or_log()
.into_iter()
.next()
.is_some()
}
async fn ids_to_check(client: &Client, max: i32) -> Vec<i32> {
let rows = client.query("SELECT sid FROM generate_series((SELECT max(id) FROM submission), $1::int) sid WHERE sid NOT IN (SELECT id FROM submission where id = sid)", &[&max]).await.unwrap();
let rows = client.query("SELECT sid FROM generate_series((SELECT max(id) FROM submission), $1::int) sid WHERE sid NOT IN (SELECT id FROM submission where id = sid)", &[&max]).await.unwrap_or_log();
rows.iter().map(|row| row.get("sid")).collect()
}
@ -119,7 +126,9 @@ async fn request(
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer).unwrap();
encoder
.encode(&metric_families, &mut buffer)
.unwrap_or_log();
Ok(hyper::Response::new(hyper::Body::from(buffer)))
}
@ -135,102 +144,167 @@ async fn request(
async fn web() {
use hyper::service::{make_service_fn, service_fn};
let addr: std::net::SocketAddr = std::env::var("HTTP_HOST").unwrap().parse().unwrap();
let addr: std::net::SocketAddr = std::env::var("HTTP_HOST")
.expect_or_log("Missing HTTP_HOST")
.parse()
.unwrap_or_log();
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request)) });
let server = hyper::Server::bind(&addr).serve(service);
println!("Listening on http://{}", addr);
tracing::info!("Listening on http://{}", addr);
server.await.unwrap();
server.await.unwrap_or_log();
}
#[tokio::main]
async fn main() {
let (cookie_a, cookie_b) = (
std::env::var("FA_A").expect("missing fa cookie a"),
std::env::var("FA_B").expect("missing fa cookie b"),
);
struct RetryHandler {
max_attempts: usize,
}
let user_agent = std::env::var("USER_AGENT").expect("missing user agent");
impl RetryHandler {
fn new(max_attempts: usize) -> Self {
Self { max_attempts }
}
}
let fa = furaffinity_rs::FurAffinity::new(cookie_a, cookie_b, user_agent);
impl futures_retry::ErrorHandler<furaffinity_rs::Error> for RetryHandler {
type OutError = furaffinity_rs::Error;
let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");
#[tracing::instrument(skip(self), fields(max_attempts = self.max_attempts))]
fn handle(
&mut self,
attempt: usize,
err: furaffinity_rs::Error,
) -> futures_retry::RetryPolicy<Self::OutError> {
tracing::warn!("Attempt failed");
let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
if attempt >= self.max_attempts {
tracing::error!("All attempts have been used");
return futures_retry::RetryPolicy::ForwardError(err);
}
if !err.retry {
tracing::error!("Error did not ask for retry");
return futures_retry::RetryPolicy::ForwardError(err);
}
futures_retry::RetryPolicy::WaitRetry(std::time::Duration::from_secs(1 + attempt as u64))
}
}
#[tracing::instrument(skip(client, fa))]
async fn process_submission(client: &Client, fa: &furaffinity_rs::FurAffinity, id: i32) {
if has_submission(&client, id).await {
return;
}
tracing::info!("Loading submission");
let _timer = SUBMISSION_DURATION.start_timer();
let sub = futures_retry::FutureRetry::new(|| fa.get_submission(id), RetryHandler::new(3))
.await
.unwrap();
.map(|(sub, _attempts)| sub)
.map_err(|(err, _attempts)| err);
tokio::spawn(async move {
if let Err(e) = connection.await {
panic!("postgres connection error: {:?}", e);
}
});
tokio::spawn(async move { web().await });
println!("Started");
'main: loop {
println!("Fetching latest ID");
let latest_id = fa.latest_id().await.expect("unable to get latest id");
for id in ids_to_check(&client, latest_id).await {
'attempt: for attempt in 0..3 {
if !has_submission(&client, id).await {
println!("loading submission {}", id);
let timer = SUBMISSION_DURATION.start_timer();
let sub = match fa.get_submission(id).await {
let sub = match sub {
Ok(sub) => sub,
Err(e) => {
println!("got error: {:?}, retry {}", e.message, e.retry);
timer.stop_and_discard();
if e.retry {
tokio::time::sleep(std::time::Duration::from_secs(attempt + 1))
.await;
continue 'attempt;
} else {
println!("unrecoverable, exiting");
break 'main;
}
Err(err) => {
tracing::error!("Failed to load submission: {:?}", err);
_timer.stop_and_discard();
insert_null_submission(&client, id).await.unwrap_or_log();
return;
}
};
let sub = match sub {
Some(sub) => sub,
None => {
println!("did not exist");
timer.stop_and_discard();
insert_null_submission(&client, id).await.unwrap();
break 'attempt;
tracing::warn!("Submission did not exist");
_timer.stop_and_discard();
insert_null_submission(&client, id).await.unwrap_or_log();
return;
}
};
let sub = match fa.calc_image_hash(sub.clone()).await {
let image =
futures_retry::FutureRetry::new(|| fa.calc_image_hash(sub.clone()), RetryHandler::new(3))
.await
.map(|(sub, _attempt)| sub)
.map_err(|(err, _attempt)| err);
let sub = match image {
Ok(sub) => sub,
Err(e) => {
println!("unable to hash image: {:?}", e);
Err(err) => {
tracing::error!("Unable to hash submission image: {:?}", err);
sub
}
};
timer.stop_and_record();
_timer.stop_and_record();
insert_submission(&client, &sub).await.unwrap();
insert_submission(&client, &sub).await.unwrap_or_log();
}
break 'attempt;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let (cookie_a, cookie_b) = (
std::env::var("FA_A").expect_or_log("Missing FA_A"),
std::env::var("FA_B").expect_or_log("Missing FA_B"),
);
let user_agent = std::env::var("USER_AGENT").expect_or_log("Missing USER_AGENT");
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_log();
let fa = furaffinity_rs::FurAffinity::new(cookie_a, cookie_b, user_agent, Some(client));
let dsn = std::env::var("POSTGRES_DSN").expect_or_log("Missing POSTGRES_DSN");
let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
.await
.unwrap_or_log();
tokio::spawn(async move {
if let Err(e) = connection.await {
panic!("PostgreSQL connection error: {:?}", e);
}
});
tokio::spawn(async move { web().await });
tracing::info!("Started");
loop {
tracing::debug!("Fetching latest ID... ");
let latest_id = fa
.latest_id()
.await
.expect_or_log("Unable to get latest id");
tracing::info!(latest_id = latest_id.0, "Got latest ID");
let online = latest_id.1;
tracing::debug!(?online, "Got updated users online");
USERS_ONLINE
.with_label_values(&["guest"])
.set(online.guests as i64);
USERS_ONLINE
.with_label_values(&["registered"])
.set(online.registered as i64);
USERS_ONLINE
.with_label_values(&["other"])
.set(online.other as i64);
for id in ids_to_check(&client, latest_id.0).await {
process_submission(&client, &fa, id).await;
}
println!("ran out of attempts");
}
}
println!("completed fetch, waiting a minute before loading more");
tracing::info!("Completed fetch, waiting a minute before loading more");
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
}