mirror of https://github.com/Syfaro/fuzzysearch.git
synced 2024-11-23 15:22:31 +00:00
Keep database updated.
commit d557d9bbae
parent 0453a84511
.dockerignore (new file, 1 line)
@@ -0,0 +1 @@
target/
.drone.yml (new file, 25 lines)
@@ -0,0 +1,25 @@
---
kind: pipeline
type: docker
name: default

platform:
  os: linux
  arch: amd64

steps:
  - name: build-latest
    image: plugins/docker
    settings:
      auto_tag: true
      password:
        from_secret: docker_password
      registry: registry.huefox.com
      repo: registry.huefox.com/e621-watcher
      username:
        from_secret: docker_username
    when:
      branch:
        - master

...
.gitignore (vendored, 1 line changed)
@@ -1 +1,2 @@
 /target
+.env
Cargo.lock (generated, 762 lines changed)
File diff suppressed because it is too large.
Cargo.toml (21 lines changed)
@@ -5,20 +5,25 @@ authors = ["Syfaro <syfaro@huefox.com>"]
 edition = "2018"
 
 [dependencies]
-reqwest = { version = "0.11", features = ["json"] }
 tokio = { version = "1", features = ["full"] }
-futures = { version = "0.3", features = ["thread-pool"] }
-hamming = "0.1.3"
+hyper = { version = "0.14", features = ["server"] }
+reqwest = { version = "0.11", features = ["json"] }
 
 serde = "1"
 serde_json = "1"
 
-tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
-bb8 = "0.7"
-bb8-postgres = "0.7"
+sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] }
 
 image = "0.23"
+hamming = "0.1.3"
 img_hash = "3"
+sha2 = "0.9"
 
-[profile.dev.package."*"]
-opt-level = 2
+tracing = "0.1"
+tracing-subscriber = "0.2"
+
+anyhow = "1"
+
+lazy_static = "1"
+prometheus = { version = "0.11", features = ["process"] }
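Note: the dependency changes above swap tokio-postgres/bb8 for sqlx and add structured logging via tracing. A minimal sketch (not part of this commit) of what the tracing crates enable, replacing the println!-based logging in the deleted binaries further down; the field value here is made up:

fn main() {
    // Install the fmt subscriber so events are written to stdout.
    tracing_subscriber::fmt::init();

    let max_id = 42;
    // max_id is recorded as a structured field, not formatted into the message.
    tracing::info!(max_id, "Found maximum ID in database");
}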
Dockerfile (new file, 14 lines)
@@ -0,0 +1,14 @@
FROM rust:1-slim AS builder
WORKDIR /src
ENV SQLX_OFFLINE=true
RUN apt-get update -y && apt-get install -y libssl-dev pkg-config
COPY . .
RUN cargo install --root / --path .

FROM debian:buster-slim
EXPOSE 8080
ENV METRICS_HOST=0.0.0.0:8080
WORKDIR /app
RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /bin/e621-watcher /bin/e621-watcher
CMD ["/bin/e621-watcher"]
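The METRICS_HOST baked into the runtime image is consumed by create_metrics_server in src/main.rs below, which parses it into a SocketAddr. A minimal sketch of that parse, assuming the same host:port format:

use std::net::SocketAddr;

fn main() {
    // Same shape as the Dockerfile's METRICS_HOST=0.0.0.0:8080.
    let addr: SocketAddr = "0.0.0.0:8080".parse().expect("Invalid METRICS_HOST");
    assert_eq!(addr.port(), 8080);
}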
sqlx-data.json (new file, 37 lines)
@@ -0,0 +1,37 @@
{
  "db": "PostgreSQL",
  "02b98e35cf7d650413c2730df732d7ae08119b11a5b2aaddcee08a7f06338924": {
    "query": "SELECT max(id) max FROM e621",
    "describe": {
      "columns": [
        {
          "ordinal": 0,
          "name": "max",
          "type_info": "Int4"
        }
      ],
      "parameters": {
        "Left": []
      },
      "nullable": [
        null
      ]
    }
  },
  "a054594f7844f32e5968a54c0dab59716149a10411fcb16184a9070a82bb287d": {
    "query": "INSERT INTO e621\n (id, data, hash, hash_error, sha256) VALUES\n ($1, $2, $3, $4, $5)\n ON CONFLICT (id) DO UPDATE SET\n data = EXCLUDED.data,\n hash = EXCLUDED.hash,\n hash_error = EXCLUDED.hash_error,\n sha256 = EXCLUDED.sha256",
    "describe": {
      "columns": [],
      "parameters": {
        "Left": [
          "Int4",
          "Jsonb",
          "Int8",
          "Text",
          "Bytea"
        ]
      },
      "nullable": []
    }
  }
}
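This file is sqlx's offline query cache: each entry is keyed by a hash of its query string and stores the column and parameter metadata that the query! macro would otherwise fetch from a live database. With SQLX_OFFLINE=true (set in the Dockerfile above) the macros type-check against this file instead; it is regenerated with cargo sqlx prepare. The first cached entry corresponds to this checked query from src/main.rs below:

let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621")
    .fetch_one(&mut conn)
    .await?
    .max
    .unwrap_or(0);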
(deleted file)
@@ -1,130 +0,0 @@
async fn load_page(
    client: &reqwest::Client,
    before_id: Option<i32>,
) -> (Vec<i32>, serde_json::Value) {
    println!("Loading page with before_id {:?}", before_id);

    let mut query: Vec<(&'static str, String)> = vec![("limit", "320".into())];

    if let Some(before_id) = before_id {
        query.push(("page", format!("b{}", before_id)));
        if before_id <= 14 {
            panic!("that's it.");
        }
    }

    let body = client
        .get("https://e621.net/posts.json")
        .query(&query)
        .send()
        .await
        .expect("unable to make request")
        .text()
        .await
        .expect("unable to convert to text");

    let json = serde_json::from_str(&body).expect("Unable to parse data");

    let page = match json {
        serde_json::Value::Object(ref obj) => obj,
        _ => panic!("top level value was not object"),
    };

    let posts = page
        .get("posts")
        .expect("unable to get posts object")
        .as_array()
        .expect("posts was not array");

    let ids = posts
        .iter()
        .map(|post| {
            let post = match post {
                serde_json::Value::Object(post) => post,
                _ => panic!("invalid post data"),
            };

            match post.get("id").expect("missing post id") {
                serde_json::Value::Number(num) => {
                    num.as_i64().expect("invalid post id type") as i32
                }
                _ => panic!("invalid post id"),
            }
        })
        .collect();

    (ids, serde_json::Value::Array(posts.to_vec()))
}

#[tokio::main]
async fn main() {
    let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");

    let (db, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("Unable to connect");

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    db.execute(
        "CREATE TABLE IF NOT EXISTS e621 (id INTEGER PRIMARY KEY, hash BIGINT, data JSONB, hash_error TEXT)",
        &[],
    )
    .await
    .expect("Unable to create table");

    db.execute(
        "CREATE OR REPLACE FUNCTION extract_post_data() RETURNS TRIGGER AS $$
        BEGIN
            NEW.id = NEW.data->'id';
            RETURN NEW;
        END $$
        LANGUAGE 'plpgsql'",
        &[],
    )
    .await
    .expect("Unable to create function");

    db.execute("DROP TRIGGER IF EXISTS call_extract_post_data ON e621", &[])
        .await
        .expect("Unable to drop trigger");
    db.execute("CREATE TRIGGER call_extract_post_data BEFORE INSERT ON e621 FOR EACH ROW EXECUTE PROCEDURE extract_post_data()", &[]).await.expect("Unable to create trigger");

    let mut min_id = db
        .query_one("SELECT MIN(id) FROM e621", &[])
        .await
        .map(|row| row.get("min"))
        .expect("Unable to get min post");

    let client = reqwest::Client::builder()
        .user_agent("Syfaro test client syfaro@huefox.com")
        .build()
        .expect("Unable to build http client");

    let mut now;

    loop {
        now = std::time::Instant::now();

        let (ids, post_data) = load_page(&client, min_id).await;
        min_id = ids.into_iter().min();

        db.execute(
            "INSERT INTO e621 (data) SELECT json_array_elements($1::json)",
            &[&post_data],
        )
        .await
        .expect("Unable to insert");

        let elapsed = now.elapsed().as_millis() as u64;
        if elapsed < 1000 {
            let delay = 1000 - elapsed;
            println!("delaying {}ms before loading next page", delay);
            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
        }
    }
}
(deleted file)
@@ -1,161 +0,0 @@
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use futures::StreamExt;

struct NeededPost {
    id: i32,
    full_url: String,
}

fn get_hasher() -> img_hash::Hasher<[u8; 8]> {
    img_hash::HasherConfig::with_bytes_type::<[u8; 8]>()
        .hash_alg(img_hash::HashAlg::Gradient)
        .hash_size(8, 8)
        .preproc_dct()
        .to_hasher()
}

async fn hash_url(
    id: i32,
    client: std::sync::Arc<reqwest::Client>,
    url: String,
) -> (i32, Result<i64, image::ImageError>) {
    let data = client
        .get(&url)
        .send()
        .await
        .expect("unable to get url")
        .bytes()
        .await
        .expect("unable to get bytes");

    let hasher = get_hasher();
    let image = match image::load_from_memory(&data) {
        Ok(image) => image,
        Err(e) => {
            println!("{:?}", &data[0..50]);
            return (id, Err(e));
        }
    };

    let hash = hasher.hash_image(&image);
    let mut bytes: [u8; 8] = [0; 8];
    bytes.copy_from_slice(hash.as_bytes());

    let num = i64::from_be_bytes(bytes);

    println!("{} - {}", url, num);

    (id, Ok(num))
}

async fn load_next_posts(
    db: Pool<PostgresConnectionManager<tokio_postgres::NoTls>>,
) -> Vec<NeededPost> {
    db.get()
        .await
        .unwrap()
        .query(
            "SELECT
                id,
                data->'file'->>'url' file_url
            FROM
                e621
            WHERE
                hash IS NULL AND
                hash_error IS NULL AND
                data->'file'->>'ext' IN ('jpg', 'png') AND
                data->'file'->>'url' <> '/images/deleted-preview.png'
            ORDER BY id DESC
            LIMIT 384",
            &[],
        )
        .await
        .expect("Unable to get posts")
        .into_iter()
        .map(|row| NeededPost {
            id: row.get("id"),
            full_url: row.get("file_url"),
        })
        .collect()
}

#[tokio::main]
async fn main() {
    let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");

    use std::str::FromStr;
    let manager = PostgresConnectionManager::new(
        tokio_postgres::Config::from_str(&dsn).expect("unable to parse postgres dsn"),
        tokio_postgres::NoTls,
    );

    let pool = Pool::builder()
        .build(manager)
        .await
        .expect("unable to build pool");

    let client = reqwest::Client::builder()
        .user_agent("Syfaro test client syfaro@huefox.com")
        .build()
        .expect("Unable to build http client");
    let client = std::sync::Arc::new(client);

    loop {
        println!("running loop");

        let needed_posts = load_next_posts(pool.clone()).await;

        if needed_posts.is_empty() {
            println!("no posts, waiting a minute");
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
            continue;
        }

        futures::stream::iter(
            needed_posts
                .into_iter()
                .map(|post| hash_url(post.id, client.clone(), post.full_url)),
        )
        .buffer_unordered(8)
        .for_each(|res: (i32, Result<i64, image::ImageError>)| async {
            let db = pool.get().await.expect("unable to get from pool");

            match res {
                (id, Ok(num)) => {
                    let mut conn = pool.get().await.unwrap();

                    let tx = conn
                        .transaction()
                        .await
                        .expect("Unable to create transaction");

                    tx.execute("UPDATE e621 SET hash = $2 WHERE id = $1", &[&id, &num])
                        .await
                        .expect("Unable to update hash in database");

                    tx.execute(
                        "INSERT INTO hashes (e621_id, hash) VALUES ($1, $2)",
                        &[&id, &num],
                    )
                    .await
                    .expect("Unable to insert hash to hashes table");

                    tx.commit().await.expect("Unable to commit tx");
                }
                (id, Err(e)) => {
                    let desc = e.to_string();
                    println!("[{}] hashing error - {}", id, desc);
                    db.execute(
                        "UPDATE e621 SET hash_error = $2 WHERE id = $1",
                        &[&id, &desc],
                    )
                    .await
                    .expect("Unable to update hash error in database");
                }
            }
            ()
        })
        .await;
    }
}
(deleted file)
@@ -1,171 +0,0 @@
async fn load_page(client: &reqwest::Client, after_id: i32) -> (Vec<i32>, Vec<serde_json::Value>) {
    println!("Loading page with after_id {:?}", after_id);

    let mut query: Vec<(&'static str, String)> = vec![("limit", "320".into())];
    query.push(("page", format!("a{}", after_id)));

    let body = client
        .get("https://e621.net/posts.json")
        .query(&query)
        .send()
        .await
        .expect("unable to make request")
        .text()
        .await
        .expect("unable to convert to text");

    let json = serde_json::from_str(&body).expect("Unable to parse data");

    let page = match json {
        serde_json::Value::Object(ref obj) => obj,
        _ => panic!("top level value was not object"),
    };

    let posts = page
        .get("posts")
        .expect("unable to get posts object")
        .as_array()
        .expect("posts was not array");

    let ids = posts
        .iter()
        .map(|post| {
            let post = match post {
                serde_json::Value::Object(post) => post,
                _ => panic!("invalid post data"),
            };

            match post.get("id").expect("missing post id") {
                serde_json::Value::Number(num) => {
                    num.as_i64().expect("invalid post id type") as i32
                }
                _ => panic!("invalid post id"),
            }
        })
        .collect();

    (ids, posts.to_vec())
}

async fn get_latest_id(client: &reqwest::Client) -> i32 {
    println!("Looking up current highest ID");

    let query = vec![("limit", "1")];

    let body = client
        .get("https://e621.net/posts.json")
        .query(&query)
        .send()
        .await
        .expect("unable to make request")
        .text()
        .await
        .expect("unable to convert to text");

    let json = serde_json::from_str(&body).expect("Unable to parse data");

    let page = match json {
        serde_json::Value::Object(ref obj) => obj,
        _ => panic!("top level value was not object"),
    };

    let posts = page
        .get("posts")
        .expect("unable to get posts object")
        .as_array()
        .expect("posts was not array");

    let ids: Vec<i32> = posts
        .iter()
        .map(|post| {
            let post = match post {
                serde_json::Value::Object(post) => post,
                _ => panic!("invalid post data"),
            };

            match post.get("id").expect("missing post id") {
                serde_json::Value::Number(num) => {
                    num.as_i64().expect("invalid post id type") as i32
                }
                _ => panic!("invalid post id"),
            }
        })
        .collect();

    ids.into_iter().max().expect("no ids found")
}

#[tokio::main]
async fn main() {
    let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");

    let (mut db, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls)
        .await
        .expect("Unable to connect");

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    let max_id: i32 = db
        .query_one("SELECT max(id) FROM e621", &[])
        .await
        .map(|row| row.get("max"))
        .expect("Unable to get max post");

    let client = reqwest::Client::builder()
        .user_agent("Syfaro test client syfaro@huefox.com")
        .build()
        .expect("Unable to build http client");

    println!("max is id: {}", max_id);

    let mut now;

    // Start with the minimum ID we're requesting being our previous highest
    // ID found.
    let mut min_id = max_id;

    // Find highest ID to look for. Once we get this value back, we've gotten
    // as many new posts as we were looking for.
    let latest_id = get_latest_id(&client).await;

    loop {
        now = std::time::Instant::now();

        // Load any posts with an ID higher than our previous run.
        let (ids, post_data) = load_page(&client, min_id).await;

        // Calculate a new minimum value to find posts after by looking at the
        // maximum value returned in this run.
        min_id = *ids.iter().max().expect("no ids found");

        let tx = db.transaction().await.expect("unable to start transaction");

        for post in post_data {
            tx.execute(
                "INSERT INTO e621 (data) VALUES ($1::json) ON CONFLICT DO NOTHING",
                &[&post],
            )
            .await
            .expect("Unable to insert");
        }

        tx.commit().await.expect("unable to commit transaction");

        // If it contains the latest ID, we're done.
        if ids.contains(&latest_id) {
            println!("finished run, latest_id {}, max_id {}", latest_id, max_id);
            break;
        }

        let elapsed = now.elapsed().as_millis() as u64;
        if elapsed < 1000 {
            let delay = 1000 - elapsed;
            println!("delaying {}ms before loading next page", delay);
            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
        }
    }
}
src/main.rs (new file, 354 lines)
@@ -0,0 +1,354 @@
use anyhow::Context;
use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge};
use sqlx::Connection;

static USER_AGENT: &str = "e621-watcher / FuzzySearch Ingester / Syfaro <syfaro@huefox.com>";

lazy_static! {
    static ref SUBMISSION_BACKLOG: IntGauge = register_int_gauge!(
        "fuzzysearch_watcher_e621_submission_backlog",
        "Number of submissions behind the latest ID"
    )
    .unwrap();
    static ref INDEX_DURATION: Histogram = register_histogram!(
        "fuzzysearch_watcher_e621_index_duration",
        "Duration to load an index of submissions"
    )
    .unwrap();
    static ref SUBMISSION_DURATION: Histogram = register_histogram!(
        "fuzzysearch_watcher_e621_submission_duration",
        "Duration to ingest a submission"
    )
    .unwrap();
}
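The let _hist = ...; drop(_hist) pattern used in main() below leans on the prometheus crate's HistogramTimer: start_timer() returns a guard that records the elapsed time into the histogram when it is dropped. A minimal standalone sketch with a made-up metric name:

use prometheus::{register_histogram, Histogram};

fn main() {
    let hist: Histogram = register_histogram!("demo_duration", "Example duration").unwrap();

    // The guard observes the elapsed time into the histogram when it is dropped.
    let timer = hist.start_timer();
    std::thread::sleep(std::time::Duration::from_millis(10));
    drop(timer);

    assert_eq!(hist.get_sample_count(), 1);
}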
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    create_metrics_server().await;

    let client = reqwest::ClientBuilder::default()
        .user_agent(USER_AGENT)
        .build()?;

    let mut conn = sqlx::PgConnection::connect(&std::env::var("DATABASE_URL")?).await?;

    let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621")
        .fetch_one(&mut conn)
        .await?
        .max
        .unwrap_or(0);

    tracing::info!(max_id, "Found maximum ID in database");

    let mut now;
    let mut min_id = max_id;

    let mut latest_id: Option<i32> = None;

    loop {
        now = std::time::Instant::now();

        let lid = match latest_id {
            Some(latest_id) => latest_id,
            None => {
                let _hist = INDEX_DURATION.start_timer();
                let lid = get_latest_id(&client)
                    .await
                    .expect("Unable to get latest ID");
                drop(_hist);

                latest_id = Some(lid);

                lid
            }
        };

        let _hist = INDEX_DURATION.start_timer();
        let page = load_page(&client, min_id).await?;
        drop(_hist);

        let posts = get_page_posts(&page)?;
        let post_ids = get_post_ids(&posts);

        tracing::trace!(?post_ids, "Collected posts");

        min_id = match post_ids.iter().max() {
            Some(id) => *id,
            None => {
                tracing::warn!("Found no new posts, sleeping");
                tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await;
                continue;
            }
        };

        SUBMISSION_BACKLOG.set((lid - min_id).into());

        let mut tx = conn.begin().await?;

        for post in posts {
            let _hist = SUBMISSION_DURATION.start_timer();
            insert_submission(&mut tx, &client, post).await?;
            drop(_hist);

            SUBMISSION_BACKLOG.sub(1);
        }

        tx.commit().await?;

        let elapsed = now.elapsed().as_millis() as u64;
        if post_ids.contains(&lid) {
            tracing::warn!(lid, "Page contained latest ID, sleeping");
            tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await;

            latest_id = None;
        } else if elapsed < 1000 {
            let delay = 1000 - elapsed;
            tracing::warn!(delay, "Delaying before next request");
            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
        }
    }
}
fn get_page_posts(page: &serde_json::Value) -> anyhow::Result<&Vec<serde_json::Value>> {
    let page = match page {
        serde_json::Value::Object(ref obj) => obj,
        _ => return Err(anyhow::anyhow!("Top level object was not an object")),
    };

    let posts = page
        .get("posts")
        .context("Page did not contain posts object")?
        .as_array()
        .context("Posts was not an array")?;

    Ok(posts)
}

fn get_post_ids(posts: &[serde_json::Value]) -> Vec<i32> {
    let ids: Vec<i32> = posts
        .iter()
        .filter_map(|post| {
            let post = match post {
                serde_json::Value::Object(post) => post,
                _ => return None,
            };

            let id = match post.get("id")? {
                serde_json::Value::Number(num) => num.as_i64()? as i32,
                _ => return None,
            };

            Some(id)
        })
        .collect();

    ids
}

#[tracing::instrument(err, skip(client))]
async fn get_latest_id(client: &reqwest::Client) -> anyhow::Result<i32> {
    tracing::debug!("Looking up current highest ID");

    let query = vec![("limit", "1")];

    let page: serde_json::Value = client
        .get("https://e621.net/posts.json")
        .query(&query)
        .send()
        .await?
        .json()
        .await?;

    let posts = get_page_posts(&page)?;

    let id = get_post_ids(&posts)
        .into_iter()
        .max()
        .context("Page had no IDs")?;

    tracing::info!(id, "Found maximum ID");

    Ok(id)
}

#[tracing::instrument(err, skip(client))]
async fn load_page(client: &reqwest::Client, after_id: i32) -> anyhow::Result<serde_json::Value> {
    tracing::debug!("Attempting to load page");

    let query = vec![
        ("limit", "320".to_string()),
        ("page", format!("a{}", after_id)),
    ];

    let body = client
        .get("https://e621.net/posts.json")
        .query(&query)
        .send()
        .await?
        .json()
        .await?;

    Ok(body)
}

type ImageData = (Option<i64>, Option<String>, Option<Vec<u8>>);

#[tracing::instrument(err, skip(conn, client, post), fields(id))]
async fn insert_submission(
    conn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    client: &reqwest::Client,
    post: &serde_json::Value,
) -> anyhow::Result<()> {
    let id = post
        .get("id")
        .context("Post was missing ID")?
        .as_i64()
        .context("Post ID was not number")? as i32;

    tracing::Span::current().record("id", &id);
    tracing::debug!("Inserting submission");

    tracing::trace!(?post, "Evaluating post");

    let (hash, hash_error, sha256): ImageData = if let Some((url, ext)) = get_post_url_ext(&post) {
        if url != "/images/deleted-preview.png" && (ext == "jpg" || ext == "png") {
            load_image(&client, &url).await?
        } else {
            tracing::debug!("Ignoring post as it is deleted or not a supported image format");

            (None, None, None)
        }
    } else {
        tracing::warn!("Post had missing URL or extension");

        (None, None, None)
    };

    sqlx::query!(
        "INSERT INTO e621
            (id, data, hash, hash_error, sha256) VALUES
            ($1, $2, $3, $4, $5)
        ON CONFLICT (id) DO UPDATE SET
            data = EXCLUDED.data,
            hash = EXCLUDED.hash,
            hash_error = EXCLUDED.hash_error,
            sha256 = EXCLUDED.sha256",
        id,
        post,
        hash,
        hash_error,
        sha256
    )
    .execute(conn)
    .await?;

    Ok(())
}

fn get_post_url_ext(post: &serde_json::Value) -> Option<(&str, &str)> {
    let file = post.as_object()?.get("file")?.as_object()?;

    let url = file.get("url")?.as_str()?;
    let ext = file.get("ext")?.as_str()?;

    Some((url, ext))
}

#[tracing::instrument(err, skip(client))]
async fn load_image(client: &reqwest::Client, url: &str) -> anyhow::Result<ImageData> {
    use sha2::{Digest, Sha256};
    use std::convert::TryInto;

    let bytes = client.get(url).send().await?.bytes().await?;

    tracing::trace!(len = bytes.len(), "Got submission image bytes");

    let mut hasher = Sha256::new();
    hasher.update(&bytes);
    let result = hasher.finalize().to_vec();

    tracing::trace!(?result, "Calculated image SHA256");

    let hasher = get_hasher();
    let img = match image::load_from_memory(&bytes) {
        Ok(img) => img,
        Err(err) => {
            tracing::error!(?err, "Unable to open image");
            return Ok((None, Some(err.to_string()), Some(result)));
        }
    };

    tracing::trace!("Opened image successfully");

    let hash = hasher.hash_image(&img);
    let hash: [u8; 8] = hash.as_bytes().try_into()?;
    let hash = i64::from_be_bytes(hash);

    tracing::trace!(?hash, "Calculated image hash");

    Ok((Some(hash), None, Some(result)))
}

fn get_hasher() -> img_hash::Hasher<[u8; 8]> {
    img_hash::HasherConfig::with_bytes_type::<[u8; 8]>()
        .hash_alg(img_hash::HashAlg::Gradient)
        .hash_size(8, 8)
        .preproc_dct()
        .to_hasher()
}
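get_hasher() builds an 8x8 Gradient hash, so each image reduces to 64 bits stored as a BIGINT. The hamming dependency kept in Cargo.toml is what makes that encoding useful for lookups: visually similar images produce hashes that differ in only a few bits. A minimal sketch (not part of this commit; values made up) of comparing two stored hashes:

// Count differing bits between two stored perceptual hashes.
fn hash_distance(a: i64, b: i64) -> u64 {
    hamming::distance(&a.to_be_bytes(), &b.to_be_bytes())
}

fn main() {
    let a: i64 = 0x0f0f_0f0f_0f0f_0f0f;
    let b: i64 = 0x0f0f_0f0f_0f0f_0f0e; // differs from a in one bit
    assert_eq!(hash_distance(a, b), 1);
}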
async fn provide_metrics(
    _: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
    use hyper::{Body, Response};
    use prometheus::{Encoder, TextEncoder};

    let mut buffer = Vec::new();
    let encoder = TextEncoder::new();

    let metric_families = prometheus::gather();
    encoder.encode(&metric_families, &mut buffer).unwrap();

    Ok(Response::new(Body::from(buffer)))
}

async fn create_metrics_server() {
    use hyper::{
        service::{make_service_fn, service_fn},
        Server,
    };
    use std::convert::Infallible;
    use std::net::SocketAddr;

    let make_svc =
        make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(provide_metrics)) });

    let addr: SocketAddr = std::env::var("METRICS_HOST")
        .expect("Missing METRICS_HOST")
        .parse()
        .expect("Invalid METRICS_HOST");

    let server = Server::bind(&addr).serve(make_svc);

    tokio::spawn(async move { server.await.expect("Metrics server error") });
}

#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn test_get_latest_id() {
        let client = reqwest::ClientBuilder::new()
            .user_agent(super::USER_AGENT)
            .build()
            .unwrap();

        let latest_id = super::get_latest_id(&client)
            .await
            .expect("No error should occur");

        assert!(latest_id > 1_000_000, "Latest ID should be reasonably high");
    }
}