Fix load_hashes.

This commit is contained in:
Syfaro 2020-09-23 15:49:55 -04:00
parent 2a2c3b88fc
commit 9f70ae0aca

View File

@ -1,6 +1,6 @@
use bb8::Pool; use bb8::Pool;
use bb8_postgres::PostgresConnectionManager; use bb8_postgres::PostgresConnectionManager;
use futures::future::FutureExt; use futures::StreamExt;
struct NeededPost { struct NeededPost {
id: i32, id: i32,
@ -8,9 +8,10 @@ struct NeededPost {
} }
async fn hash_url( async fn hash_url(
id: i32,
client: std::sync::Arc<reqwest::Client>, client: std::sync::Arc<reqwest::Client>,
url: String, url: String,
) -> Result<(img_hash::ImageHash<[u8; 8]>, i64), image::ImageError> { ) -> (i32, Result<i64, image::ImageError>) {
let data = client let data = client
.get(&url) .get(&url)
.send() .send()
@ -25,7 +26,7 @@ async fn hash_url(
Ok(image) => image, Ok(image) => image,
Err(e) => { Err(e) => {
println!("{:?}", &data[0..50]); println!("{:?}", &data[0..50]);
return Err(e); return (id, Err(e));
} }
}; };
@ -37,7 +38,7 @@ async fn hash_url(
println!("{} - {}", url, num); println!("{} - {}", url, num);
Ok((hash, num)) (id, Ok(num))
} }
async fn load_next_posts( async fn load_next_posts(
@ -103,59 +104,50 @@ async fn main() {
continue; continue;
} }
for chunk in needed_posts.chunks(8) { futures::stream::iter(
let futs = chunk.iter().map(|post| { needed_posts
let db = pool.clone(); .into_iter()
let client = client.clone(); .map(|post| hash_url(post.id, client.clone(), post.full_url)),
let id = post.id; )
.buffer_unordered(8)
.for_each(|res: (i32, Result<i64, image::ImageError>)| async {
let db = pool.get().await.expect("unable to get from pool");
hash_url(client, post.full_url.clone()).then(move |res| async move { match res {
match res { (id, Ok(num)) => {
Ok((_hash, num)) => { let mut conn = pool.get().await.unwrap();
let mut conn = db.get().await.unwrap();
let tx = conn let tx = conn
.transaction() .transaction()
.await .await
.expect("Unable to create transaction"); .expect("Unable to create transaction");
tx.execute("UPDATE e621 SET hash = $2 WHERE id = $1", &[&id, &num]) tx.execute("UPDATE e621 SET hash = $2 WHERE id = $1", &[&id, &num])
.await .await
.expect("Unable to update hash in database"); .expect("Unable to update hash in database");
tx.execute( tx.execute(
"INSERT INTO hashes (e621_id, hash) VALUES ($1, $2)", "INSERT INTO hashes (e621_id, hash) VALUES ($1, $2)",
&[&id, &num], &[&id, &num],
) )
.await .await
.expect("Unable to insert hash to hashes table"); .expect("Unable to insert hash to hashes table");
tx.commit().await.expect("Unable to commit tx"); tx.commit().await.expect("Unable to commit tx");
}
drop(conn); (id, Err(e)) => {
} let desc = e.to_string();
Err(e) => { println!("[{}] hashing error - {}", id, desc);
let desc = e.to_string(); db.execute(
println!("[{}] hashing error - {}", id, desc); "UPDATE e621 SET hash_error = $2 WHERE id = $1",
db.get() &[&id, &desc],
.await )
.unwrap() .await
.execute( .expect("Unable to update hash error in database");
"UPDATE e621 SET hash_error = $2 WHERE id = $1", }
&[&id, &desc], }
) ()
.await })
.expect("Unable to update hash error in database"); .await;
}
};
})
});
println!("joining futs");
futures::future::join_all(futs).await;
println!("futs completed");
}
} }
} }