Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
89cdd8e
Back-end for runner first build
collincunn Feb 26, 2026
d287b12
Fixed assorted bugs and tests
collincunn Feb 26, 2026
eeb863a
Added async for long polling
collincunn Feb 26, 2026
8e0e93f
Added more info to agents
collincunn Feb 26, 2026
e617f6d
Fixed baz comments
collincunn Feb 26, 2026
8bf8865
Timeout underspecified
collincunn Feb 26, 2026
f2c38cb
minor validation feedback
collincunn Feb 26, 2026
725b2fc
Addressed comments
collincunn Feb 27, 2026
6873c55
more pr comments
collincunn Feb 27, 2026
71191d8
Merge remote-tracking branch 'origin/main' into collinc/local-runner-…
collincunn Mar 3, 2026
91853e2
Addressed PR comments; added integration test
collincunn Mar 3, 2026
ec449e5
Extracted helper function
collincunn Mar 3, 2026
4c081e8
Merge branch 'main' into collinc/local-runner-impl
collincunn Mar 3, 2026
a519c8c
PR feedback
collincunn Mar 3, 2026
cd5514e
Merge branch 'collinc/local-runner-impl' of github.com:comet-ml/opik …
collincunn Mar 3, 2026
16ed26e
missed two comments
collincunn Mar 3, 2026
0b4b4c5
Merge branch 'main' into collinc/local-runner-impl
collincunn Mar 3, 2026
4ddac68
regressions
collincunn Mar 3, 2026
e681305
Merge branch 'collinc/local-runner-impl' of github.com:comet-ml/opik …
collincunn Mar 3, 2026
49da026
more regressions
collincunn Mar 3, 2026
1e74333
more regressions
collincunn Mar 3, 2026
1cc7016
improved logging
collincunn Mar 3, 2026
fbae570
PR comments
collincunn Mar 4, 2026
b89aee8
Merge branch 'main' into collinc/local-runner-impl
collincunn Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,19 @@ serviceToggles:
# Default: false
# Description: Whether or not Collaborators tab feature is enabled
collaboratorsTabEnabled: ${TOGGLE_COLLABORATORS_TAB_ENABLED:-"false"}
# Default: true
# Description: Whether or not Local Runner feature is enabled
runnersEnabled: ${TOGGLE_RUNNERS_ENABLED:-"true"}

# Local Runner configuration
runner:
enabled: ${OPIK_RUNNER_ENABLED:-false}
jobTimeoutSeconds: ${OPIK_RUNNER_JOB_TIMEOUT:-1800}
maxPendingJobsPerRunner: ${OPIK_RUNNER_MAX_PENDING:-100}
heartbeatTtlSeconds: ${OPIK_RUNNER_HEARTBEAT_TTL:-15}
nextJobPollTimeoutSeconds: ${OPIK_RUNNER_NEXT_JOB_POLL_TIMEOUT:-10}
deadRunnerPurgeHours: ${OPIK_RUNNER_DEAD_PURGE_HOURS:-24}
completedJobTtlDays: ${OPIK_RUNNER_COMPLETED_JOB_TTL_DAYS:-7}

# Trace Thread configuration
traceThreadConfig:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.comet.opik.api.resources.v1.jobs;

import com.comet.opik.domain.RunnerService;
import com.comet.opik.infrastructure.RunnerConfig;
import com.comet.opik.infrastructure.lock.LockService;
import io.dropwizard.jobs.Job;
import io.dropwizard.jobs.annotations.Every;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import reactor.core.publisher.Mono;

import java.time.Duration;

import static com.comet.opik.infrastructure.lock.LockService.Lock;

@Slf4j
@Singleton
@DisallowConcurrentExecution
@Every("60s")
@RequiredArgsConstructor(onConstructor_ = @Inject)
public class RunnerReaperJob extends Job {

private static final Lock REAPER_LOCK = new Lock("runner-reaper");

private final @NonNull RunnerService runnerService;
private final @NonNull LockService lockService;
private final @NonNull RunnerConfig runnerConfig;

@Override
public void doJob(JobExecutionContext context) {
if (!runnerConfig.isEnabled()) {
return;
}

lockService.bestEffortLock(
REAPER_LOCK,
Mono.fromRunnable(() -> runnerService.reapDeadRunners()),
Mono.fromRunnable(() -> log.debug("Could not acquire reaper lock, skipping")),
Duration.ofSeconds(55),
Duration.ofSeconds(5))
.subscribe(
__ -> log.debug("Runner reaper completed"),
error -> log.error("Runner reaper failed", error));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package com.comet.opik.api.resources.v1.priv;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.runner.ConnectRequest;
import com.comet.opik.api.runner.ConnectResponse;
import com.comet.opik.api.runner.CreateJobRequest;
import com.comet.opik.api.runner.HeartbeatResponse;
import com.comet.opik.api.runner.JobResultRequest;
import com.comet.opik.api.runner.LogEntry;
import com.comet.opik.api.runner.PairResponse;
import com.comet.opik.api.runner.Runner;
import com.comet.opik.api.runner.RunnerJob;
import com.comet.opik.domain.RunnerService;
import com.comet.opik.infrastructure.RunnerConfig;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.ratelimit.RateLimited;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;

@Path("/v1/private/runners")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Timed
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Tag(name = "Runners", description = "Local runner management endpoints")
public class RunnersResource {

private final @NonNull Provider<RequestContext> requestContext;
private final @NonNull RunnerService runnerService;
private final @NonNull RunnerConfig runnerConfig;

@POST
@Path("/pair")
@Operation(operationId = "generatePairingCode", summary = "Generate a pairing code for the current workspace")
public Response generatePairingCode() {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
PairResponse response = runnerService.generatePairingCode(workspaceId, userName);
return Response.ok(response).build();
}

@POST
@Path("/connect")
@Operation(operationId = "connectRunner", summary = "Exchange a pairing code or API key for runner credentials")
public Response connect(@NotNull @Valid ConnectRequest request) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
ConnectResponse response = runnerService.connect(workspaceId, userName, request);
return Response.ok(response).build();
}

@GET
@Operation(operationId = "listRunners", summary = "List all runners in the current workspace")
public Response listRunners() {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
List<Runner> runners = runnerService.listRunners(workspaceId);
return Response.ok(runners).build();
}

@GET
@Path("/{runnerId}")
@Operation(operationId = "getRunner", summary = "Get a single runner with its registered agents")
public Response getRunner(@PathParam("runnerId") String runnerId) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
Runner runner = runnerService.getRunner(workspaceId, runnerId);
return Response.ok(runner).build();
}

@PUT
@Path("/{runnerId}/agents")
@RateLimited
@Operation(operationId = "registerAgents", summary = "Register or update the runner's agent list")
public Response registerAgents(@PathParam("runnerId") String runnerId,
@NotNull Map<String, JsonNode> agents) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
runnerService.registerAgents(runnerId, workspaceId, agents);
return Response.noContent().build();
}

@POST
@Path("/{runnerId}/heartbeat")
@RateLimited
@Operation(operationId = "heartbeat", summary = "Refresh runner heartbeat")
public Response heartbeat(@PathParam("runnerId") String runnerId) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
HeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId);
return Response.ok(response).build();
}

@POST
@Path("/jobs")
@Operation(operationId = "createJob", summary = "Create a job and enqueue it for execution")
@RateLimited
public Response createJob(@NotNull @Valid CreateJobRequest request) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
RunnerJob job = runnerService.createJob(workspaceId, userName, request);
return Response.status(Response.Status.CREATED).entity(job).build();
}

@GET
@Path("/{runnerId}/jobs")
@Operation(operationId = "listJobs", summary = "List jobs for a runner")
public Response listJobs(@PathParam("runnerId") String runnerId,
@QueryParam("project") String project,
@QueryParam("page") @DefaultValue("0") @Min(0) int page,
@QueryParam("size") @DefaultValue("25") @Min(1) int size) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
RunnerJob.RunnerJobPage jobPage = runnerService.listJobs(runnerId, project, workspaceId, page, size);
return Response.ok(jobPage).build();
}

@GET
@Path("/{runnerId}/jobs/next")
@Operation(operationId = "nextJob", summary = "Long-poll for the next pending job")
public void nextJob(@PathParam("runnerId") String runnerId,
@Suspended AsyncResponse asyncResponse) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
runnerService.nextJob(runnerId, workspaceId)
.thenAccept(job -> {
if (job == null) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(Response.ok(job).build());
}
})
.exceptionally(e -> {
asyncResponse.resume(e);
return null;
});
}

@GET
@Path("/jobs/{jobId}")
@Operation(operationId = "getJob", summary = "Get a single job's status and results")
public Response getJob(@PathParam("jobId") String jobId) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
RunnerJob job = runnerService.getJob(jobId, workspaceId);
return Response.ok(job).build();
}

@GET
@Path("/jobs/{jobId}/logs")
@Operation(operationId = "getJobLogs", summary = "Get log entries for a job")
public Response getJobLogs(@PathParam("jobId") String jobId,
@QueryParam("offset") @DefaultValue("0") @Min(0) int offset) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
List<LogEntry> logs = runnerService.getJobLogs(jobId, offset, workspaceId);
return Response.ok(logs).build();
}

@POST
@Path("/jobs/{jobId}/logs")
@RateLimited
@Operation(operationId = "appendJobLogs", summary = "Append log entries for a running job")
public Response appendLogs(@PathParam("jobId") String jobId,
@NotNull List<LogEntry> entries) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
runnerService.appendLogs(jobId, workspaceId, entries);
return Response.noContent().build();
}

@POST
@Path("/jobs/{jobId}/result")
@Operation(operationId = "reportJobResult", summary = "Report job completion or failure")
public Response reportResult(@PathParam("jobId") String jobId,
@NotNull @Valid JobResultRequest result) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
runnerService.reportResult(jobId, workspaceId, result);
return Response.noContent().build();
}

@POST
@Path("/jobs/{jobId}/cancel")
@Operation(operationId = "cancelJob", summary = "Cancel a pending or running job")
public Response cancelJob(@PathParam("jobId") String jobId) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
runnerService.cancelJob(jobId, workspaceId);
return Response.noContent().build();
}

private void ensureEnabled() {
if (!runnerConfig.isEnabled()) {
throw new NotFoundException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotBlank;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ConnectRequest(
String pairingCode,
@NotBlank String runnerName) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ConnectResponse(
String runnerId,
String workspaceId) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotBlank;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record CreateJobRequest(
@NotBlank String agentName,
JsonNode inputs,
String project,
String runnerId) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record HeartbeatResponse(
List<String> cancelledJobIds) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotBlank;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record JobResultRequest(
@NotBlank String status,
JsonNode result,
String error,
String traceId) {
}
Loading
Loading