package agent import ( "bufio" "context" "fmt" "io" "log" "os" "strings" "sync" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/docker/docker/client" ) type DockerExecutor struct { cli *client.Client } func NewDockerExecutor() (*DockerExecutor, error) { cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { return nil, err } return &DockerExecutor{cli: cli}, nil } func (d *DockerExecutor) Run(ctx context.Context, imageName string, commands []string, env map[string]string, logCallback func(string)) error { reader, err := d.cli.ImagePull(ctx, imageName, image.PullOptions{}) if err != nil { log.Printf("Warning: Failed to pull image %s (might exist locally): %v", imageName, err) } else { _, _ = io.Copy(io.Discard, reader) _ = reader.Close() } envSlice := make([]string, 0, len(env)) for k, v := range env { envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v)) } script := strings.Join(commands, " && ") if len(commands) == 0 { script = "echo 'No commands to execute'" } cmd := []string{"/bin/sh", "-c", script} resp, err := d.cli.ContainerCreate(ctx, &container.Config{ Image: imageName, Entrypoint: cmd, Cmd: nil, Env: envSlice, Tty: true, }, nil, nil, nil, "") if err != nil { return fmt.Errorf("failed to create container: %w", err) } containerID := resp.ID defer d.cleanup(context.Background(), containerID) if err := d.cli.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil { return fmt.Errorf("failed to start container: %w", err) } var wg sync.WaitGroup out, err := d.cli.ContainerLogs(ctx, containerID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: true, }) if err != nil { log.Printf("Failed to get logs: %v", err) } else { wg.Add(1) go func() { defer wg.Done() defer out.Close() scanner := bufio.NewScanner(out) for scanner.Scan() { logCallback(scanner.Text()) } }() } statusCh, errCh := d.cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) select { case err := <-errCh: if err != nil { return fmt.Errorf("error waiting for container: %w", err) } case status := <-statusCh: if status.StatusCode != 0 { wg.Wait() return fmt.Errorf("container exited with code %d", status.StatusCode) } } wg.Wait() return nil } func (d *DockerExecutor) cleanup(ctx context.Context, containerID string) { _ = d.cli.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}) } func (d *DockerExecutor) SelfUpdate(ctx context.Context, imageTag string, agentID string) error { log.Printf("Initiating self-update using Watchtower...") containerID, err := os.Hostname() if err != nil { return fmt.Errorf("failed to get hostname (container ID): %w", err) } log.Printf("Current Container ID: %s", containerID) watchtowerImage := "containrrr/watchtower:latest" reader, err := d.cli.ImagePull(ctx, watchtowerImage, image.PullOptions{}) if err != nil { log.Printf("Failed to pull watchtower: %v", err) return fmt.Errorf("failed to pull watchtower: %w", err) } _, _ = io.Copy(io.Discard, reader) _ = reader.Close() hostSock := os.Getenv("HOST_DOCKER_SOCK") if hostSock == "" { hostSock = "/var/run/docker.sock" } cmd := []string{"/watchtower", "--run-once", "--cleanup", "--debug", containerID} resp, err := d.cli.ContainerCreate(ctx, &container.Config{ Image: watchtowerImage, Cmd: cmd, }, &container.HostConfig{ Binds: []string{fmt.Sprintf("%s:/var/run/docker.sock", hostSock)}, }, nil, nil, fmt.Sprintf("watchtower-updater-%s-%d", agentID, time.Now().Unix())) if err != nil { return fmt.Errorf("failed to create watchtower container: %w", err) } if err := d.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { return fmt.Errorf("failed to start watchtower: %w", err) } log.Printf("Watchtower started with ID: %s. Monitoring...", resp.ID) time.Sleep(2 * time.Second) return nil }