diff --git a/src/bin/load_hashes.rs b/src/bin/load_hashes.rs index 8611bbe..5663c6c 100644 --- a/src/bin/load_hashes.rs +++ b/src/bin/load_hashes.rs @@ -34,6 +34,37 @@ async fn hash_url( Ok((hash, num)) } +async fn load_next_posts( + db: Pool>, +) -> Vec { + db.get() + .await + .unwrap() + .query( + "SELECT + id, + data->>'file_url' file_url + FROM + post + WHERE + hash IS NULL AND + hash_error IS NULL AND + data->>'file_ext' IN ('jpg', 'png') AND + data->>'file_url' <> '/images/deleted-preview.png' + ORDER BY id DESC + LIMIT 384", + &[], + ) + .await + .expect("Unable to get posts") + .into_iter() + .map(|row| NeededPost { + id: row.get("id"), + full_url: row.get("file_url"), + }) + .collect() +} + #[tokio::main] async fn main() { let dsn = std::env::var("POSTGRES_DSN").expect("missing postgres dsn"); @@ -55,42 +86,23 @@ async fn main() { .expect("Unable to build http client"); let client = std::sync::Arc::new(client); + let mut needed_posts = load_next_posts(pool.clone()).await; + loop { - println!("getting next 384 posts"); + println!("running loop"); + + if needed_posts.is_empty() { + println!("no posts, waiting a minute"); + tokio::time::delay_for(std::time::Duration::from_secs(60)).await; + continue; + } let db = pool.clone(); - - let needed_posts: Vec<_> = db - .get() - .await - .unwrap() - .query( - "SELECT - id, - data->>'file_url' file_url - FROM - post - WHERE - hash IS NULL AND - hash_error IS NULL AND - data->>'file_ext' IN ('jpg', 'png') AND - data->>'file_url' <> '/images/deleted-preview.png' - ORDER BY id DESC - LIMIT 384", - &[], - ) - .await - .expect("Unable to get posts") - .into_iter() - .map(|row| NeededPost { - id: row.get("id"), - full_url: row.get("file_url"), - }) - .collect(); + let posts_fut = tokio::spawn(async move { load_next_posts(db).await }); for chunk in needed_posts.chunks(8) { let futs = chunk.iter().map(|post| { - let db = db.clone(); + let db = pool.clone(); let client = client.clone(); let id = post.id; @@ -116,7 +128,7 @@ async fn main() { &[&id, &desc], ) .await - .expect("Unable to update hash in database"); + .expect("Unable to update hash error in database"); } }; }) @@ -124,5 +136,7 @@ async fn main() { futures::future::join_all(futs).await; } + + needed_posts = posts_fut.await.unwrap(); } }