diff --git a/src/bin/load_hashes.rs b/src/bin/load_hashes.rs index 7c3a2c8..6f0e30a 100644 --- a/src/bin/load_hashes.rs +++ b/src/bin/load_hashes.rs @@ -1,6 +1,6 @@ use bb8::Pool; use bb8_postgres::PostgresConnectionManager; -use futures::future::FutureExt; +use futures::StreamExt; struct NeededPost { id: i32, @@ -8,9 +8,10 @@ struct NeededPost { } async fn hash_url( + id: i32, client: std::sync::Arc, url: String, -) -> Result<(img_hash::ImageHash<[u8; 8]>, i64), image::ImageError> { +) -> (i32, Result) { let data = client .get(&url) .send() @@ -25,7 +26,7 @@ async fn hash_url( Ok(image) => image, Err(e) => { println!("{:?}", &data[0..50]); - return Err(e); + return (id, Err(e)); } }; @@ -37,7 +38,7 @@ async fn hash_url( println!("{} - {}", url, num); - Ok((hash, num)) + (id, Ok(num)) } async fn load_next_posts( @@ -103,59 +104,50 @@ async fn main() { continue; } - for chunk in needed_posts.chunks(8) { - let futs = chunk.iter().map(|post| { - let db = pool.clone(); - let client = client.clone(); - let id = post.id; + futures::stream::iter( + needed_posts + .into_iter() + .map(|post| hash_url(post.id, client.clone(), post.full_url)), + ) + .buffer_unordered(8) + .for_each(|res: (i32, Result)| async { + let db = pool.get().await.expect("unable to get from pool"); - hash_url(client, post.full_url.clone()).then(move |res| async move { - match res { - Ok((_hash, num)) => { - let mut conn = db.get().await.unwrap(); + match res { + (id, Ok(num)) => { + let mut conn = pool.get().await.unwrap(); - let tx = conn - .transaction() - .await - .expect("Unable to create transaction"); + let tx = conn + .transaction() + .await + .expect("Unable to create transaction"); - tx.execute("UPDATE e621 SET hash = $2 WHERE id = $1", &[&id, &num]) - .await - .expect("Unable to update hash in database"); + tx.execute("UPDATE e621 SET hash = $2 WHERE id = $1", &[&id, &num]) + .await + .expect("Unable to update hash in database"); - tx.execute( - "INSERT INTO hashes (e621_id, hash) VALUES ($1, $2)", - &[&id, &num], - ) - .await - .expect("Unable to insert hash to hashes table"); + tx.execute( + "INSERT INTO hashes (e621_id, hash) VALUES ($1, $2)", + &[&id, &num], + ) + .await + .expect("Unable to insert hash to hashes table"); - tx.commit().await.expect("Unable to commit tx"); - - drop(conn); - } - Err(e) => { - let desc = e.to_string(); - println!("[{}] hashing error - {}", id, desc); - db.get() - .await - .unwrap() - .execute( - "UPDATE e621 SET hash_error = $2 WHERE id = $1", - &[&id, &desc], - ) - .await - .expect("Unable to update hash error in database"); - } - }; - }) - }); - - println!("joining futs"); - - futures::future::join_all(futs).await; - - println!("futs completed"); - } + tx.commit().await.expect("Unable to commit tx"); + } + (id, Err(e)) => { + let desc = e.to_string(); + println!("[{}] hashing error - {}", id, desc); + db.execute( + "UPDATE e621 SET hash_error = $2 WHERE id = $1", + &[&id, &desc], + ) + .await + .expect("Unable to update hash error in database"); + } + } + () + }) + .await; } }