Skip to content

Commit 7c6150a

Browse files
committed
Make stop operation async, add stop all operation
This makes the stop operation asynchronous. The CLI calls it in a blocking manner, while the API stops the container in the background and returns a 202 as soon as it has validated that the named container exists. As part of this change, add a stop-all operation as requested for the UI project. This logic is exposed as both a CLI command and an API endpoint.
1 parent 84d07a2 commit 7c6150a

File tree

5 files changed

+154
-71
lines changed

5 files changed

+154
-71
lines changed

cmd/thv/app/stop.go

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/spf13/cobra"
8+
"golang.org/x/sync/errgroup"
89

910
"github.com/stacklok/toolhive/pkg/workloads"
1011
)
@@ -13,37 +14,82 @@ var stopCmd = &cobra.Command{
1314
Use: "stop [container-name]",
1415
Short: "Stop an MCP server",
1516
Long: `Stop a running MCP server managed by ToolHive.`,
16-
Args: cobra.ExactArgs(1),
17+
Args: validateStopArgs,
1718
RunE: stopCmdFunc,
1819
}
1920

2021
var (
2122
stopTimeout int
23+
stopAll bool
2224
)
2325

2426
func init() {
2527
stopCmd.Flags().IntVar(&stopTimeout, "timeout", 30, "Timeout in seconds before forcibly stopping the container")
28+
stopCmd.Flags().BoolVar(&stopAll, "all", false, "Stop all running MCP servers")
29+
}
30+
31+
// validateStopArgs validates the arguments for the stop command
32+
func validateStopArgs(cmd *cobra.Command, args []string) error {
33+
// Check if --all flag is set
34+
all, _ := cmd.Flags().GetBool("all")
35+
36+
if all {
37+
// If --all is set, no arguments should be provided
38+
if len(args) > 0 {
39+
return fmt.Errorf("no arguments should be provided when --all flag is set")
40+
}
41+
} else {
42+
// If --all is not set, exactly one argument should be provided
43+
if len(args) != 1 {
44+
return fmt.Errorf("exactly one container name must be provided")
45+
}
46+
}
47+
48+
return nil
2649
}
2750

2851
func stopCmdFunc(cmd *cobra.Command, args []string) error {
2952
ctx := cmd.Context()
30-
// Get container name
31-
containerName := args[0]
3253

3354
manager, err := workloads.NewManager(ctx)
3455
if err != nil {
3556
return fmt.Errorf("failed to create container manager: %v", err)
3657
}
3758

38-
err = manager.StopWorkload(ctx, containerName)
39-
if err != nil {
40-
// If the container is not found, treat as a non-fatal error.
41-
if errors.Is(err, workloads.ErrContainerNotFound) {
42-
fmt.Printf("Container %s is not running\n", containerName)
43-
} else {
44-
return fmt.Errorf("failed to stop container: %v", err)
59+
var group *errgroup.Group
60+
61+
// Check if --all flag is set
62+
if stopAll {
63+
// Stop all workloads
64+
group, err = manager.StopAllWorkloads(ctx)
65+
if err != nil {
66+
return fmt.Errorf("failed to stop all containers: %v", err)
67+
}
68+
69+
// Since the stop operation is asynchronous, wait for the group to finish.
70+
if err := group.Wait(); err != nil {
71+
return fmt.Errorf("failed to stop all containers: %v", err)
4572
}
73+
fmt.Println("All containers stopped successfully")
4674
} else {
75+
// Get container name
76+
containerName := args[0]
77+
78+
// Stop a single workload
79+
group, err = manager.StopWorkload(ctx, containerName)
80+
if err != nil {
81+
// If the container is not found or not running, treat as a non-fatal error.
82+
if errors.Is(err, workloads.ErrContainerNotFound) || errors.Is(err, workloads.ErrContainerNotRunning) {
83+
fmt.Printf("Container %s is not running\n", containerName)
84+
return nil
85+
}
86+
return fmt.Errorf("unexpected error stopping container: %v", err)
87+
}
88+
89+
// Since the stop operation is asynchronous, wait for the group to finish.
90+
if err := group.Wait(); err != nil {
91+
return fmt.Errorf("failed to stop container %s: %v", containerName, err)
92+
}
4793
fmt.Printf("Container %s stopped successfully\n", containerName)
4894
}
4995

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ require (
5454
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
5555
github.com/beorn7/perks v1.0.1 // indirect
5656
github.com/blang/semver v3.5.1+incompatible // indirect
57-
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
5857
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5958
github.com/containerd/errdefs/pkg v0.3.0 // indirect
6059
github.com/containerd/stargz-snapshotter/estargz v0.16.3 // indirect

go.sum

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ cloud.google.com/go/longrunning v0.6.6 h1:XJNDo5MUfMM05xK3ewpbSdmt7R2Zw+aQEMbdQR
1616
cloud.google.com/go/longrunning v0.6.6/go.mod h1:hyeGJUrPHcx0u2Uu1UFSoYZLn4lkMrccJig0t4FI7yw=
1717
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
1818
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
19-
github.com/1password/onepassword-sdk-go v0.3.0 h1:PC3J08hOH7xmt5QjpakhjZzx0XfbBb4SkBVEqgYYG54=
20-
github.com/1password/onepassword-sdk-go v0.3.0/go.mod h1:kssODrGGqHtniqPR91ZPoCMEo79mKulKat7RaD1bunk=
2119
github.com/1password/onepassword-sdk-go v0.3.1 h1:dz0LrYuIh/HrZ7rxr8NMymikNLBIXhyj4NBmo5Tdamc=
2220
github.com/1password/onepassword-sdk-go v0.3.1/go.mod h1:kssODrGGqHtniqPR91ZPoCMEo79mKulKat7RaD1bunk=
2321
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230919221257-8b5d3ce2d11d h1:zjqpY4C7H15HjRPEenkS4SAn3Jy2eRRjkjZbGR30TOg=
@@ -440,8 +438,6 @@ github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ
440438
github.com/luna-duclos/instrumentedsql v1.1.3/go.mod h1:9J1njvFds+zN7y85EDhN9XNQLANWwZt2ULeIC8yMNYs=
441439
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
442440
github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
443-
github.com/mark3labs/mcp-go v0.31.0 h1:4UxSV8aM770OPmTvaVe/b1rA2oZAjBMhGBfUgOGut+4=
444-
github.com/mark3labs/mcp-go v0.31.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
445441
github.com/mark3labs/mcp-go v0.32.0 h1:fgwmbfL2gbd67obg57OfV2Dnrhs1HtSdlY/i5fn7MU8=
446442
github.com/mark3labs/mcp-go v0.32.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
447443
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
@@ -707,12 +703,8 @@ go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM
707703
go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI=
708704
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0 h1:gAU726w9J8fwr4qRDqu1GYMNNs4gXrU+Pv20/N1UpB4=
709705
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0/go.mod h1:RboSDkp7N292rgu+T0MgVt2qgFGu6qa1RpZDOtpL76w=
710-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw=
711-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
712706
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 h1:dNzwXjZKpMpE2JhmO+9HsPl42NIXFIFSUSSs0fiqra0=
713707
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0/go.mod h1:90PoxvaEB5n6AOdZvi+yWJQoE95U8Dhhw2bSyRqnTD0=
714-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
715-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
716708
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 h1:nRVXXvf78e00EwY6Wp0YII8ww2JVWshZ20HfTlE11AM=
717709
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0/go.mod h1:r49hO7CgrxY9Voaj3Xe8pANWtr0Oq916d0XAmOoCZAQ=
718710
go.opentelemetry.io/otel/exporters/prometheus v0.58.0 h1:CJAxWKFIqdBennqxJyOgnt5LqkeFRT+Mz3Yjz3hL+h8=

pkg/api/v1/workloads.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func WorkloadRouter(
4444
r := chi.NewRouter()
4545
r.Get("/", routes.listWorkloads)
4646
r.Post("/", routes.createWorkload)
47+
r.Post("/stop", routes.stopAllWorkloads)
4748
r.Get("/{name}", routes.getWorkload)
4849
r.Post("/{name}/stop", routes.stopWorkload)
4950
r.Post("/{name}/restart", routes.restartWorkload)
@@ -114,23 +115,50 @@ func (s *WorkloadRoutes) getWorkload(w http.ResponseWriter, r *http.Request) {
114115
// @Description Stop a running workload
115116
// @Tags workloads
116117
// @Param name path string true "Workload name"
118+
// @Success 202 {string} string "Accepted"
117119
// @Success 204 {string} string "No Content"
118120
// @Failure 404 {string} string "Not Found"
119121
// @Router /api/v1beta/workloads/{name}/stop [post]
120122
func (s *WorkloadRoutes) stopWorkload(w http.ResponseWriter, r *http.Request) {
121123
ctx := r.Context()
122124
name := chi.URLParam(r, "name")
123-
err := s.manager.StopWorkload(ctx, name)
125+
// Note that this is an asynchronous operation.
126+
// In the API, we do not wait for the operation to complete.
127+
_, err := s.manager.StopWorkload(ctx, name)
124128
if err != nil {
125129
if errors.Is(err, workloads.ErrContainerNotFound) {
126130
http.Error(w, "Workload not found", http.StatusNotFound)
127131
return
132+
} else if errors.Is(err, workloads.ErrContainerNotRunning) {
133+
// Treat this as a non-fatal error.
134+
w.WriteHeader(http.StatusNoContent)
135+
return
128136
}
129137
logger.Errorf("Failed to stop workload: %v", err)
130138
http.Error(w, "Failed to stop workload", http.StatusInternalServerError)
131139
return
132140
}
133-
w.WriteHeader(http.StatusNoContent)
141+
w.WriteHeader(http.StatusAccepted)
142+
}
143+
144+
// stopAllWorkloads
145+
//
146+
// @Summary Stop all workloads
147+
// @Description Stop all running workloads
148+
// @Tags workloads
149+
// @Success 202 "Accepted"
150+
// @Router /api/v1beta/workloads/stop [post]
151+
func (s *WorkloadRoutes) stopAllWorkloads(w http.ResponseWriter, r *http.Request) {
152+
ctx := r.Context()
153+
// Note that this is an asynchronous operation.
154+
// In the API, we do not wait for the operation to complete.
155+
_, err := s.manager.StopAllWorkloads(ctx)
156+
if err != nil {
157+
logger.Errorf("Failed to stop workloads: %v", err)
158+
http.Error(w, "Failed to stop workloads", http.StatusInternalServerError)
159+
return
160+
}
161+
w.WriteHeader(http.StatusAccepted)
134162
}
135163

136164
// deleteWorkload

pkg/workloads/manager.go

Lines changed: 68 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os/exec"
1010

1111
"github.com/adrg/xdg"
12+
"golang.org/x/sync/errgroup"
1213

1314
"github.com/stacklok/toolhive/pkg/client"
1415
"github.com/stacklok/toolhive/pkg/config"
@@ -31,8 +32,12 @@ type Manager interface {
3132
// DeleteWorkload deletes a container and its associated proxy process.
3233
// The container will be stopped if it is still running.
3334
DeleteWorkload(ctx context.Context, name string) error
34-
// StopWorkload stops a container and its associated proxy process.
35-
StopWorkload(ctx context.Context, name string) error
35+
// StopWorkload stops the named workload.
36+
// It is implemented as an asynchronous operation which returns an errgroup.Group
37+
StopWorkload(ctx context.Context, name string) (*errgroup.Group, error)
38+
// StopAllWorkloads stops all running workloads.
39+
// It is implemented as an asynchronous operation which returns an errgroup.Group
40+
StopAllWorkloads(ctx context.Context) (*errgroup.Group, error)
3641
// RunWorkload runs a container in the foreground.
3742
RunWorkload(ctx context.Context, runConfig *runner.RunConfig) error
3843
// RunWorkloadDetached runs a container in the background.
@@ -158,44 +163,45 @@ func (d *defaultManager) DeleteWorkload(ctx context.Context, name string) error
158163
return nil
159164
}
160165

161-
func (d *defaultManager) StopWorkload(ctx context.Context, name string) error {
162-
// Find the container ID
163-
containerID, err := d.findContainerID(ctx, name)
166+
func (d *defaultManager) StopWorkload(ctx context.Context, name string) (*errgroup.Group, error) {
167+
// Find the container
168+
container, err := d.findContainerByName(ctx, name)
164169
if err != nil {
165-
return err
170+
return nil, err
166171
}
167172

168-
// Check if the container is running
169-
running, err := d.runtime.IsWorkloadRunning(ctx, containerID)
170-
if err != nil {
171-
return fmt.Errorf("failed to check if container is running: %v", err)
172-
}
173+
containerID := container.ID
174+
containerBaseName := labels.GetContainerBaseName(container.Labels)
175+
running := isContainerRunning(container)
173176

174177
if !running {
175-
return fmt.Errorf("%w: %s", ErrContainerNotRunning, name)
178+
return nil, fmt.Errorf("%w: %s", ErrContainerNotRunning, name)
176179
}
177180

178-
// Get the base container name
179-
containerBaseName, _ := d.getContainerBaseName(ctx, containerID)
180-
181-
// Stop the proxy process
182-
proxy.StopProcess(containerBaseName)
181+
workload := stopWorkloadRequest{Name: containerBaseName, ID: containerID}
182+
// Do the actual stop operation in the background, and return an error group.
183+
return d.stopWorkloads(ctx, []stopWorkloadRequest{workload}), nil
184+
}
183185

184-
// Stop the container
185-
err = d.stopContainer(ctx, containerID, name)
186+
func (d *defaultManager) StopAllWorkloads(ctx context.Context) (*errgroup.Group, error) {
187+
// Get list of all running workloads.
188+
containers, err := d.runtime.ListWorkloads(ctx)
186189
if err != nil {
187-
return err
190+
return nil, fmt.Errorf("failed to list containers: %v", err)
188191
}
189192

190-
if shouldRemoveClientConfig() {
191-
if err := removeClientConfigurations(name); err != nil {
192-
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
193-
} else {
194-
logger.Infof("Client configurations for %s removed", name)
193+
// Duplicates the logic of GetWorkloads, but is simple enough that it's not
194+
// worth factoring out into a shared helper.
195+
stopRequests := make([]stopWorkloadRequest, 0, len(containers))
196+
for _, c := range containers {
197+
// Only include ToolHive-managed containers that are currently running.
198+
if labels.IsToolHiveContainer(c.Labels) && isContainerRunning(&c) {
199+
req := stopWorkloadRequest{Name: labels.GetContainerBaseName(c.Labels), ID: c.ID}
200+
stopRequests = append(stopRequests, req)
195201
}
196202
}
197203

198-
return nil
204+
return d.stopWorkloads(ctx, stopRequests), nil
199205
}
200206

201207
func (*defaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunConfig) error {
@@ -449,14 +455,6 @@ func (d *defaultManager) RestartWorkload(ctx context.Context, name string) error
449455
return d.RunWorkloadDetached(mcpRunner.Config)
450456
}
451457

452-
func (d *defaultManager) findContainerID(ctx context.Context, name string) (string, error) {
453-
c, err := d.findContainerByName(ctx, name)
454-
if err != nil {
455-
return "", err
456-
}
457-
return c.ID, nil
458-
}
459-
460458
func (d *defaultManager) findContainerByName(ctx context.Context, name string) (*rt.ContainerInfo, error) {
461459
// List containers to find the one with the given name
462460
containers, err := d.runtime.ListWorkloads(ctx)
@@ -486,22 +484,6 @@ func (d *defaultManager) findContainerByName(ctx context.Context, name string) (
486484
return nil, fmt.Errorf("%w: %s", ErrContainerNotFound, name)
487485
}
488486

489-
// getContainerBaseName gets the base container name from the container labels
490-
func (d *defaultManager) getContainerBaseName(ctx context.Context, containerID string) (string, error) {
491-
containers, err := d.runtime.ListWorkloads(ctx)
492-
if err != nil {
493-
return "", fmt.Errorf("failed to list containers: %v", err)
494-
}
495-
496-
for _, c := range containers {
497-
if c.ID == containerID {
498-
return labels.GetContainerBaseName(c.Labels), nil
499-
}
500-
}
501-
502-
return "", fmt.Errorf("container %s not found", containerID)
503-
}
504-
505487
// stopContainer stops the container
506488
func (d *defaultManager) stopContainer(ctx context.Context, containerID, containerName string) error {
507489
logger.Infof("Stopping container %s...", containerName)
@@ -594,3 +576,39 @@ func (*defaultManager) cleanupTempPermissionProfile(ctx context.Context, baseNam
594576

595577
return nil
596578
}
579+
580+
// Internal type used when stopping workloads.
581+
type stopWorkloadRequest struct {
582+
Name string
583+
ID string
584+
}
585+
586+
// stopWorkloads stops the named workloads concurrently.
587+
// It assumes that the workloads exist in the running state.
588+
func (d *defaultManager) stopWorkloads(ctx context.Context, workloads []stopWorkloadRequest) *errgroup.Group {
589+
group := errgroup.Group{}
590+
for _, workload := range workloads {
591+
group.Go(func() error {
592+
// Stop the proxy process
593+
proxy.StopProcess(workload.Name)
594+
595+
// Stop the container
596+
err := d.stopContainer(ctx, workload.ID, workload.Name)
597+
if err != nil {
598+
return err
599+
}
600+
601+
if shouldRemoveClientConfig() {
602+
if err := removeClientConfigurations(workload.Name); err != nil {
603+
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
604+
} else {
605+
logger.Infof("Client configurations for %s removed", workload.Name)
606+
}
607+
}
608+
609+
return nil
610+
})
611+
}
612+
613+
return &group
614+
}

0 commit comments

Comments
 (0)