Add worker for refreshing submission data (#11)

* Initial attempt at a worker to refresh old data.

* Build refresh image on non-master branches.

* Allow specifying missing batch sizes.

* Store more data for deleted submissions.

* Add refresh to builds.

* Update furaffinity-rs dependency.

* Update furaffinity-rs, again.

* Update refresh Dockerfile to avoid extra build.

* Update faktory dependency.

* Update deleted flag migration order.
Syfaro authored 2021-08-22 01:48:05 -04:00, committed by GitHub
parent af40099773
commit 15ab9563e0
11 changed files with 555 additions and 10 deletions
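The new worker registers two Faktory jobs: furaffinity_load, which refreshes a single submission by ID, and furaffinity_calculate_missing, which finds IDs absent from the database and enqueues them in batches (see fuzzysearch-refresh/src/main.rs below). A minimal sketch of how another service might enqueue these jobs with the same faktory crate API the worker uses; it assumes the FAKTORY_URL environment variable points at the server, and the queue names are the ones the worker listens on:

use faktory::{Job, Producer};
use std::net::TcpStream;

fn main() {
    // `None` falls back to the FAKTORY_URL environment variable.
    let mut producer: Producer<TcpStream> =
        Producer::connect(None).expect("failed to connect to Faktory");

    // Refresh a single FurAffinity submission by ID.
    producer
        .enqueue(Job::new("furaffinity_load", vec![12345]).on_queue("fuzzysearch_refresh_furaffinity"))
        .expect("failed to enqueue furaffinity_load");

    // Find up to 500 missing submission IDs and queue them for loading;
    // the batch size argument is optional and defaults to 1,000.
    producer
        .enqueue(Job::new("furaffinity_calculate_missing", vec![500]).on_queue("fuzzysearch_refresh"))
        .expect("failed to enqueue furaffinity_calculate_missing");
}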

.gitlab-ci.yml

@@ -102,6 +102,16 @@ build:ingest-weasyl:
    - cargo build --verbose --release --bin fuzzysearch-ingest-weasyl
    - mv ./target/release/fuzzysearch-ingest-weasyl ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
build:refresh:
  <<: *base_build
  artifacts:
    expire_in: 1 day
    paths:
      - ./fuzzysearch-refresh/fuzzysearch-refresh
  script:
    - cargo build --verbose --release --bin fuzzysearch-refresh
    - mv ./target/release/fuzzysearch-refresh ./fuzzysearch-refresh/fuzzysearch-refresh
images:api: &base_images
  stage: image
  image:
@@ -144,3 +154,9 @@ images:ingest-weasyl:
  needs: ['build:ingest-weasyl']
  script:
    - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-weasyl/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-weasyl:latest --cache=true
images:refresh:
  <<: *base_images
  needs: ['build:refresh']
  script:
    - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-refresh/Dockerfile --destination $CI_REGISTRY_IMAGE/refresh:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/refresh:latest --cache=true

Cargo.lock (generated)

@@ -927,7 +927,7 @@ checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "furaffinity-rs"
version = "0.1.0"
-source = "git+https://github.com/Syfaro/furaffinity-rs?branch=main#dffa06f1bf803e3a327e5b0d757f20228c68f63d"
+source = "git+https://github.com/Syfaro/furaffinity-rs#dffa06f1bf803e3a327e5b0d757f20228c68f63d"
dependencies = [
"chrono",
"image",
@@ -1209,6 +1209,25 @@ dependencies = [
"tracing-unwrap",
]
[[package]]
name = "fuzzysearch-refresh"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"faktory",
"furaffinity-rs",
"futures",
"fuzzysearch-common",
"reqwest",
"sqlx",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-unwrap",
]
[[package]]
name = "fuzzysearch-webhook"
version = "0.1.0"
@@ -2958,6 +2977,7 @@ dependencies = [
"bitflags",
"byteorder",
"bytes",
"chrono",
"crc", "crc",
"crossbeam-channel", "crossbeam-channel",
"crossbeam-queue", "crossbeam-queue",

Cargo.toml (workspace)

@@ -9,6 +9,8 @@ members = [
    "fuzzysearch-ingest-e621",
    "fuzzysearch-ingest-furaffinity",
    "fuzzysearch-ingest-weasyl",
    "fuzzysearch-refresh",
]
[profile.dev.package."*"]

fuzzysearch-ingest-furaffinity/Cargo.toml

@@ -20,7 +20,4 @@ anyhow = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
-[dependencies.furaffinity-rs]
-git = "https://github.com/Syfaro/furaffinity-rs"
-branch = "main"
+furaffinity-rs = { git = "https://github.com/Syfaro/furaffinity-rs" }

fuzzysearch-ingest-furaffinity/src/main.rs

@@ -294,16 +294,15 @@ async fn main() {
        tracing::info!("Started");
        loop {
-           let duration = INDEX_DURATION.start_timer();
            tracing::debug!("Fetching latest ID... ");
-           let latest_id = fa
+           let duration = INDEX_DURATION.start_timer();
+           let (latest_id, online) = fa
                .latest_id()
                .await
                .expect_or_log("Unable to get latest id");
            duration.stop_and_record();
-           tracing::info!(latest_id = latest_id.0, "Got latest ID");
-           let online = latest_id.1;
+           tracing::info!(latest_id = latest_id, "Got latest ID");
            tracing::debug!(?online, "Got updated users online");
            USERS_ONLINE
                .with_label_values(&["guest"])
@@ -315,7 +314,7 @@ async fn main() {
            .with_label_values(&["other"])
            .set(online.other as i64);
-           for id in ids_to_check(&client, latest_id.0).await {
+           for id in ids_to_check(&client, latest_id).await {
                process_submission(&client, &fa, &faktory, id, &download_folder).await;
            }

fuzzysearch-refresh/Cargo.toml

@@ -0,0 +1,25 @@
[package]
name = "fuzzysearch-refresh"
version = "0.1.0"
authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[dependencies]
tracing = "0.1"
tracing-unwrap = "0.9"
anyhow = "1"
thiserror = "1"
tokio = "1"
tokio-stream = "0.1"
futures = "0.3"
faktory = "0.11"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline", "chrono"] }
chrono = "0.4"
reqwest = "0.11"
furaffinity-rs = { git = "https://github.com/Syfaro/furaffinity-rs" }
fuzzysearch-common = { path = "../fuzzysearch-common" }

fuzzysearch-refresh/Dockerfile

@@ -0,0 +1,4 @@
FROM debian:buster-slim
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY ./fuzzysearch-refresh/fuzzysearch-refresh /bin/fuzzysearch-refresh
CMD ["/bin/fuzzysearch-refresh"]

fuzzysearch-refresh/sqlx-data.json
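This file is sqlx's offline query cache, generated with cargo sqlx prepare; the offline feature enabled on the sqlx dependency above lets the query! and query_scalar! macros in the worker type-check against these saved descriptions instead of a live database.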

@@ -0,0 +1,168 @@
{
"db": "PostgreSQL",
"36c9a44cf5d7e004912ae72b7f1e3550deb0531aa07144c3cef140381da9bc97": {
"query": "SELECT id FROM artist WHERE name = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
}
},
"58683afdbc3906ed624e0daf3acec7079db9bc455b30d647d932a35838419b1b": {
"query": "SELECT id FROM submission",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
}
},
"8fb99c8859fdcc51f095ba191924f3a336358ce6a6e5223e86f8b15cd7ec7f37": {
"query": "INSERT INTO submission (id, updated_at, deleted) VALUES ($1, current_timestamp, true) ON CONFLICT (id) DO UPDATE SET deleted = true",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": []
}
},
"921fcab0b8fed99671fe84fe1b011650b7fa4cfaae3843a5a724f928db4c9734": {
"query": "SELECT id FROM tag WHERE name = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
}
},
"a1dff4a02afe1a8a3ffd42cf86b557709fdb0994518b72342f15f9535f5b6a02": {
"query": "INSERT INTO submission\n (id, artist_id, url, filename, hash, rating, posted_at, description, hash_int, file_id, file_size, file_sha256, updated_at) VALUES\n ($1, $2, $3, $4, decode($5, 'base64'), $6, $7, $8, $9, CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, $10, $11, current_timestamp)\n ON CONFLICT (id) DO UPDATE SET url = $3, filename = $4, hash = decode($5, 'base64'), rating = $6, description = $8, hash_int = $9, file_id = CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, file_size = $10, file_sha256 = $11, updated_at = current_timestamp",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Int4",
"Text",
"Text",
"Text",
"Bpchar",
"Timestamptz",
"Text",
"Int8",
"Int4",
"Bytea"
]
},
"nullable": []
}
},
"a6d0113ac38781a41a717aee7e28940b7f362951402bec50bc54932a6939b217": {
"query": "INSERT INTO artist (name) VALUES ($1) RETURNING id",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
}
},
"b9323d762b487be18d991f84cfde591c7b33e0a2530be186ab77ad802781772e": {
"query": "SELECT updated_at FROM submission WHERE id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": [
true
]
}
},
"cb877c1de895efa7753b25f401036ae61711d95a1c1db233580b50fb36eec0cb": {
"query": "INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Int4"
]
},
"nullable": []
}
},
"f9dfb3a7414c35f112dc30d053fdc546ec4776761346db98982858ddf3afb6d3": {
"query": "INSERT INTO tag (name) VALUES ($1) RETURNING id",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
}
}
}

fuzzysearch-refresh/src/main.rs

@@ -0,0 +1,308 @@
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use furaffinity_rs::FurAffinity;
use tracing_unwrap::ResultExt;
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
enum Error {
#[error("database error: {0}")]
Database(#[from] sqlx::Error),
#[error("missing data: {0}")]
MissingData(&'static str),
#[error("furaffinity error")]
FurAffinity(furaffinity_rs::Error),
#[error("faktory error")]
Faktory,
}
static FURAFFINITY_QUEUE: &str = "fuzzysearch_refresh_furaffinity";
type Producer = Arc<Mutex<faktory::Producer<TcpStream>>>;
type Db = sqlx::Pool<sqlx::Postgres>;
fn main() {
fuzzysearch_common::init_logger();
tracing::info!("initializing");
let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
let mut faktory = faktory::ConsumerBuilder::default();
faktory.workers(2);
let p = Arc::new(Mutex::new(faktory::Producer::connect(None).unwrap_or_log()));
let pool = rt
.block_on(
sqlx::postgres::PgPoolOptions::new()
.max_connections(2)
.connect(&std::env::var("DATABASE_URL").unwrap_or_log()),
)
.unwrap_or_log();
let (cookie_a, cookie_b) = (
std::env::var("FA_A").unwrap_or_log(),
std::env::var("FA_B").unwrap_or_log(),
);
let user_agent = std::env::var("USER_AGENT").unwrap_or_log();
let client = reqwest::Client::new();
let fa = Arc::new(FurAffinity::new(
cookie_a,
cookie_b,
user_agent,
Some(client),
));
rt.spawn(poll_fa_online(fa.clone(), p.clone()));
let rt_clone = rt.clone();
let pool_clone = pool.clone();
faktory.register("furaffinity_load", move |job| -> Result<(), Error> {
use std::convert::TryFrom;
let id = job
.args()
.iter()
.next()
.ok_or(Error::MissingData("submission id"))?
.as_i64()
.ok_or(Error::MissingData("submission id"))?;
let id = i32::try_from(id).map_err(|_| Error::MissingData("invalid id"))?;
let last_updated = rt_clone
.block_on(
sqlx::query_scalar!("SELECT updated_at FROM submission WHERE id = $1", id)
.fetch_optional(&pool_clone),
)?
.flatten();
if let Some(last_updated) = last_updated {
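// Compute the age as now - last_updated so rows updated in the past yield a positive duration.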
let diff = chrono::Utc::now().signed_duration_since(last_updated);
if diff.num_days() < 30 {
tracing::warn!("attempted to check recent submission, skipping");
return Ok(());
}
}
let sub = rt_clone
.block_on(fa.get_submission(id))
.map_err(Error::FurAffinity)?;
tracing::debug!("loaded furaffinity submission");
rt_clone.block_on(update_furaffinity_submission(
pool_clone.clone(),
fa.clone(),
id,
sub,
))?;
Ok(())
});
faktory.register(
"furaffinity_calculate_missing",
move |job| -> Result<(), Error> {
use std::collections::HashSet;
let batch_size = job
.args()
.iter()
.next()
.and_then(|arg| arg.as_i64())
.unwrap_or(1_000);
tracing::debug!(batch_size, "calculating missing submissions");
let known_ids: HashSet<_> = rt
.block_on(sqlx::query_scalar!("SELECT id FROM submission").fetch_all(&pool))?
.into_iter()
.collect();
let all_ids: HashSet<_> = (1..=*known_ids.iter().max().unwrap_or(&1)).collect();
let missing_ids: Vec<_> = all_ids
.difference(&known_ids)
.take(batch_size as usize)
.collect();
tracing::info!(
missing = missing_ids.len(),
"enqueueing batch of missing submissions"
);
let mut p = p.lock().unwrap_or_log();
for id in missing_ids {
let job =
faktory::Job::new("furaffinity_load", vec![*id]).on_queue(FURAFFINITY_QUEUE);
p.enqueue(job).map_err(|_err| Error::Faktory)?;
}
Ok(())
},
);
let faktory = faktory.connect(None).unwrap_or_log();
tracing::info!("starting to run queues");
faktory.run_to_completion(&["fuzzysearch_refresh", FURAFFINITY_QUEUE]);
}
/// Poll the number of users on FurAffinity every five minutes and pause or
/// resume the refresh queue depending on whether the site is busy.
async fn poll_fa_online(fa: Arc<FurAffinity>, p: Producer) {
use futures::StreamExt;
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
let max_online = std::env::var("MAX_ONLINE")
.ok()
.and_then(|num| num.parse().ok())
.unwrap_or(10_000);
tracing::info!(max_online, "got max fa users online before pause");
// Ensure initial state of the queue being enabled.
{
let p = p.clone();
tokio::task::spawn_blocking(move || {
let mut p = p.lock().unwrap_or_log();
p.queue_resume(&[FURAFFINITY_QUEUE]).unwrap_or_log();
})
.await
.expect_or_log("could not set initial queue state");
}
let queue_state = AtomicBool::new(true);
IntervalStream::new(interval(Duration::from_secs(300)))
.for_each(|_| {
let p = p.clone();
async {
let continue_queue = match fa.latest_id().await {
Ok((_latest_id, online)) => {
tracing::debug!(registered = online.registered, "got updated fa online");
online.registered < max_online
}
Err(err) => {
tracing::error!("unable to get fa online: {:?}", err);
false
}
};
if queue_state.load(Ordering::SeqCst) == continue_queue {
tracing::trace!("fa queue was already in correct state");
return;
}
tracing::info!(continue_queue, "updating fa queue state");
let result = tokio::task::spawn_blocking(move || {
let mut p = p.lock().unwrap_or_log();
if continue_queue {
p.queue_resume(&[FURAFFINITY_QUEUE])
} else {
p.queue_pause(&[FURAFFINITY_QUEUE])
}
})
.await;
match result {
Err(err) => tracing::error!("unable to join queue change: {:?}", err),
Ok(Err(err)) => tracing::error!("unable to change fa queue state: {:?}", err),
_ => queue_state.store(continue_queue, Ordering::SeqCst),
}
}
})
.await;
}
async fn get_furaffinity_artist(db: &Db, artist: &str) -> Result<i32, sqlx::Error> {
if let Some(id) = sqlx::query_scalar!("SELECT id FROM artist WHERE name = $1", artist)
.fetch_optional(db)
.await?
{
return Ok(id);
}
sqlx::query_scalar!("INSERT INTO artist (name) VALUES ($1) RETURNING id", artist)
.fetch_one(db)
.await
}
async fn get_furaffinity_tag(db: &Db, tag: &str) -> Result<i32, sqlx::Error> {
if let Some(id) = sqlx::query_scalar!("SELECT id FROM tag WHERE name = $1", tag)
.fetch_optional(db)
.await?
{
return Ok(id);
}
sqlx::query_scalar!("INSERT INTO tag (name) VALUES ($1) RETURNING id", tag)
.fetch_one(db)
.await
}
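Both helpers above use a select-then-insert pattern, which can race when two jobs encounter the same new artist or tag at once. A single-statement alternative, sketched here under the assumption that the name column carries a UNIQUE constraint (the constraint is not shown in this commit), would be:

// Sketch only: assumes a UNIQUE constraint on artist.name. The no-op
// update makes RETURNING id yield the existing row's ID on conflict.
async fn upsert_furaffinity_artist(db: &Db, artist: &str) -> Result<i32, sqlx::Error> {
    sqlx::query_scalar(
        "INSERT INTO artist (name) VALUES ($1)
         ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
         RETURNING id",
    )
    .bind(artist)
    .fetch_one(db)
    .await
}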
async fn associate_furaffinity_tag(db: &Db, id: i32, tag_id: i32) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
tag_id,
id
)
.execute(db)
.await
.map(|_| ())
}
async fn update_furaffinity_submission(
db: Db,
fa: Arc<FurAffinity>,
id: i32,
sub: Option<furaffinity_rs::Submission>,
) -> Result<(), Error> {
let sub = match sub {
Some(sub) => sub,
None => {
tracing::info!(id, "furaffinity submission did not exist");
sqlx::query!("INSERT INTO submission (id, updated_at, deleted) VALUES ($1, current_timestamp, true) ON CONFLICT (id) DO UPDATE SET deleted = true", id).execute(&db).await?;
return Ok(());
}
};
let sub = fa.calc_image_hash(sub).await.map_err(Error::FurAffinity)?;
let artist_id = get_furaffinity_artist(&db, &sub.artist).await?;
let mut tag_ids = Vec::with_capacity(sub.tags.len());
for tag in &sub.tags {
tag_ids.push(get_furaffinity_tag(&db, tag).await?);
}
let hash = sub.hash.clone();
let url = sub.content.url();
let size = sub.file_size.map(|size| size as i32);
sqlx::query!(
"INSERT INTO submission
(id, artist_id, url, filename, hash, rating, posted_at, description, hash_int, file_id, file_size, file_sha256, updated_at) VALUES
($1, $2, $3, $4, decode($5, 'base64'), $6, $7, $8, $9, CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, $10, $11, current_timestamp)
ON CONFLICT (id) DO UPDATE SET url = $3, filename = $4, hash = decode($5, 'base64'), rating = $6, description = $8, hash_int = $9, file_id = CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, file_size = $10, file_sha256 = $11, updated_at = current_timestamp",
sub.id, artist_id, url, sub.filename, hash, sub.rating.serialize(), sub.posted_at, sub.description, sub.hash_num, size, sub.file_sha256,
)
.execute(&db).await?;
for tag_id in tag_ids {
associate_furaffinity_tag(&db, id, tag_id).await?;
}
Ok(())
}
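One detail worth noting in furaffinity_load above is the subtraction order of the freshness check: signed_duration_since subtracts its argument from its receiver, so a row's age is now minus last_updated. A standalone sketch of that guard, with illustrative names that are not part of this commit:

use chrono::{DateTime, Utc};

// Illustrative helper: true when the row was updated recently enough
// that a refresh can be skipped.
fn recently_checked(last_updated: DateTime<Utc>, max_age_days: i64) -> bool {
    // now - last_updated is positive for timestamps in the past.
    Utc::now().signed_duration_since(last_updated).num_days() < max_age_days
}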

migrations/….down.sql

@@ -0,0 +1,3 @@
ALTER TABLE submission DROP COLUMN deleted;
ALTER TABLE e621 DROP COLUMN deleted;
ALTER TABLE weasyl DROP COLUMN deleted;

migrations/….up.sql

@@ -0,0 +1,3 @@
ALTER TABLE submission ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE e621 ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE weasyl ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
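The deleted column acts as a tombstone: the refresh worker sets it rather than removing rows, preserving the extra data mentioned in the commit message. A hypothetical read-side query that skips tombstoned rows, sketched with the Db pool alias from fuzzysearch-refresh/src/main.rs:

// Sketch only, not part of this commit: list IDs of submissions that
// have not been marked deleted.
async fn live_submission_ids(db: &Db) -> Result<Vec<i32>, sqlx::Error> {
    sqlx::query_scalar("SELECT id FROM submission WHERE NOT deleted")
        .fetch_all(db)
        .await
}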