@@ -39,7 +39,7 @@ func Run(c smoothCfg.TesterCommand) {
39
39
40
40
var wg sync.WaitGroup
41
41
42
- var ch chan int
42
+ ch := make ( chan int )
43
43
44
44
queueUrl := createQueue (sqsClient )
45
45
@@ -71,6 +71,10 @@ func Run(c smoothCfg.TesterCommand) {
71
71
pct = float64 (receivedMessages ) / float64 (sentMessages )
72
72
}
73
73
log .Info ().Msg (fmt .Sprintf ("sent: %d, received: %d, pct: %f" , sentMessages , receivedMessages , pct ))
74
+ if (sentMessages > 0 && sentMessages == receivedMessages ) {
75
+ close (ch )
76
+ return
77
+ }
74
78
time .Sleep (1 * time .Second )
75
79
}
76
80
}()
@@ -79,12 +83,15 @@ func Run(c smoothCfg.TesterCommand) {
79
83
80
84
if c .Senders > 0 {
81
85
log .Info ().Msg ("All messages sent" )
82
- if c .Receivers == 0 {
83
- os .Exit (0 )
84
- }
85
86
}
86
87
87
88
<- ch
89
+
90
+ // Test changing message visibility timeout
91
+ queueUrl2 := createQueue (sqsClient )
92
+ log .Info ().Msgf ("queue2: %s" , queueUrl )
93
+ testChangeMessageVisibility (sqsClient , queueUrl2 )
94
+ os .Exit (0 )
88
95
}
89
96
90
97
func GenerateRandomString (n int ) string {
@@ -166,6 +173,47 @@ func sendMessage(client *sqs.Client, queueUrl string, goroutineID, requestID, ba
166
173
// time.Sleep(100 * time.Millisecond)
167
174
}
168
175
176
+ func testChangeMessageVisibility (client * sqs.Client , queueUrl string ) {
177
+ log .Info ().Msg ("Testing ChangeMessageVisibility" )
178
+ // Send a message
179
+ messageBody := "Test message for visibility timeout"
180
+ sendMessageInput := & sqs.SendMessageInput {
181
+ QueueUrl : & queueUrl ,
182
+ MessageBody : & messageBody ,
183
+ }
184
+ sendMessageOutput , err := client .SendMessage (context .TODO (), sendMessageInput )
185
+ if err != nil {
186
+ log .Fatal ().Msgf ("failed to send message, %v" , err )
187
+ }
188
+
189
+ for {
190
+ recvMsg := & sqs.ReceiveMessageInput {
191
+ QueueUrl : aws .String (queueUrl ),
192
+ MaxNumberOfMessages : 1 ,
193
+ }
194
+ msgs , err := client .ReceiveMessage (context .TODO (), recvMsg )
195
+ if err != nil {
196
+ log .Error ().Err (err ).Send ()
197
+ }
198
+ if len (msgs .Messages ) == 1 {
199
+ break
200
+ }
201
+ }
202
+
203
+ // Change the visibility timeout
204
+ changeVisibilityInput := & sqs.ChangeMessageVisibilityInput {
205
+ QueueUrl : & queueUrl ,
206
+ ReceiptHandle : sendMessageOutput .MessageId ,
207
+ VisibilityTimeout : 60 ,
208
+ }
209
+ _ , err = client .ChangeMessageVisibility (context .TODO (), changeVisibilityInput )
210
+ if err != nil {
211
+ log .Fatal ().Msgf ("failed to change message visibility timeout, %v" , err )
212
+ }
213
+
214
+ log .Info ().Msg ("Successfully changed message visibility timeout" )
215
+ }
216
+
169
217
func receiveMessage (client * sqs.Client , queueUrl string , goroutineID int ) int {
170
218
i := & sqs.ReceiveMessageInput {
171
219
QueueUrl : aws .String (queueUrl ),
0 commit comments