Skip to content
This repository was archived by the owner on Aug 29, 2025. It is now read-only.

Commit 0a8aec7

Browse files
committed
feat(queue): validate jobs table shouldbe there
1 parent 9481858 commit 0a8aec7

File tree

1 file changed

+47
-44
lines changed

1 file changed

+47
-44
lines changed

pkg/queue/queue.go

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,54 +27,57 @@ func New(bufferSize int) *Queue {
2727
}
2828

2929
func (q *Queue) Start(workerCount int) {
30-
for i := 0; i < workerCount; i++ {
31-
go func() {
32-
for {
33-
var jobRecord models.Job
34-
err := db.Connect.
35-
Where("state = ? AND available_at <= ?", models.JobPending, time.Now()).
36-
Order("created_at ASC").
37-
First(&jobRecord).Error
38-
39-
if err != nil {
40-
time.Sleep(1 * time.Second)
41-
continue
42-
}
43-
44-
job, err := ResolveJob(jobRecord.Type, jobRecord.Payload)
45-
if err != nil {
46-
failJob(&jobRecord, err)
47-
continue
48-
}
30+
if db.Connect.Migrator().HasTable(&models.Job{}) {
31+
for i := 0; i < workerCount; i++ {
32+
go func() {
33+
for {
34+
var jobRecord models.Job
35+
err := db.Connect.
36+
Where("state = ? AND available_at <= ?", models.JobPending, time.Now()).
37+
Order("created_at ASC").
38+
First(&jobRecord).Error
39+
40+
if err != nil {
41+
time.Sleep(1 * time.Second)
42+
continue
43+
}
4944

50-
start := time.Now()
51-
db.Connect.Model(&jobRecord).Updates(models.Job{
52-
State: models.JobStarted,
53-
StartedAt: &start,
54-
})
45+
job, err := ResolveJob(jobRecord.Type, jobRecord.Payload)
46+
if err != nil {
47+
failJob(&jobRecord, err)
48+
continue
49+
}
5550

56-
err = job.Handle(jobRecord.Payload)
51+
start := time.Now()
52+
db.Connect.Model(&jobRecord).Updates(models.Job{
53+
State: models.JobStarted,
54+
StartedAt: &start,
55+
})
5756

58-
if err != nil {
59-
jobRecord.Attempts++
60-
if jobRecord.Attempts >= job.MaxAttempts() {
61-
failJob(&jobRecord, err)
57+
err = job.Handle(jobRecord.Payload)
58+
59+
if err != nil {
60+
jobRecord.Attempts++
61+
if jobRecord.Attempts >= job.MaxAttempts() {
62+
failJob(&jobRecord, err)
63+
} else {
64+
db.Connect.Model(&jobRecord).Updates(models.Job{
65+
State: models.JobPending,
66+
ErrorMsg: err.Error(),
67+
Attempts: jobRecord.Attempts,
68+
AvailableAt: time.Now().Add(job.RetryAfter()),
69+
})
70+
}
6271
} else {
6372
db.Connect.Model(&jobRecord).Updates(models.Job{
64-
State: models.JobPending,
65-
ErrorMsg: err.Error(),
66-
Attempts: jobRecord.Attempts,
67-
AvailableAt: time.Now().Add(job.RetryAfter()),
73+
State: models.JobFinished,
74+
FinishedAt: ptr(time.Now()),
6875
})
6976
}
70-
} else {
71-
db.Connect.Model(&jobRecord).Updates(models.Job{
72-
State: models.JobFinished,
73-
FinishedAt: ptr(time.Now()),
74-
})
7577
}
76-
}
77-
}()
78+
}()
79+
}
80+
7881
}
7982
}
8083

@@ -134,11 +137,11 @@ func Dispatch(job Job, params ...any) error {
134137
Type: job.Type(),
135138
Payload: params,
136139
})
137-
if err != nil {
138-
return fmt.Errorf("failed to save job to DB: %w", err)
139-
}
140+
if err != nil {
141+
return fmt.Errorf("failed to save job to DB: %w", err)
142+
}
140143

141-
return nil
144+
return nil
142145
}
143146

144147
func SaveJobToDB(req JobEnqueueRequest) (*models.Job, error) {

0 commit comments

Comments
 (0)