mirror of
https://github.com/Syfaro/fuzzysearch.git
synced 2024-11-23 15:22:31 +00:00
Add 'fuzzysearch-ingest-furaffinity/' from commit 'c44b877f70bf478e599110b10e17d152f89c8d61'
git-subtree-dir: fuzzysearch-ingest-furaffinity git-subtree-mainline:59da1e99a8
git-subtree-split:c44b877f70
This commit is contained in:
commit
43e5c834a9
1
fuzzysearch-ingest-furaffinity/.dockerignore
Normal file
1
fuzzysearch-ingest-furaffinity/.dockerignore
Normal file
@ -0,0 +1 @@
|
|||||||
|
target
|
17
fuzzysearch-ingest-furaffinity/.drone.yml
Normal file
17
fuzzysearch-ingest-furaffinity/.drone.yml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
---
|
||||||
|
kind: pipeline
|
||||||
|
name: default
|
||||||
|
type: docker
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: docker
|
||||||
|
image: plugins/docker
|
||||||
|
settings:
|
||||||
|
auto_tag: true
|
||||||
|
registry: registry.huefox.com
|
||||||
|
repo: registry.huefox.com/fa-watcher
|
||||||
|
username:
|
||||||
|
from_secret: docker_username
|
||||||
|
password:
|
||||||
|
from_secret: docker_password
|
||||||
|
...
|
2
fuzzysearch-ingest-furaffinity/.gitignore
vendored
Normal file
2
fuzzysearch-ingest-furaffinity/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
/target
|
||||||
|
**/*.rs.bk
|
2468
fuzzysearch-ingest-furaffinity/Cargo.lock
generated
Normal file
2468
fuzzysearch-ingest-furaffinity/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
22
fuzzysearch-ingest-furaffinity/Cargo.toml
Normal file
22
fuzzysearch-ingest-furaffinity/Cargo.toml
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
[package]
|
||||||
|
name = "fa-watcher"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Syfaro <syfaro@huefox.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
reqwest = "0.10"
|
||||||
|
postgres = { version = "0.17.0", features = ["with-chrono-0_4"] }
|
||||||
|
tokio = { version = "0.2", features = ["full"] }
|
||||||
|
tokio-postgres = { version = "0.5.0" }
|
||||||
|
r2d2_postgres = " 0.16.0"
|
||||||
|
r2d2 = "0.8"
|
||||||
|
chrono = "0.4"
|
||||||
|
hyper = "0.13"
|
||||||
|
prometheus = { version = "0.10", features = ["process"] }
|
||||||
|
lazy_static = "1"
|
||||||
|
hex = "0.4"
|
||||||
|
|
||||||
|
[dependencies.furaffinity-rs]
|
||||||
|
git = "https://github.com/Syfaro/furaffinity-rs"
|
||||||
|
features = ["cloudflare-bypass"]
|
5
fuzzysearch-ingest-furaffinity/Dockerfile
Normal file
5
fuzzysearch-ingest-furaffinity/Dockerfile
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
FROM rustlang/rust:nightly-slim
|
||||||
|
COPY . .
|
||||||
|
RUN apt-get -y update && apt-get -y install pkg-config libssl-dev
|
||||||
|
RUN cargo install --root / --path .
|
||||||
|
CMD ["/bin/fa-watcher"]
|
13
fuzzysearch-ingest-furaffinity/notes.md
Normal file
13
fuzzysearch-ingest-furaffinity/notes.md
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
Miscellaneous notes about watching FurAffinity submissions.
|
||||||
|
|
||||||
|
## important data
|
||||||
|
|
||||||
|
* id
|
||||||
|
* artist
|
||||||
|
* image url
|
||||||
|
* image hash
|
||||||
|
* image filename
|
||||||
|
* rating
|
||||||
|
* posted date
|
||||||
|
* tags
|
||||||
|
* description
|
259
fuzzysearch-ingest-furaffinity/src/main.rs
Normal file
259
fuzzysearch-ingest-furaffinity/src/main.rs
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
use lazy_static::lazy_static;
|
||||||
|
use tokio_postgres::Client;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!(
|
||||||
|
"fuzzysearch_watcher_fa_processing_seconds",
|
||||||
|
"Duration to process a submission"
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn lookup_tag(client: &Client, tag: &str) -> i32 {
|
||||||
|
if let Some(row) = client
|
||||||
|
.query("SELECT id FROM tag WHERE name = $1", &[&tag])
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
{
|
||||||
|
return row.get("id");
|
||||||
|
}
|
||||||
|
|
||||||
|
client
|
||||||
|
.query("INSERT INTO tag (name) VALUES ($1) RETURNING id", &[&tag])
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.unwrap()
|
||||||
|
.get("id")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn lookup_artist(client: &Client, artist: &str) -> i32 {
|
||||||
|
if let Some(row) = client
|
||||||
|
.query("SELECT id FROM artist WHERE name = $1", &[&artist])
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
{
|
||||||
|
return row.get("id");
|
||||||
|
}
|
||||||
|
|
||||||
|
client
|
||||||
|
.query(
|
||||||
|
"INSERT INTO artist (name) VALUES ($1) RETURNING id",
|
||||||
|
&[&artist],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.unwrap()
|
||||||
|
.get("id")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn has_submission(client: &Client, id: i32) -> bool {
|
||||||
|
client
|
||||||
|
.query("SELECT id FROM submission WHERE id = $1", &[&id])
|
||||||
|
.await
|
||||||
|
.expect("unable to run query")
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ids_to_check(client: &Client, max: i32) -> Vec<i32> {
|
||||||
|
let rows = client.query("SELECT sid FROM generate_series((SELECT max(id) FROM submission), $1::int) sid WHERE sid NOT IN (SELECT id FROM submission where id = sid)", &[&max]).await.unwrap();
|
||||||
|
|
||||||
|
rows.iter().map(|row| row.get("sid")).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn insert_submission(
|
||||||
|
client: &Client,
|
||||||
|
sub: &furaffinity_rs::Submission,
|
||||||
|
) -> Result<(), postgres::Error> {
|
||||||
|
let artist_id = lookup_artist(&client, &sub.artist).await;
|
||||||
|
let mut tag_ids = Vec::with_capacity(sub.tags.len());
|
||||||
|
for tag in &sub.tags {
|
||||||
|
tag_ids.push(lookup_tag(&client, &tag).await);
|
||||||
|
}
|
||||||
|
|
||||||
|
let hash = sub.hash.clone();
|
||||||
|
let url = sub.content.url();
|
||||||
|
|
||||||
|
let size = sub.file_size.map(|size| size as i32);
|
||||||
|
|
||||||
|
client.execute("INSERT INTO submission (id, artist_id, url, filename, hash, rating, posted_at, description, hash_int, file_id, file_size, file_sha256) VALUES ($1, $2, $3, $4, decode($5, 'base64'), $6, $7, $8, $9, CASE WHEN isnumeric(split_part($4, '.', 1)) THEN split_part($4, '.', 1)::int ELSE null END, $10, $11)", &[
|
||||||
|
&sub.id, &artist_id, &url, &sub.filename, &hash, &sub.rating.serialize(), &sub.posted_at, &sub.description, &sub.hash_num, &size, &sub.file_sha256,
|
||||||
|
]).await?;
|
||||||
|
|
||||||
|
let stmt = client
|
||||||
|
.prepare("INSERT INTO tag_to_post (tag_id, post_id) VALUES ($1, $2) ON CONFLICT DO NOTHING")
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for tag_id in tag_ids {
|
||||||
|
client.execute(&stmt, &[&tag_id, &sub.id]).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn insert_null_submission(client: &Client, id: i32) -> Result<u64, postgres::Error> {
|
||||||
|
client
|
||||||
|
.execute("INSERT INTO SUBMISSION (id) VALUES ($1)", &[&id])
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn request(
|
||||||
|
req: hyper::Request<hyper::Body>,
|
||||||
|
) -> Result<hyper::Response<hyper::Body>, hyper::Error> {
|
||||||
|
match (req.method(), req.uri().path()) {
|
||||||
|
(&hyper::Method::GET, "/health") => Ok(hyper::Response::new(hyper::Body::from("OK"))),
|
||||||
|
|
||||||
|
(&hyper::Method::GET, "/metrics") => {
|
||||||
|
use prometheus::Encoder;
|
||||||
|
|
||||||
|
let encoder = prometheus::TextEncoder::new();
|
||||||
|
|
||||||
|
let metric_families = prometheus::gather();
|
||||||
|
let mut buffer = vec![];
|
||||||
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||||
|
|
||||||
|
Ok(hyper::Response::new(hyper::Body::from(buffer)))
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {
|
||||||
|
let mut not_found = hyper::Response::default();
|
||||||
|
*not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
|
||||||
|
Ok(not_found)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn web() {
|
||||||
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
|
|
||||||
|
let addr: std::net::SocketAddr = std::env::var("HTTP_HOST").unwrap().parse().unwrap();
|
||||||
|
|
||||||
|
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request)) });
|
||||||
|
|
||||||
|
let server = hyper::Server::bind(&addr).serve(service);
|
||||||
|
|
||||||
|
println!("Listening on http://{}", addr);
|
||||||
|
|
||||||
|
server.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let (cookie_a, cookie_b) = (
|
||||||
|
std::env::var("FA_A").expect("missing fa cookie a"),
|
||||||
|
std::env::var("FA_B").expect("missing fa cookie b"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let path = std::env::var("OUT_DIR").expect("missing output directory");
|
||||||
|
let path = std::path::Path::new(&path);
|
||||||
|
|
||||||
|
let user_agent = std::env::var("USER_AGENT").expect("missing user agent");
|
||||||
|
|
||||||
|
let fa = furaffinity_rs::FurAffinity::new(cookie_a, cookie_b, user_agent);
|
||||||
|
|
||||||
|
let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");
|
||||||
|
|
||||||
|
let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = connection.await {
|
||||||
|
panic!(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move { web().await });
|
||||||
|
|
||||||
|
println!("Started");
|
||||||
|
|
||||||
|
'main: loop {
|
||||||
|
println!("Fetching latest ID");
|
||||||
|
let latest_id = fa.latest_id().await.expect("unable to get latest id");
|
||||||
|
|
||||||
|
for id in ids_to_check(&client, latest_id).await {
|
||||||
|
'attempt: for attempt in 0..3 {
|
||||||
|
if !has_submission(&client, id).await {
|
||||||
|
println!("loading submission {}", id);
|
||||||
|
|
||||||
|
let timer = SUBMISSION_DURATION.start_timer();
|
||||||
|
|
||||||
|
let sub = match fa.get_submission(id).await {
|
||||||
|
Ok(sub) => sub,
|
||||||
|
Err(e) => {
|
||||||
|
println!("got error: {:?}, retry {}", e.message, e.retry);
|
||||||
|
timer.stop_and_discard();
|
||||||
|
if e.retry {
|
||||||
|
tokio::time::delay_for(std::time::Duration::from_secs(attempt + 1))
|
||||||
|
.await;
|
||||||
|
continue 'attempt;
|
||||||
|
} else {
|
||||||
|
println!("unrecoverable, exiting");
|
||||||
|
break 'main;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let sub = match sub {
|
||||||
|
Some(sub) => sub,
|
||||||
|
None => {
|
||||||
|
println!("did not exist");
|
||||||
|
timer.stop_and_discard();
|
||||||
|
insert_null_submission(&client, id).await.unwrap();
|
||||||
|
break 'attempt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let sub = match fa.calc_image_hash(sub.clone()).await {
|
||||||
|
Ok(sub) => sub,
|
||||||
|
Err(e) => {
|
||||||
|
println!("unable to hash image: {:?}", e);
|
||||||
|
sub
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
timer.stop_and_record();
|
||||||
|
|
||||||
|
if let Some(sha) = &sub.file_sha256 {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let file = sub.file.as_ref().unwrap();
|
||||||
|
let ext = sub.filename.split('.').last().unwrap();
|
||||||
|
|
||||||
|
let h = hex::encode(sha);
|
||||||
|
let p = path.join(&h[0..2]).join(&h[2..4]);
|
||||||
|
std::fs::create_dir_all(&p).expect("unable to create hash directory");
|
||||||
|
|
||||||
|
let name = format!("{}.{}", h, ext);
|
||||||
|
let name = std::path::Path::new(&name);
|
||||||
|
let name = p.join(name);
|
||||||
|
|
||||||
|
if !name.exists() {
|
||||||
|
let mut f = tokio::fs::File::create(&name).await.expect("unable to create submission file");
|
||||||
|
f.write_all(file).await.expect("unable to write file contents");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
insert_submission(&client, &sub).await.unwrap();
|
||||||
|
|
||||||
|
break 'attempt;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("ran out of attempts");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("completed fetch, waiting a minute before loading more");
|
||||||
|
|
||||||
|
tokio::time::delay_for(std::time::Duration::from_secs(60)).await;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user