Files
stream.api/internal/workflow/agent/docker.go
2026-04-02 11:01:30 +00:00

155 lines
4.0 KiB
Go

package agent
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
)
// DockerExecutor runs containers through a Docker Engine API client.
type DockerExecutor struct {
// cli is the Docker API client used for every image and container call.
cli *client.Client
}
// NewDockerExecutor builds a DockerExecutor whose client is configured from
// the environment (client.FromEnv) with API version negotiation enabled.
// When the client cannot be constructed, that error is returned unchanged.
func NewDockerExecutor() (*DockerExecutor, error) {
	opts := []client.Opt{
		client.FromEnv,
		client.WithAPIVersionNegotiation(),
	}
	dockerClient, err := client.NewClientWithOpts(opts...)
	if err != nil {
		return nil, err
	}
	executor := &DockerExecutor{cli: dockerClient}
	return executor, nil
}
// Run executes commands inside a fresh container created from imageName and
// forwards the container's output to logCallback one line at a time.
//
// The image is pulled first; a pull failure is only logged, since the image
// may already exist locally. The commands are joined with " && " and run via
// /bin/sh -c (a placeholder echo is used when commands is empty). Each env
// entry is passed to the container as KEY=VALUE. logCallback may be nil, in
// which case the output is discarded.
//
// Run blocks until the container is no longer running and the log reader has
// finished. It returns an error when the container cannot be created or
// started, when waiting on it fails, or when it exits with a non-zero status
// code. The container is force-removed before Run returns.
func (d *DockerExecutor) Run(ctx context.Context, imageName string, commands []string, env map[string]string, logCallback func(string)) error {
	// Longest single log line forwarded to logCallback. bufio.Scanner gives up
	// at 64 KiB by default, which would silently end log forwarding.
	const maxLogLineBytes = 1024 * 1024

	if logCallback == nil {
		logCallback = func(string) {}
	}

	reader, err := d.cli.ImagePull(ctx, imageName, image.PullOptions{})
	if err != nil {
		log.Printf("Warning: Failed to pull image %s (might exist locally): %v", imageName, err)
	} else {
		// Drain the pull progress stream before creating the container; its
		// content is not needed, so read errors are deliberately ignored.
		_, _ = io.Copy(io.Discard, reader)
		_ = reader.Close()
	}

	envSlice := make([]string, 0, len(env))
	for k, v := range env {
		envSlice = append(envSlice, k+"="+v)
	}

	script := "echo 'No commands to execute'"
	if len(commands) > 0 {
		script = strings.Join(commands, " && ")
	}

	resp, err := d.cli.ContainerCreate(ctx, &container.Config{
		Image:      imageName,
		Entrypoint: []string{"/bin/sh", "-c", script},
		Env:        envSlice,
		Tty:        true,
	}, nil, nil, nil, "")
	if err != nil {
		return fmt.Errorf("failed to create container: %w", err)
	}
	containerID := resp.ID
	// A fresh context is used so removal is still attempted when ctx has
	// already been cancelled.
	defer d.cleanup(context.Background(), containerID)

	if err := d.cli.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
		return fmt.Errorf("failed to start container: %w", err)
	}

	// logCtx lets the wait-error path stop the log follower, so logCallback is
	// never invoked after Run has returned.
	logCtx, stopLogs := context.WithCancel(ctx)
	defer stopLogs()

	var wg sync.WaitGroup
	out, err := d.cli.ContainerLogs(logCtx, containerID, container.LogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Follow:     true,
	})
	if err != nil {
		// Missing logs are not fatal: the container is still waited on.
		log.Printf("Failed to get logs: %v", err)
	} else {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer out.Close()
			scanner := bufio.NewScanner(out)
			scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLogLineBytes)
			for scanner.Scan() {
				logCallback(scanner.Text())
			}
			// A read error caused by our own cancellation is expected noise.
			if err := scanner.Err(); err != nil && logCtx.Err() == nil {
				log.Printf("Failed to read container logs: %v", err)
			}
		}()
	}

	statusCh, errCh := d.cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			stopLogs()
			wg.Wait()
			return fmt.Errorf("error waiting for container: %w", err)
		}
	case status := <-statusCh:
		// Let the log reader finish before reporting the outcome.
		wg.Wait()
		if status.StatusCode != 0 {
			return fmt.Errorf("container exited with code %d", status.StatusCode)
		}
		if status.Error != nil {
			return fmt.Errorf("error waiting for container: %s", status.Error.Message)
		}
		return nil
	}
	wg.Wait()
	return nil
}
// cleanup force-removes the container identified by containerID. Removal is
// best-effort: a failure is logged rather than returned. The call is bounded
// by a timeout derived from ctx so that an unresponsive daemon cannot block
// the caller indefinitely.
func (d *DockerExecutor) cleanup(ctx context.Context, containerID string) {
	const removeTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(ctx, removeTimeout)
	defer cancel()

	if err := d.cli.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
		log.Printf("Failed to remove container %s: %v", containerID, err)
	}
}
// SelfUpdate launches a one-shot Watchtower container targeting the container
// this process runs in.
//
// The target is identified by os.Hostname().
// NOTE(review): this assumes the agent runs in a container whose hostname is
// its container ID; confirm for deployments that set a custom hostname.
//
// The Docker socket mounted into Watchtower is taken from the HOST_DOCKER_SOCK
// environment variable and falls back to /var/run/docker.sock. agentID is used
// only to build the Watchtower container name. imageTag is currently unused.
//
// SelfUpdate returns an error when the hostname cannot be read, or when the
// Watchtower image cannot be pulled, or its container created or started.
// After a successful start it pauses briefly (or until ctx is done) and
// returns nil; it does not wait for Watchtower to finish.
func (d *DockerExecutor) SelfUpdate(ctx context.Context, imageTag string, agentID string) error {
	const (
		watchtowerImage   = "containrrr/watchtower:latest"
		defaultDockerSock = "/var/run/docker.sock"
		startupGrace      = 2 * time.Second
	)

	log.Printf("Initiating self-update using Watchtower...")

	containerID, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to get hostname (container ID): %w", err)
	}
	log.Printf("Current Container ID: %s", containerID)

	reader, err := d.cli.ImagePull(ctx, watchtowerImage, image.PullOptions{})
	if err != nil {
		// Returned only, not also logged: the caller decides how to report it.
		return fmt.Errorf("failed to pull watchtower: %w", err)
	}
	// Drain the pull progress stream before creating the container; its
	// content is not needed, so read errors are deliberately ignored.
	_, _ = io.Copy(io.Discard, reader)
	_ = reader.Close()

	hostSock := os.Getenv("HOST_DOCKER_SOCK")
	if hostSock == "" {
		hostSock = defaultDockerSock
	}

	name := fmt.Sprintf("watchtower-updater-%s-%d", agentID, time.Now().Unix())
	resp, err := d.cli.ContainerCreate(ctx, &container.Config{
		Image: watchtowerImage,
		Cmd:   []string{"/watchtower", "--run-once", "--cleanup", "--debug", containerID},
	}, &container.HostConfig{
		Binds: []string{hostSock + ":/var/run/docker.sock"},
	}, nil, nil, name)
	if err != nil {
		return fmt.Errorf("failed to create watchtower container: %w", err)
	}

	if err := d.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		// Do not leave the never-started container behind. A fresh context is
		// used because ctx may be the reason the start failed.
		d.cleanup(context.Background(), resp.ID)
		return fmt.Errorf("failed to start watchtower: %w", err)
	}
	log.Printf("Watchtower started with ID: %s. Monitoring...", resp.ID)

	// Short pause after start, cut short if the caller cancels. The update has
	// already been launched, so cancellation here is not reported as an error.
	timer := time.NewTimer(startupGrace)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
	}
	return nil
}