Skip to content
This repository was archived by the owner on Aug 29, 2025. It is now read-only.

Commit 9481858

Browse files
committed
feat(queue): init queue
1 parent 0eeb117 commit 9481858

File tree

9 files changed

+367
-35
lines changed

9 files changed

+367
-35
lines changed

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ help:
8787
@echo "🏗️ Code generation:"
8888
@echo " model Generate a new model"
8989
@echo " dto Generate a new DTO"
90-
@echo " cron Generate a new cron job"
90+
@echo " cron Generate a new cron scheduler file"
91+
@echo " job Generate a new listener queue job file"
9192
@echo ""
9293
@echo "💡 Examples:"
9394
@echo " make dev # Start development server"
@@ -171,3 +172,8 @@ dto:
171172
cron:
172173
@read -p "Enter cron file name: " cron_file; \
173174
./scripts/generate_cronfile.sh $$cron_file
175+
176+
.PHONY: job
177+
job:
178+
@read -p "Enter job file name: " job_file; \
179+
./scripts/generate_job.sh $$job_file

db/connect.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package db
33
import (
44
"fmt"
55
"log"
6+
"os"
7+
"time"
68

79
"github.com/sheenazien8/goplate/env"
810
"gorm.io/driver/mysql"
911
"gorm.io/driver/postgres"
1012
"gorm.io/gorm"
13+
"gorm.io/gorm/logger"
1114
)
1215

1316
var Connect *gorm.DB
@@ -18,6 +21,20 @@ func ConnectDB() {
1821
var dbType = env.Get("DB_CONNECTION")
1922
var db *gorm.DB
2023

24+
var gormConfig = &gorm.Config{
25+
DisableForeignKeyConstraintWhenMigrating: true,
26+
Logger: logger.New(
27+
log.New(os.Stdout, "\r\n", log.LstdFlags),
28+
logger.Config{
29+
SlowThreshold: time.Second,
30+
LogLevel: logger.Warn,
31+
IgnoreRecordNotFoundError: true,
32+
ParameterizedQueries: true,
33+
Colorful: true,
34+
},
35+
),
36+
}
37+
2138
switch dbType {
2239
case "postgres":
2340
p := env.Get("DB_PORT")
@@ -32,9 +49,7 @@ func ConnectDB() {
3249

3350
db, err = gorm.Open(
3451
postgres.Open(dsn),
35-
&gorm.Config{
36-
DisableForeignKeyConstraintWhenMigrating: true,
37-
},
52+
gormConfig,
3853
)
3954

4055
case "mysql":
@@ -48,9 +63,7 @@ func ConnectDB() {
4863

4964
db, err = gorm.Open(
5065
mysql.Open(dsn),
51-
&gorm.Config{
52-
DisableForeignKeyConstraintWhenMigrating: true,
53-
},
66+
gormConfig,
5467
)
5568
default:
5669
log.Panic("Unsupported database type")

main.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,43 +3,18 @@ package main
33
import (
44
"encoding/json"
55
"errors"
6-
"fmt"
7-
"io"
8-
"os"
9-
"path"
10-
"time"
116

127
"github.com/gofiber/fiber/v2"
13-
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
148
"github.com/sheenazien8/goplate/db"
159
"github.com/sheenazien8/goplate/env"
1610
"github.com/sheenazien8/goplate/logs"
11+
"github.com/sheenazien8/goplate/pkg/queue"
1712
"github.com/sheenazien8/goplate/pkg/scheduler"
1813
"github.com/sheenazien8/goplate/pkg/utils"
1914
"github.com/sheenazien8/goplate/router"
2015
)
2116

2217
func main() {
23-
logPath := "logs/"
24-
25-
if _, err := os.Stat(logPath); os.IsNotExist(err) {
26-
if os.Mkdir(logPath, os.ModePerm) != nil {
27-
logs.Fatal("Error creating log path")
28-
}
29-
}
30-
31-
logFile, err := rotatelogs.New(
32-
path.Join(logPath, "app_%Y%m%d.log"),
33-
rotatelogs.WithLinkName(path.Join(logPath, "app.log")),
34-
rotatelogs.WithRotationTime(24*time.Hour),
35-
rotatelogs.WithMaxAge(7*24*time.Hour),
36-
)
37-
if err != nil {
38-
fmt.Println("Error creating log file:", err)
39-
}
40-
multiLogFile := io.MultiWriter(logFile, os.Stdout)
41-
logs.SetOutput(multiLogFile)
42-
4318
screet := env.Get("APP_SCREET")
4419
if screet == "" {
4520
logs.Fatal("You must generate the screet key first")
@@ -75,8 +50,11 @@ func main() {
7550

7651
router.SetupRouter(app)
7752

53+
q := queue.New(100)
54+
q.Start(5)
55+
7856
sch := scheduler.New()
79-
sch.RunTasks()
57+
sch.RunTasks()
8058
sch.Start()
8159

8260
if err := app.Listen(":" + p); err != nil {

pkg/models/job.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package models
2+
3+
import (
4+
"encoding/json"
5+
"time"
6+
)
7+
8+
type JobState string
9+
10+
const (
11+
JobPending JobState = "pending"
12+
JobStarted JobState = "started"
13+
JobFinished JobState = "finished"
14+
JobFailed JobState = "failed"
15+
)
16+
17+
type Job struct {
18+
ID uint `gorm:"primaryKey" json:"id"`
19+
Type string `gorm:"not null" json:"type"`
20+
Payload json.RawMessage `gorm:"type:text" json:"payload"`
21+
State JobState `gorm:"type:varchar(16);not null" json:"state"`
22+
ErrorMsg string `json:"error_msg"`
23+
Attempts int `json:"attempts"`
24+
AvailableAt time.Time `json:"available_at"`
25+
CreatedAt time.Time `json:"created_at"`
26+
StartedAt *time.Time `json:"started_at"`
27+
FinishedAt *time.Time `json:"finished_at"`
28+
}

pkg/queue/queue.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package queue
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/sheenazien8/goplate/db"
10+
"github.com/sheenazien8/goplate/pkg/models"
11+
)
12+
13+
// Task defines the work to be processed by the queue
14+
type Task func()
15+
16+
// Queue represents a simple in-memory task queue with worker pool
17+
type Queue struct {
18+
tasks chan Task
19+
wg sync.WaitGroup
20+
}
21+
22+
// New creates a new Queue with the specified buffer size
23+
func New(bufferSize int) *Queue {
24+
return &Queue{
25+
tasks: make(chan Task, bufferSize),
26+
}
27+
}
28+
29+
func (q *Queue) Start(workerCount int) {
30+
for i := 0; i < workerCount; i++ {
31+
go func() {
32+
for {
33+
var jobRecord models.Job
34+
err := db.Connect.
35+
Where("state = ? AND available_at <= ?", models.JobPending, time.Now()).
36+
Order("created_at ASC").
37+
First(&jobRecord).Error
38+
39+
if err != nil {
40+
time.Sleep(1 * time.Second)
41+
continue
42+
}
43+
44+
job, err := ResolveJob(jobRecord.Type, jobRecord.Payload)
45+
if err != nil {
46+
failJob(&jobRecord, err)
47+
continue
48+
}
49+
50+
start := time.Now()
51+
db.Connect.Model(&jobRecord).Updates(models.Job{
52+
State: models.JobStarted,
53+
StartedAt: &start,
54+
})
55+
56+
err = job.Handle(jobRecord.Payload)
57+
58+
if err != nil {
59+
jobRecord.Attempts++
60+
if jobRecord.Attempts >= job.MaxAttempts() {
61+
failJob(&jobRecord, err)
62+
} else {
63+
db.Connect.Model(&jobRecord).Updates(models.Job{
64+
State: models.JobPending,
65+
ErrorMsg: err.Error(),
66+
Attempts: jobRecord.Attempts,
67+
AvailableAt: time.Now().Add(job.RetryAfter()),
68+
})
69+
}
70+
} else {
71+
db.Connect.Model(&jobRecord).Updates(models.Job{
72+
State: models.JobFinished,
73+
FinishedAt: ptr(time.Now()),
74+
})
75+
}
76+
}
77+
}()
78+
}
79+
}
80+
81+
func failJob(job *models.Job, err error) {
82+
db.Connect.Model(job).Updates(models.Job{
83+
State: models.JobFailed,
84+
ErrorMsg: err.Error(),
85+
FinishedAt: ptr(time.Now()),
86+
})
87+
}
88+
89+
func ptr[T any](v T) *T {
90+
return &v
91+
}
92+
93+
// Registry maps job type string to a function that builds and returns a Job
94+
var registry = map[string]func() Job{}
95+
96+
// RegisterJob registers a job handler with a type string
97+
func RegisterJob(job Job) {
98+
registry[job.Type()] = func() Job {
99+
return job
100+
}
101+
}
102+
103+
// ResolveJob looks up and creates a job instance from type
104+
func ResolveJob(typeName string, payload json.RawMessage) (Job, error) {
105+
creator, exists := registry[typeName]
106+
if !exists {
107+
return nil, fmt.Errorf("job type '%s' not registered", typeName)
108+
}
109+
110+
job := creator()
111+
112+
// // optional: decode payload into the job struct if needed
113+
// if err := json.Unmarshal(payload, &job); err != nil {
114+
// return nil, fmt.Errorf("failed to unmarshal payload for '%s': %w", typeName, err)
115+
// }
116+
117+
return job, nil
118+
}
119+
120+
type JobEnqueueRequest struct {
121+
Type string
122+
Payload any
123+
}
124+
125+
type Job interface {
126+
Type() string
127+
Handle(payload json.RawMessage) error
128+
MaxAttempts() int
129+
RetryAfter() time.Duration
130+
}
131+
132+
func Dispatch(job Job, params ...any) error {
133+
_, err := SaveJobToDB(JobEnqueueRequest{
134+
Type: job.Type(),
135+
Payload: params,
136+
})
137+
if err != nil {
138+
return fmt.Errorf("failed to save job to DB: %w", err)
139+
}
140+
141+
return nil
142+
}
143+
144+
func SaveJobToDB(req JobEnqueueRequest) (*models.Job, error) {
145+
payloadJSON, err := json.Marshal(req.Payload)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
now := time.Now()
151+
152+
job := models.Job{
153+
Type: req.Type,
154+
Payload: payloadJSON,
155+
State: models.JobPending,
156+
Attempts: 0,
157+
AvailableAt: now,
158+
CreatedAt: now,
159+
}
160+
161+
if err := db.Connect.Create(&job).Error; err != nil {
162+
return nil, err
163+
}
164+
return &job, nil
165+
}

scripts/generate_dto.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ STRUCT_NAME=$(to_struct_name "$DTO_NAME")
2222
DTO_DIR="./pkg/dto"
2323
DTO_FILE="$DTO_DIR/${DTO_NAME}.go"
2424

25+
GO_MOD_FILE="./go.mod"
26+
MODULE_NAME=$(grep -o '^module .*' "$GO_MOD_FILE" | cut -d ' ' -f 2)
27+
2528
# Create the models directory if it doesn't exist
2629
mkdir -p $DTO_DIR
2730

@@ -31,7 +34,7 @@ package dto
3134
3235
import (
3336
"github.com/gofiber/fiber/v2"
34-
"github.com/sheenazien8/goplate/pkg/utils"
37+
"$MODULE_NAME/pkg/utils"
3538
)
3639
3740
type $STRUCT_NAME struct {

0 commit comments

Comments
 (0)