Skip to content

Commit d9803df

Browse files
committed
worker/background_job: Remove sync fns
1 parent c645ab9 commit d9803df

File tree

16 files changed

+61
-166
lines changed

16 files changed

+61
-166
lines changed

crates/crates_io_worker/src/background_job.rs

Lines changed: 6 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use crate::errors::EnqueueError;
22
use crate::schema::background_jobs;
3-
use diesel::connection::LoadConnection;
43
use diesel::dsl::{exists, not};
5-
use diesel::pg::Pg;
64
use diesel::sql_types::{Int2, Jsonb, Text};
75
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
8-
use diesel_async::AsyncPgConnection;
6+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
97
use serde::de::DeserializeOwned;
108
use serde::Serialize;
119
use serde_json::Value;
@@ -40,85 +38,28 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
4038
/// Execute the task. This method should define its logic.
4139
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
4240

43-
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
44-
fn enqueue(
45-
&self,
46-
conn: &mut impl LoadConnection<Backend = Pg>,
47-
) -> Result<Option<i64>, EnqueueError> {
48-
let data = serde_json::to_value(self)?;
49-
let priority = Self::PRIORITY;
50-
51-
if Self::DEDUPLICATED {
52-
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority)?)
53-
} else {
54-
Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
55-
}
56-
}
57-
5841
#[allow(async_fn_in_trait)]
5942
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
60-
async fn async_enqueue(
61-
&self,
62-
conn: &mut AsyncPgConnection,
63-
) -> Result<Option<i64>, EnqueueError> {
43+
async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result<Option<i64>, EnqueueError> {
6444
let data = serde_json::to_value(self)?;
6545
let priority = Self::PRIORITY;
6646

6747
if Self::DEDUPLICATED {
68-
Ok(async_enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
48+
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
6949
} else {
7050
Ok(Some(
71-
async_enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
51+
enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
7252
))
7353
}
7454
}
7555
}
7656

77-
fn enqueue_deduplicated(
78-
conn: &mut impl LoadConnection<Backend = Pg>,
79-
job_type: &str,
80-
data: &Value,
81-
priority: i16,
82-
) -> Result<Option<i64>, EnqueueError> {
83-
use diesel::RunQueryDsl;
84-
85-
let similar_jobs = background_jobs::table
86-
.select(background_jobs::id)
87-
.filter(background_jobs::job_type.eq(job_type))
88-
.filter(background_jobs::data.eq(data))
89-
.filter(background_jobs::priority.eq(priority))
90-
.for_update()
91-
.skip_locked();
92-
93-
let deduplicated_select = diesel::select((
94-
job_type.into_sql::<Text>(),
95-
data.into_sql::<Jsonb>(),
96-
priority.into_sql::<Int2>(),
97-
))
98-
.filter(not(exists(similar_jobs)));
99-
100-
let id = diesel::insert_into(background_jobs::table)
101-
.values(deduplicated_select)
102-
.into_columns((
103-
background_jobs::job_type,
104-
background_jobs::data,
105-
background_jobs::priority,
106-
))
107-
.returning(background_jobs::id)
108-
.get_result::<i64>(conn)
109-
.optional()?;
110-
111-
Ok(id)
112-
}
113-
114-
async fn async_enqueue_deduplicated(
57+
async fn enqueue_deduplicated(
11558
conn: &mut AsyncPgConnection,
11659
job_type: &str,
11760
data: &Value,
11861
priority: i16,
11962
) -> Result<Option<i64>, EnqueueError> {
120-
use diesel_async::RunQueryDsl;
121-
12263
let similar_jobs = background_jobs::table
12364
.select(background_jobs::id)
12465
.filter(background_jobs::job_type.eq(job_type))
@@ -149,34 +90,12 @@ async fn async_enqueue_deduplicated(
14990
Ok(id)
15091
}
15192

152-
fn enqueue_simple(
153-
conn: &mut impl LoadConnection<Backend = Pg>,
154-
job_type: &str,
155-
data: &Value,
156-
priority: i16,
157-
) -> Result<i64, EnqueueError> {
158-
use diesel::RunQueryDsl;
159-
160-
let id = diesel::insert_into(background_jobs::table)
161-
.values((
162-
background_jobs::job_type.eq(job_type),
163-
background_jobs::data.eq(data),
164-
background_jobs::priority.eq(priority),
165-
))
166-
.returning(background_jobs::id)
167-
.get_result(conn)?;
168-
169-
Ok(id)
170-
}
171-
172-
async fn async_enqueue_simple(
93+
async fn enqueue_simple(
17394
conn: &mut AsyncPgConnection,
17495
job_type: &str,
17596
data: &Value,
17697
priority: i16,
17798
) -> Result<i64, EnqueueError> {
178-
use diesel_async::RunQueryDsl;
179-
18099
let id = diesel::insert_into(background_jobs::table)
181100
.values((
182101
background_jobs::job_type.eq(job_type),

crates/crates_io_worker/tests/runner.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {
7676

7777
let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
7878

79-
let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
79+
let job_id = assert_some!(TestJob.enqueue(&mut conn).await?);
8080

8181
assert!(job_exists(job_id, &mut conn).await?);
8282
assert!(!job_is_locked(job_id, &mut conn).await?);
@@ -122,7 +122,7 @@ async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {
122122

123123
assert_eq!(remaining_jobs(&mut conn).await?, 0);
124124

125-
TestJob.async_enqueue(&mut conn).await?;
125+
TestJob.enqueue(&mut conn).await?;
126126
assert_eq!(remaining_jobs(&mut conn).await?, 1);
127127

128128
let runner = runner.start();
@@ -163,7 +163,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow:
163163

164164
let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
165165

166-
TestJob.async_enqueue(&mut conn).await?;
166+
TestJob.enqueue(&mut conn).await?;
167167

168168
let runner = runner.start();
169169
test_context.job_started_barrier.wait().await;
@@ -214,7 +214,7 @@ async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {
214214

215215
let runner = runner(pool, ()).register_job_type::<TestJob>();
216216

217-
let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
217+
let job_id = assert_some!(TestJob.enqueue(&mut conn).await?);
218218

219219
let runner = runner.start();
220220
runner.wait_for_shutdown().await;
@@ -282,11 +282,11 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
282282
.shutdown_when_queue_empty();
283283

284284
// Enqueue first job
285-
assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
285+
assert_some!(TestJob::new("foo").enqueue(&mut conn).await?);
286286
assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
287287

288288
// Try to enqueue the same job again, which should be deduplicated
289-
assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
289+
assert_none!(TestJob::new("foo").enqueue(&mut conn).await?);
290290
assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
291291

292292
// Start processing the first job
@@ -295,16 +295,16 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
295295

296296
// Enqueue the same job again, which should NOT be deduplicated,
297297
// since the first job already still running
298-
assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
298+
assert_some!(TestJob::new("foo").enqueue(&mut conn).await?);
299299
assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
300300

301301
// Try to enqueue the same job again, which should be deduplicated again
302-
assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
302+
assert_none!(TestJob::new("foo").enqueue(&mut conn).await?);
303303
assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
304304

305305
// Enqueue the same job but with different data, which should
306306
// NOT be deduplicated
307-
assert_some!(TestJob::new("bar").async_enqueue(&mut conn).await?);
307+
assert_some!(TestJob::new("bar").enqueue(&mut conn).await?);
308308
assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
309309

310310
// Resolve the final barrier to finish the test

src/bin/crates-admin/delete_crate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,18 +109,18 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
109109

110110
info!("{name}: Enqueuing index sync jobs…");
111111
let job = jobs::SyncToGitIndex::new(name);
112-
if let Err(error) = job.async_enqueue(&mut conn).await {
112+
if let Err(error) = job.enqueue(&mut conn).await {
113113
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
114114
}
115115

116116
let job = jobs::SyncToSparseIndex::new(name);
117-
if let Err(error) = job.async_enqueue(&mut conn).await {
117+
if let Err(error) = job.enqueue(&mut conn).await {
118118
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
119119
}
120120

121121
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
122122
let job = jobs::DeleteCrateFromStorage::new(name.into());
123-
if let Err(error) = job.async_enqueue(&mut conn).await {
123+
if let Err(error) = job.enqueue(&mut conn).await {
124124
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
125125
}
126126
}

src/bin/crates-admin/delete_version.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
9696

9797
info!(%crate_name, "Enqueuing index sync jobs");
9898
let job = jobs::SyncToGitIndex::new(crate_name);
99-
if let Err(error) = job.async_enqueue(&mut conn).await {
99+
if let Err(error) = job.enqueue(&mut conn).await {
100100
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
101101
}
102102
let job = jobs::SyncToSparseIndex::new(crate_name);
103-
if let Err(error) = job.async_enqueue(&mut conn).await {
103+
if let Err(error) = job.enqueue(&mut conn).await {
104104
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");
105105
}
106106

src/bin/crates-admin/enqueue_job.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ pub async fn run(command: Command) -> Result<()> {
6060
before
6161
.map(jobs::ArchiveVersionDownloads::before)
6262
.unwrap_or_default()
63-
.async_enqueue(&mut conn)
63+
.enqueue(&mut conn)
6464
.await?;
6565
}
6666
Command::IndexVersionDownloadsArchive => {
6767
jobs::IndexVersionDownloadsArchive
68-
.async_enqueue(&mut conn)
68+
.enqueue(&mut conn)
6969
.await?;
7070
}
7171
Command::UpdateDownloads => {
@@ -81,16 +81,14 @@ pub async fn run(command: Command) -> Result<()> {
8181
jobs::UpdateDownloads::JOB_NAME
8282
);
8383
} else {
84-
jobs::UpdateDownloads.async_enqueue(&mut conn).await?;
84+
jobs::UpdateDownloads.enqueue(&mut conn).await?;
8585
}
8686
}
8787
Command::CleanProcessedLogFiles => {
88-
jobs::CleanProcessedLogFiles
89-
.async_enqueue(&mut conn)
90-
.await?;
88+
jobs::CleanProcessedLogFiles.enqueue(&mut conn).await?;
9189
}
9290
Command::DumpDb => {
93-
jobs::DumpDb.async_enqueue(&mut conn).await?;
91+
jobs::DumpDb.enqueue(&mut conn).await?;
9492
}
9593
Command::SyncAdmins { force } => {
9694
if !force {
@@ -112,20 +110,20 @@ pub async fn run(command: Command) -> Result<()> {
112110
}
113111
}
114112

115-
jobs::SyncAdmins.async_enqueue(&mut conn).await?;
113+
jobs::SyncAdmins.enqueue(&mut conn).await?;
116114
}
117115
Command::DailyDbMaintenance => {
118-
jobs::DailyDbMaintenance.async_enqueue(&mut conn).await?;
116+
jobs::DailyDbMaintenance.enqueue(&mut conn).await?;
119117
}
120118
Command::ProcessCdnLogQueue(job) => {
121-
job.async_enqueue(&mut conn).await?;
119+
job.enqueue(&mut conn).await?;
122120
}
123121
Command::SquashIndex => {
124-
jobs::SquashIndex.async_enqueue(&mut conn).await?;
122+
jobs::SquashIndex.enqueue(&mut conn).await?;
125123
}
126124
Command::NormalizeIndex { dry_run } => {
127125
jobs::NormalizeIndex::new(dry_run)
128-
.async_enqueue(&mut conn)
126+
.enqueue(&mut conn)
129127
.await?;
130128
}
131129
Command::CheckTyposquat { name } => {
@@ -142,30 +140,26 @@ pub async fn run(command: Command) -> Result<()> {
142140
);
143141
}
144142

145-
jobs::CheckTyposquat::new(&name)
146-
.async_enqueue(&mut conn)
147-
.await?;
143+
jobs::CheckTyposquat::new(&name).enqueue(&mut conn).await?;
148144
}
149145
Command::SendTokenExpiryNotifications => {
150146
jobs::SendTokenExpiryNotifications
151-
.async_enqueue(&mut conn)
147+
.enqueue(&mut conn)
152148
.await?;
153149
}
154150
Command::SyncCratesFeed => {
155-
jobs::rss::SyncCratesFeed.async_enqueue(&mut conn).await?;
151+
jobs::rss::SyncCratesFeed.enqueue(&mut conn).await?;
156152
}
157153
Command::SyncToGitIndex { name } => {
158-
jobs::SyncToGitIndex::new(name)
159-
.async_enqueue(&mut conn)
160-
.await?;
154+
jobs::SyncToGitIndex::new(name).enqueue(&mut conn).await?;
161155
}
162156
Command::SyncToSparseIndex { name } => {
163157
jobs::SyncToSparseIndex::new(name)
164-
.async_enqueue(&mut conn)
158+
.enqueue(&mut conn)
165159
.await?;
166160
}
167161
Command::SyncUpdatesFeed => {
168-
jobs::rss::SyncUpdatesFeed.async_enqueue(&mut conn).await?;
162+
jobs::rss::SyncUpdatesFeed.enqueue(&mut conn).await?;
169163
}
170164
};
171165

src/bin/crates-admin/yank_version.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,11 @@ async fn yank(opts: Opts, conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
6666
.execute(conn)
6767
.await?;
6868

69-
SyncToGitIndex::new(&krate.name).async_enqueue(conn).await?;
69+
SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
7070

71-
SyncToSparseIndex::new(&krate.name)
72-
.async_enqueue(conn)
73-
.await?;
71+
SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
7472

75-
UpdateDefaultVersion::new(krate.id)
76-
.async_enqueue(conn)
77-
.await?;
73+
UpdateDefaultVersion::new(krate.id).enqueue(conn).await?;
7874

7975
Ok(())
8076
}

0 commit comments

Comments
 (0)