Load next posts to get hashes for in background.

This commit is contained in:
Syfaro 2020-01-13 16:46:09 -06:00
parent af15b20f8e
commit 5dc771421c

View File

@ -34,6 +34,37 @@ async fn hash_url(
Ok((hash, num)) Ok((hash, num))
} }
async fn load_next_posts(
db: Pool<PostgresConnectionManager<tokio_postgres::NoTls>>,
) -> Vec<NeededPost> {
db.get()
.await
.unwrap()
.query(
"SELECT
id,
data->>'file_url' file_url
FROM
post
WHERE
hash IS NULL AND
hash_error IS NULL AND
data->>'file_ext' IN ('jpg', 'png') AND
data->>'file_url' <> '/images/deleted-preview.png'
ORDER BY id DESC
LIMIT 384",
&[],
)
.await
.expect("Unable to get posts")
.into_iter()
.map(|row| NeededPost {
id: row.get("id"),
full_url: row.get("file_url"),
})
.collect()
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn"); let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn");
@ -55,42 +86,23 @@ async fn main() {
.expect("Unable to build http client"); .expect("Unable to build http client");
let client = std::sync::Arc::new(client); let client = std::sync::Arc::new(client);
let mut needed_posts = load_next_posts(pool.clone()).await;
loop { loop {
println!("getting next 384 posts"); println!("running loop");
if needed_posts.is_empty() {
println!("no posts, waiting a minute");
tokio::time::delay_for(std::time::Duration::from_secs(60)).await;
continue;
}
let db = pool.clone(); let db = pool.clone();
let posts_fut = tokio::spawn(async move { load_next_posts(db).await });
let needed_posts: Vec<_> = db
.get()
.await
.unwrap()
.query(
"SELECT
id,
data->>'file_url' file_url
FROM
post
WHERE
hash IS NULL AND
hash_error IS NULL AND
data->>'file_ext' IN ('jpg', 'png') AND
data->>'file_url' <> '/images/deleted-preview.png'
ORDER BY id DESC
LIMIT 384",
&[],
)
.await
.expect("Unable to get posts")
.into_iter()
.map(|row| NeededPost {
id: row.get("id"),
full_url: row.get("file_url"),
})
.collect();
for chunk in needed_posts.chunks(8) { for chunk in needed_posts.chunks(8) {
let futs = chunk.iter().map(|post| { let futs = chunk.iter().map(|post| {
let db = db.clone(); let db = pool.clone();
let client = client.clone(); let client = client.clone();
let id = post.id; let id = post.id;
@ -116,7 +128,7 @@ async fn main() {
&[&id, &desc], &[&id, &desc],
) )
.await .await
.expect("Unable to update hash in database"); .expect("Unable to update hash error in database");
} }
}; };
}) })
@ -124,5 +136,7 @@ async fn main() {
futures::future::join_all(futs).await; futures::future::join_all(futs).await;
} }
needed_posts = posts_fut.await.unwrap();
} }
} }