diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 3b6963c..c34b38b 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -102,6 +102,16 @@ build:ingest-weasyl:
     - cargo build --verbose --release --bin fuzzysearch-ingest-weasyl
     - mv ./target/release/fuzzysearch-ingest-weasyl ./fuzzysearch-ingest-weasyl/fuzzysearch-ingest-weasyl
 
+build:refresh:
+  <<: *base_build
+  artifacts:
+    expire_in: 1 day
+    paths:
+      - ./fuzzysearch-refresh/fuzzysearch-refresh
+  script:
+    - cargo build --verbose --release --bin fuzzysearch-refresh
+    - mv ./target/release/fuzzysearch-refresh ./fuzzysearch-refresh/fuzzysearch-refresh
+
 images:api: &base_images
   stage: image
   image:
@@ -144,3 +154,9 @@ images:ingest-weasyl:
   needs: ['build:ingest-weasyl']
   script:
     - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-ingest-weasyl/Dockerfile --destination $CI_REGISTRY_IMAGE/ingest-weasyl:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/ingest-weasyl:latest --cache=true
+
+images:refresh:
+  <<: *base_images
+  needs: ['build:refresh']
+  script:
+    - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/fuzzysearch-refresh/Dockerfile --destination $CI_REGISTRY_IMAGE/refresh:$CI_COMMIT_SHA --destination $CI_REGISTRY_IMAGE/refresh:latest --cache=true
diff --git a/Cargo.lock b/Cargo.lock
index a5c9513..ffbca05 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -927,7 +927,7 @@ checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
 [[package]]
 name = "furaffinity-rs"
 version = "0.1.0"
-source = "git+https://github.com/Syfaro/furaffinity-rs?branch=main#dffa06f1bf803e3a327e5b0d757f20228c68f63d"
+source = "git+https://github.com/Syfaro/furaffinity-rs#dffa06f1bf803e3a327e5b0d757f20228c68f63d"
 dependencies = [
  "chrono",
  "image",
@@ -1209,6 +1209,25 @@ dependencies = [
  "tracing-unwrap",
 ]
 
+[[package]]
+name = "fuzzysearch-refresh"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "chrono",
+ "faktory",
+ "furaffinity-rs",
+ "futures",
+ "fuzzysearch-common",
+ "reqwest",
+ "sqlx",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+ "tracing",
+ "tracing-unwrap",
+]
+
 [[package]]
 name = "fuzzysearch-webhook"
 version = "0.1.0"
@@ -2958,6 +2977,7 @@ dependencies = [
  "bitflags",
  "byteorder",
  "bytes",
+ "chrono",
 "crc",
 "crossbeam-channel",
 "crossbeam-queue",
diff --git a/Cargo.toml b/Cargo.toml
index 1520568..411f7f9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,8 @@ members = [
     "fuzzysearch-ingest-e621",
     "fuzzysearch-ingest-furaffinity",
     "fuzzysearch-ingest-weasyl",
+
+    "fuzzysearch-refresh",
 ]
 
 [profile.dev.package."*"]
diff --git a/fuzzysearch-ingest-furaffinity/Cargo.toml b/fuzzysearch-ingest-furaffinity/Cargo.toml
index 431cd33..aa4be73 100644
--- a/fuzzysearch-ingest-furaffinity/Cargo.toml
+++ b/fuzzysearch-ingest-furaffinity/Cargo.toml
@@ -20,7 +20,4 @@ anyhow = "1"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
-
-[dependencies.furaffinity-rs]
-git = "https://github.com/Syfaro/furaffinity-rs"
-branch = "main"
+furaffinity-rs = { git = "https://github.com/Syfaro/furaffinity-rs" }
diff --git a/fuzzysearch-ingest-furaffinity/src/main.rs b/fuzzysearch-ingest-furaffinity/src/main.rs
index eb7adc5..50302b0 100644
--- a/fuzzysearch-ingest-furaffinity/src/main.rs
+++ b/fuzzysearch-ingest-furaffinity/src/main.rs
@@ -294,16 +294,15 @@ async fn main() {
     tracing::info!("Started");
 
     loop {
-        let duration = INDEX_DURATION.start_timer();
         tracing::debug!("Fetching latest ID... ");
-        let latest_id = fa
+        let duration = INDEX_DURATION.start_timer();
+        let (latest_id, online) = fa
             .latest_id()
             .await
             .expect_or_log("Unable to get latest id");
         duration.stop_and_record();
-        tracing::info!(latest_id = latest_id.0, "Got latest ID");
+        tracing::info!(latest_id = latest_id, "Got latest ID");
 
-        let online = latest_id.1;
         tracing::debug!(?online, "Got updated users online");
         USERS_ONLINE
             .with_label_values(&["guest"])
@@ -315,7 +314,7 @@ async fn main() {
             .with_label_values(&["other"])
             .set(online.other as i64);
 
-        for id in ids_to_check(&client, latest_id.0).await {
+        for id in ids_to_check(&client, latest_id).await {
             process_submission(&client, &fa, &faktory, id, &download_folder).await;
         }
 
diff --git a/fuzzysearch-refresh/Cargo.toml b/fuzzysearch-refresh/Cargo.toml
new file mode 100644
index 0000000..596daa0
--- /dev/null
+++ b/fuzzysearch-refresh/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "fuzzysearch-refresh"
+version = "0.1.0"
+authors = ["Syfaro "]
+edition = "2018"
+
+[dependencies]
+tracing = "0.1"
+tracing-unwrap = "0.9"
+anyhow = "1"
+thiserror = "1"
+
+tokio = "1"
+tokio-stream = "0.1"
+futures = "0.3"
+
+faktory = "0.11"
+sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline", "chrono"] }
+
+chrono = "0.4"
+reqwest = "0.11"
+
+furaffinity-rs = { git = "https://github.com/Syfaro/furaffinity-rs" }
+
+fuzzysearch-common = { path = "../fuzzysearch-common" }
diff --git a/fuzzysearch-refresh/Dockerfile b/fuzzysearch-refresh/Dockerfile
new file mode 100644
index 0000000..4a0110f
--- /dev/null
+++ b/fuzzysearch-refresh/Dockerfile
@@ -0,0 +1,4 @@
+FROM debian:buster-slim
+RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
+COPY ./fuzzysearch-refresh/fuzzysearch-refresh /bin/fuzzysearch-refresh
+CMD ["/bin/fuzzysearch-refresh"]
diff --git a/fuzzysearch-refresh/sqlx-data.json b/fuzzysearch-refresh/sqlx-data.json
new file mode 100644
index 0000000..68609b5
--- /dev/null
+++ b/fuzzysearch-refresh/sqlx-data.json
@@ -0,0 +1,168 @@
+{
+  "db": "PostgreSQL",
+  "36c9a44cf5d7e004912ae72b7f1e3550deb0531aa07144c3cef140381da9bc97": {
+    "query": "SELECT id FROM artist WHERE name = $1",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false
+      ]
+    }
+  },
+  "58683afdbc3906ed624e0daf3acec7079db9bc455b30d647d932a35838419b1b": {
+    "query": "SELECT id FROM submission",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        }
+      ],
+      "parameters": {
+        "Left": []
+      },
+      "nullable": [
+        false
+      ]
+    }
+  },
+  "8fb99c8859fdcc51f095ba191924f3a336358ce6a6e5223e86f8b15cd7ec7f37": {
+    "query": "INSERT INTO submission (id, updated_at, deleted) VALUES ($1, current_timestamp, true) ON CONFLICT (id) DO UPDATE SET deleted = true",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Int4"
+        ]
+      },
+      "nullable": []
+    }
+  },
+  "921fcab0b8fed99671fe84fe1b011650b7fa4cfaae3843a5a724f928db4c9734": {
+    "query": "SELECT id FROM tag WHERE name = $1",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false
+      ]
+    }
+  },
+  "a1dff4a02afe1a8a3ffd42cf86b557709fdb0994518b72342f15f9535f5b6a02": {
+    "query": "INSERT INTO submission\n            (id, artist_id, url, filename, hash, rating, posted_at, description, hash_int, file_id, file_size, file_sha256, updated_at) VALUES\n            ($1, $2, $3, $4, decode($5, 'base64'), $6, $7, $8, $9, CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, $10, $11, current_timestamp)\n            ON CONFLICT (id) DO UPDATE SET url = $3, filename = $4, hash = decode($5, 'base64'), rating = $6, description = $8, hash_int = $9, file_id = CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, file_size = $10, file_sha256 = $11, updated_at = current_timestamp",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Int4",
+          "Int4",
+          "Text",
+          "Text",
+          "Text",
+          "Bpchar",
+          "Timestamptz",
+          "Text",
+          "Int8",
+          "Int4",
+          "Bytea"
+        ]
+      },
+      "nullable": []
+    }
+  },
+  "a6d0113ac38781a41a717aee7e28940b7f362951402bec50bc54932a6939b217": {
+    "query": "INSERT INTO artist (name) VALUES ($1) RETURNING id",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false
+      ]
+    }
+  },
+  "b9323d762b487be18d991f84cfde591c7b33e0a2530be186ab77ad802781772e": {
+    "query": "SELECT updated_at FROM submission WHERE id = $1",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "updated_at",
+          "type_info": "Timestamptz"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Int4"
+        ]
+      },
+      "nullable": [
+        true
+      ]
+    }
+  },
+  "cb877c1de895efa7753b25f401036ae61711d95a1c1db233580b50fb36eec0cb": {
+    "query": "INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Int4",
+          "Int4"
+        ]
+      },
+      "nullable": []
+    }
+  },
+  "f9dfb3a7414c35f112dc30d053fdc546ec4776761346db98982858ddf3afb6d3": {
+    "query": "INSERT INTO tag (name) VALUES ($1) RETURNING id",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false
+      ]
+    }
+  }
+}
\ No newline at end of file
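The sqlx-data.json above is generated output for sqlx's offline mode (the `offline` feature enabled in Cargo.toml earlier): sqlx-cli records each checked query against a live database via `cargo sqlx prepare`, so the `query!` macros can type-check at build time without a `DATABASE_URL`.

The worker itself follows. It registers two Faktory handlers: `furaffinity_load`, which re-fetches one submission and upserts it, and `furaffinity_calculate_missing`, which diffs the known submission IDs against the expected range and enqueues loads for the gaps. A sketch of kicking off a backfill from any producer, reusing the same `Job` API the worker uses below (queue name per `run_to_completion`; the batch-size argument is illustrative):

    let mut producer = faktory::Producer::connect(None)?;
    let job = faktory::Job::new("furaffinity_calculate_missing", vec![1_000])
        .on_queue("fuzzysearch_refresh");
    producer.enqueue(job)?;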
diff --git a/fuzzysearch-refresh/src/main.rs b/fuzzysearch-refresh/src/main.rs
new file mode 100644
index 0000000..a51ac98
--- /dev/null
+++ b/fuzzysearch-refresh/src/main.rs
@@ -0,0 +1,308 @@
+use std::net::TcpStream;
+use std::sync::{Arc, Mutex};
+
+use furaffinity_rs::FurAffinity;
+use tracing_unwrap::ResultExt;
+
+#[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
+enum Error {
+    #[error("database error: {0}")]
+    Database(#[from] sqlx::Error),
+    #[error("missing data: {0}")]
+    MissingData(&'static str),
+    #[error("furaffinity error")]
+    FurAffinity(furaffinity_rs::Error),
+    #[error("faktory error")]
+    Faktory,
+}
+
+static FURAFFINITY_QUEUE: &str = "fuzzysearch_refresh_furaffinity";
+
+type Producer = Arc<Mutex<faktory::Producer<TcpStream>>>;
+type Db = sqlx::Pool<sqlx::Postgres>;
+
+fn main() {
+    fuzzysearch_common::init_logger();
+
+    tracing::info!("initializing");
+
+    let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
+
+    let mut faktory = faktory::ConsumerBuilder::default();
+    faktory.workers(2);
+
+    let p = Arc::new(Mutex::new(faktory::Producer::connect(None).unwrap_or_log()));
+
+    let pool = rt
+        .block_on(
+            sqlx::postgres::PgPoolOptions::new()
+                .max_connections(2)
+                .connect(&std::env::var("DATABASE_URL").unwrap_or_log()),
+        )
+        .unwrap_or_log();
+
+    let (cookie_a, cookie_b) = (
+        std::env::var("FA_A").unwrap_or_log(),
+        std::env::var("FA_B").unwrap_or_log(),
+    );
+    let user_agent = std::env::var("USER_AGENT").unwrap_or_log();
+    let client = reqwest::Client::new();
+    let fa = Arc::new(FurAffinity::new(
+        cookie_a,
+        cookie_b,
+        user_agent,
+        Some(client),
+    ));
+
+    rt.spawn(poll_fa_online(fa.clone(), p.clone()));
+
+    let rt_clone = rt.clone();
+    let pool_clone = pool.clone();
+    faktory.register("furaffinity_load", move |job| -> Result<(), Error> {
+        use std::convert::TryFrom;
+
+        let id = job
+            .args()
+            .iter()
+            .next()
+            .ok_or(Error::MissingData("submission id"))?
+            .as_i64()
+            .ok_or(Error::MissingData("submission id"))?;
+
+        let id = i32::try_from(id).map_err(|_| Error::MissingData("invalid id"))?;
+
+        let last_updated = rt_clone
+            .block_on(
+                sqlx::query_scalar!("SELECT updated_at FROM submission WHERE id = $1", id)
+                    .fetch_optional(&pool_clone),
+            )?
+            .flatten();
+
+        if let Some(last_updated) = last_updated {
+            // Skip submissions that were already refreshed within the last 30 days.
+            let age = chrono::Utc::now().signed_duration_since(last_updated);
+            if age.num_days() < 30 {
+                tracing::warn!("attempted to check recent submission, skipping");
+                return Ok(());
+            }
+        }
+
+        let sub = rt_clone
+            .block_on(fa.get_submission(id))
+            .map_err(Error::FurAffinity)?;
+
+        tracing::debug!("loaded furaffinity submission");
+
+        rt_clone.block_on(update_furaffinity_submission(
+            pool_clone.clone(),
+            fa.clone(),
+            id,
+            sub,
+        ))?;
+
+        Ok(())
+    });
+
+    faktory.register(
+        "furaffinity_calculate_missing",
+        move |job| -> Result<(), Error> {
+            use std::collections::HashSet;
+
+            let batch_size = job
+                .args()
+                .iter()
+                .next()
+                .and_then(|arg| arg.as_i64())
+                .unwrap_or(1_000);
+
+            tracing::debug!(batch_size, "calculating missing submissions");
+
+            let known_ids: HashSet<_> = rt
+                .block_on(sqlx::query_scalar!("SELECT id FROM submission").fetch_all(&pool))?
+                .into_iter()
+                .collect();
+            let all_ids: HashSet<_> = (1..=*known_ids.iter().max().unwrap_or(&1)).collect();
+            let missing_ids: Vec<_> = all_ids
+                .difference(&known_ids)
+                .take(batch_size as usize)
+                .collect();
+
+            tracing::info!(
+                missing = missing_ids.len(),
+                "enqueueing batch of missing submissions"
+            );
+
+            let mut p = p.lock().unwrap_or_log();
+
+            for id in missing_ids {
+                let job =
+                    faktory::Job::new("furaffinity_load", vec![*id]).on_queue(FURAFFINITY_QUEUE);
+                p.enqueue(job).map_err(|_err| Error::Faktory)?;
+            }
+
+            Ok(())
+        },
+    );
+
+    let faktory = faktory.connect(None).unwrap_or_log();
+    tracing::info!("starting to run queues");
+    faktory.run_to_completion(&["fuzzysearch_refresh", FURAFFINITY_QUEUE]);
+}
+
+/// Check the number of users on FurAffinity every five minutes and control if
+/// queues are allowed to run.
+async fn poll_fa_online(fa: Arc<FurAffinity>, p: Producer) {
+    use futures::StreamExt;
+    use std::{
+        sync::atomic::{AtomicBool, Ordering},
+        time::Duration,
+    };
+    use tokio::time::interval;
+    use tokio_stream::wrappers::IntervalStream;
+
+    let max_online = std::env::var("MAX_ONLINE")
+        .ok()
+        .and_then(|num| num.parse().ok())
+        .unwrap_or(10_000);
+
+    tracing::info!(max_online, "got max fa users online before pause");
+
+    // Ensure initial state of the queue being enabled.
+    {
+        let p = p.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut p = p.lock().unwrap_or_log();
+            p.queue_resume(&[FURAFFINITY_QUEUE]).unwrap_or_log();
+        })
+        .await
+        .expect_or_log("could not set initial queue state");
+    }
+
+    let queue_state = AtomicBool::new(true);
+
+    IntervalStream::new(interval(Duration::from_secs(300)))
+        .for_each(|_| {
+            let p = p.clone();
+
+            async {
+                let continue_queue = match fa.latest_id().await {
+                    Ok((_latest_id, online)) => {
+                        tracing::debug!(registered = online.registered, "got updated fa online");
+                        online.registered < max_online
+                    }
+                    Err(err) => {
+                        tracing::error!("unable to get fa online: {:?}", err);
+                        false
+                    }
+                };
+
+                if queue_state.load(Ordering::SeqCst) == continue_queue {
+                    tracing::trace!("fa queue was already in correct state");
+                    return;
+                }
+
+                tracing::info!(continue_queue, "updating fa queue state");
+
+                let result = tokio::task::spawn_blocking(move || {
+                    let mut p = p.lock().unwrap_or_log();
+
+                    if continue_queue {
+                        p.queue_resume(&[FURAFFINITY_QUEUE])
+                    } else {
+                        p.queue_pause(&[FURAFFINITY_QUEUE])
+                    }
+                })
+                .await;
+
+                match result {
+                    Err(err) => tracing::error!("unable to join queue change: {:?}", err),
+                    Ok(Err(err)) => tracing::error!("unable to change fa queue state: {:?}", err),
+                    _ => queue_state.store(continue_queue, Ordering::SeqCst),
+                }
+            }
+        })
+        .await;
+}
+
+async fn get_furaffinity_artist(db: &Db, artist: &str) -> Result<i32, sqlx::Error> {
+    if let Some(id) = sqlx::query_scalar!("SELECT id FROM artist WHERE name = $1", artist)
+        .fetch_optional(db)
+        .await?
+    {
+        return Ok(id);
+    }
+
+    sqlx::query_scalar!("INSERT INTO artist (name) VALUES ($1) RETURNING id", artist)
+        .fetch_one(db)
+        .await
+}
+
+async fn get_furaffinity_tag(db: &Db, tag: &str) -> Result<i32, sqlx::Error> {
+    if let Some(id) = sqlx::query_scalar!("SELECT id FROM tag WHERE name = $1", tag)
+        .fetch_optional(db)
+        .await?
+    {
+        return Ok(id);
+    }
+
+    sqlx::query_scalar!("INSERT INTO tag (name) VALUES ($1) RETURNING id", tag)
+        .fetch_one(db)
+        .await
+}
+
+async fn associate_furaffinity_tag(db: &Db, id: i32, tag_id: i32) -> Result<(), sqlx::Error> {
+    sqlx::query!(
+        "INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
+        tag_id,
+        id
+    )
+    .execute(db)
+    .await
+    .map(|_| ())
+}
+
+async fn update_furaffinity_submission(
+    db: Db,
+    fa: Arc<FurAffinity>,
+    id: i32,
+    sub: Option<furaffinity_rs::Submission>,
+) -> Result<(), Error> {
+    let sub = match sub {
+        Some(sub) => sub,
+        None => {
+            tracing::info!(id, "furaffinity submission did not exist");
+            sqlx::query!("INSERT INTO submission (id, updated_at, deleted) VALUES ($1, current_timestamp, true) ON CONFLICT (id) DO UPDATE SET deleted = true", id).execute(&db).await?;
+            return Ok(());
+        }
+    };
+
+    let sub = fa.calc_image_hash(sub).await.map_err(Error::FurAffinity)?;
+
+    let artist_id = get_furaffinity_artist(&db, &sub.artist).await?;
+
+    let mut tag_ids = Vec::with_capacity(sub.tags.len());
+    for tag in &sub.tags {
+        tag_ids.push(get_furaffinity_tag(&db, tag).await?);
+    }
+
+    let hash = sub.hash.clone();
+    let url = sub.content.url();
+
+    let size = sub.file_size.map(|size| size as i32);
+
+    sqlx::query!(
+        "INSERT INTO submission
+            (id, artist_id, url, filename, hash, rating, posted_at, description, hash_int, file_id, file_size, file_sha256, updated_at) VALUES
+            ($1, $2, $3, $4, decode($5, 'base64'), $6, $7, $8, $9, CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, $10, $11, current_timestamp)
+            ON CONFLICT (id) DO UPDATE SET url = $3, filename = $4, hash = decode($5, 'base64'), rating = $6, description = $8, hash_int = $9, file_id = CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, file_size = $10, file_sha256 = $11, updated_at = current_timestamp",
+        sub.id, artist_id, url, sub.filename, hash, sub.rating.serialize(), sub.posted_at, sub.description, sub.hash_num, size, sub.file_sha256,
+    )
+    .execute(&db).await?;
+
+    for tag_id in tag_ids {
+        associate_furaffinity_tag(&db, id, tag_id).await?;
+    }
+
+    Ok(())
+}
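The `deleted = true` upserts in the worker above depend on the new `deleted` column, which the following migration introduces on all three site tables.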
diff --git a/migrations/20210822052313_deleted_flag.down.sql b/migrations/20210822052313_deleted_flag.down.sql
new file mode 100644
index 0000000..c47c49f
--- /dev/null
+++ b/migrations/20210822052313_deleted_flag.down.sql
@@ -0,0 +1,3 @@
+ALTER TABLE submission DROP COLUMN deleted;
+ALTER TABLE e621 DROP COLUMN deleted;
+ALTER TABLE weasyl DROP COLUMN deleted;
diff --git a/migrations/20210822052313_deleted_flag.up.sql b/migrations/20210822052313_deleted_flag.up.sql
new file mode 100644
index 0000000..650be94
--- /dev/null
+++ b/migrations/20210822052313_deleted_flag.up.sql
@@ -0,0 +1,3 @@
+ALTER TABLE submission ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
+ALTER TABLE e621 ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
+ALTER TABLE weasyl ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT false;
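Note that nothing in this change ever deletes rows: the refresh worker only flags submissions, so downstream readers presumably need to filter on the new column themselves (for example, `WHERE NOT deleted`) wherever the distinction matters.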