package service import ( "context" "encoding/base64" "encoding/json" "errors" "fmt" "regexp" "strconv" "strings" "time" "gorm.io/gorm" redisadapter "stream.api/internal/adapters/redis" "stream.api/internal/database/model" "stream.api/internal/dto" "stream.api/internal/repository" "stream.api/pkg/logger" ) type JobQueue interface { Enqueue(ctx context.Context, job *model.Job) error Dequeue(ctx context.Context) (*model.Job, error) } type LogPubSub interface { Publish(ctx context.Context, jobID string, logLine string, progress float64) error PublishResource(ctx context.Context, agentID string, data []byte) error PublishCancel(ctx context.Context, agentID string, jobID string) error PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error Subscribe(ctx context.Context, jobID string) (<-chan dto.LogEntry, error) SubscribeResources(ctx context.Context) (<-chan dto.SystemResource, error) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) } type DeadLetterQueue interface { Add(ctx context.Context, job *model.Job, reason string) error Get(ctx context.Context, jobID string) (*redisadapter.DLQEntry, error) List(ctx context.Context, offset, limit int64) ([]*redisadapter.DLQEntry, error) Count(ctx context.Context) (int64, error) Remove(ctx context.Context, jobID string) error Retry(ctx context.Context, jobID string) (*model.Job, error) } type JobService struct { queue JobQueue pubsub LogPubSub dlq DeadLetterQueue jobRepository JobRepository logger logger.Logger } func NewJobService(db *gorm.DB, queue JobQueue, pubsub LogPubSub, dlq DeadLetterQueue) *JobService { return &JobService{ queue: queue, pubsub: pubsub, dlq: dlq, jobRepository: repository.NewJobRepository(db), } } func (s *JobService) SetLogger(l logger.Logger) { s.logger = l } var ErrInvalidJobCursor = errors.New("invalid job cursor") func strPtr(v string) *string { return &v } func int64Ptr(v int64) *int64 { return &v } func boolPtr(v bool) *bool { return &v } func float64Ptr(v float64) *float64 { return &v } func timePtr(v time.Time) *time.Time { return &v } func parseJobConfig(raw *string) dto.JobConfigEnvelope { if raw == nil || strings.TrimSpace(*raw) == "" { return dto.JobConfigEnvelope{} } var cfg dto.JobConfigEnvelope _ = json.Unmarshal([]byte(*raw), &cfg) return cfg } func encodeJobConfig(raw []byte, name, userID, videoID string, timeLimit int64) string { cfg := parseJobConfig(strPtr(string(raw))) if name != "" { cfg.Name = name } if userID != "" { cfg.UserID = userID } if videoID != "" { cfg.VideoID = videoID } if timeLimit > 0 { cfg.TimeLimit = timeLimit } encoded, _ := json.Marshal(cfg) return string(encoded) } func normalizeJobPageSize(pageSize int) int { if pageSize <= 0 { return dto.DefaultJobPageSize } if pageSize > dto.MaxJobPageSize { return dto.MaxJobPageSize } return pageSize } func encodeJobListCursor(cursor dto.JobListCursor) (string, error) { payload, err := json.Marshal(cursor) if err != nil { return "", err } return base64.RawURLEncoding.EncodeToString(payload), nil } func decodeJobListCursor(raw string) (*dto.JobListCursor, error) { raw = strings.TrimSpace(raw) if raw == "" { return nil, nil } payload, err := base64.RawURLEncoding.DecodeString(raw) if err != nil { return nil, ErrInvalidJobCursor } var cursor dto.JobListCursor if err := json.Unmarshal(payload, &cursor); err != nil { return nil, ErrInvalidJobCursor } if cursor.Version != dto.JobCursorVersion || cursor.CreatedAtUnixNano <= 0 || strings.TrimSpace(cursor.ID) == "" { return nil, ErrInvalidJobCursor } cursor.ID = strings.TrimSpace(cursor.ID) cursor.AgentID = strings.TrimSpace(cursor.AgentID) return &cursor, nil } func buildJobListCursor(job *model.Job, agentID string) (string, error) { if job == nil || job.CreatedAt == nil { return "", nil } return encodeJobListCursor(dto.JobListCursor{ Version: dto.JobCursorVersion, CreatedAtUnixNano: job.CreatedAt.UTC().UnixNano(), ID: strings.TrimSpace(job.ID), AgentID: strings.TrimSpace(agentID), }) } func listJobsByOffset(ctx context.Context, jobRepository JobRepository, agentID string, offset, limit int) (*dto.PaginatedJobs, error) { if offset < 0 { offset = 0 } limit = normalizeJobPageSize(limit) jobs, total, err := jobRepository.ListByOffset(ctx, agentID, offset, limit) if err != nil { return nil, err } return &dto.PaginatedJobs{Jobs: jobs, Total: total, Offset: offset, Limit: limit, PageSize: limit, HasMore: offset+len(jobs) < int(total)}, nil } func (s *JobService) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error) { status := string(dto.JobStatusPending) now := time.Now() job := &model.Job{ ID: fmt.Sprintf("job-%d", now.UnixNano()), Status: strPtr(status), Priority: int64Ptr(int64(priority)), Config: strPtr(encodeJobConfig(config, name, userID, videoID, timeLimit)), Cancelled: boolPtr(false), RetryCount: int64Ptr(0), MaxRetries: int64Ptr(3), CreatedAt: timePtr(now), UpdatedAt: timePtr(now), } if err := s.jobRepository.Create(ctx, job); err != nil { return nil, err } if err := syncVideoStatus(ctx, s.jobRepository, videoID, dto.JobStatusPending); err != nil { return nil, err } if s.queue == nil { return nil, errors.New("job queue is unavailable") } if err := s.removeFromQueue(ctx, job.ID); err != nil { return nil, err } if err := s.queue.Enqueue(ctx, job); err != nil { return nil, err } _ = s.publishJobUpdate(ctx, job.ID, status, videoID) return job, nil } func (s *JobService) ListJobs(ctx context.Context, offset, limit int) (*dto.PaginatedJobs, error) { return listJobsByOffset(ctx, s.jobRepository, "", offset, limit) } func (s *JobService) ListJobsByAgent(ctx context.Context, agentID string, offset, limit int) (*dto.PaginatedJobs, error) { return listJobsByOffset(ctx, s.jobRepository, strings.TrimSpace(agentID), offset, limit) } func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, cursor string, pageSize int) (*dto.PaginatedJobs, error) { agentID = strings.TrimSpace(agentID) pageSize = normalizeJobPageSize(pageSize) decodedCursor, err := decodeJobListCursor(cursor) if err != nil { return nil, err } if decodedCursor != nil && decodedCursor.AgentID != agentID { return nil, ErrInvalidJobCursor } var cursorTime time.Time if decodedCursor != nil { cursorTime = time.Unix(0, decodedCursor.CreatedAtUnixNano).UTC() } cursorID := "" if decodedCursor != nil { cursorID = decodedCursor.ID } jobs, hasMore, err := s.jobRepository.ListByCursor(ctx, agentID, cursorTime, cursorID, pageSize) if err != nil { return nil, err } nextCursor := "" if hasMore && len(jobs) > 0 { nextCursor, err = buildJobListCursor(jobs[len(jobs)-1], agentID) if err != nil { return nil, err } } return &dto.PaginatedJobs{ Jobs: jobs, Total: 0, Limit: pageSize, PageSize: pageSize, HasMore: hasMore, NextCursor: nextCursor, }, nil } func (s *JobService) GetJob(ctx context.Context, id string) (*model.Job, error) { job, err := s.jobRepository.GetByID(ctx, id) if err != nil { return nil, err } return job, nil } func (s *JobService) GetNextJob(ctx context.Context) (*model.Job, error) { if s.queue == nil { return nil, errors.New("job queue is unavailable") } for { job, err := s.queue.Dequeue(ctx) if err != nil || job == nil { return job, err } fresh, err := s.jobRepository.GetByID(ctx, job.ID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { continue } return nil, err } if !s.canDispatchJob(fresh) { continue } return fresh, nil } } func (s *JobService) SubscribeSystemResources(ctx context.Context) (<-chan dto.SystemResource, error) { if s.pubsub == nil { return nil, errors.New("job pubsub is unavailable") } return s.pubsub.SubscribeResources(ctx) } func (s *JobService) SubscribeJobLogs(ctx context.Context, jobID string) (<-chan dto.LogEntry, error) { if s.pubsub == nil { return nil, errors.New("job pubsub is unavailable") } return s.pubsub.Subscribe(ctx, jobID) } func (s *JobService) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) { if s.pubsub == nil { return nil, errors.New("job pubsub is unavailable") } return s.pubsub.SubscribeCancel(ctx, agentID) } func (s *JobService) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) { if s.pubsub == nil { return nil, errors.New("job pubsub is unavailable") } return s.pubsub.SubscribeJobUpdates(ctx) } func (s *JobService) UpdateJobStatus(ctx context.Context, jobID string, status dto.JobStatus) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } currentStatus := s.jobStatus(job) if (currentStatus == dto.JobStatusCancelled || currentStatus == dto.JobStatusSuccess) && currentStatus != status { return nil } if currentStatus == status { return nil } now := time.Now() updated := false switch status { case dto.JobStatusRunning: updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(dto.JobStatusPending)}, string(status), now, false) case dto.JobStatusSuccess: updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(dto.JobStatusRunning)}, string(status), now, true) if err == nil && updated { err = s.removeFromQueue(ctx, jobID) } default: updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(currentStatus)}, string(status), now, status == dto.JobStatusFailure || status == dto.JobStatusCancelled) } if err != nil { return err } if !updated { return nil } cfg := parseJobConfig(job.Config) if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, status); err != nil { return err } return s.publishJobUpdate(ctx, jobID, string(status), cfg.VideoID) } func (s *JobService) AssignJob(ctx context.Context, jobID string, agentID string) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } if !s.canDispatchJob(job) { return fmt.Errorf("job %s is not dispatchable", jobID) } agentNumeric, err := strconv.ParseInt(agentID, 10, 64) if err != nil { return err } updated, err := s.jobRepository.AssignPendingJob(ctx, jobID, agentNumeric, time.Now()) if err != nil { return err } if !updated { return fmt.Errorf("job %s is not dispatchable", jobID) } cfg := parseJobConfig(job.Config) if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusRunning); err != nil { return err } return s.publishJobUpdate(ctx, jobID, string(dto.JobStatusRunning), cfg.VideoID) } func (s *JobService) CancelJob(ctx context.Context, jobID string) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return fmt.Errorf("job not found: %w", err) } updated, err := s.jobRepository.CancelJobIfActive(ctx, jobID, time.Now()) if err != nil { return err } if !updated { return fmt.Errorf("cannot cancel job with status %s", s.jobStatus(job)) } cfg := parseJobConfig(job.Config) if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusCancelled); err != nil { return err } _ = s.publishJobUpdate(ctx, jobID, string(dto.JobStatusCancelled), cfg.VideoID) if err := s.removeFromQueue(ctx, job.ID); err != nil { return err } if job.AgentID != nil && s.pubsub != nil { _ = s.pubsub.PublishCancel(ctx, strconv.FormatInt(*job.AgentID, 10), job.ID) } return s.publishLog(ctx, jobID, "[SYSTEM] Job cancelled by admin", -1) } func (s *JobService) RetryJob(ctx context.Context, jobID string) (*model.Job, error) { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return nil, fmt.Errorf("job not found: %w", err) } currentStatus := s.jobStatus(job) if currentStatus != dto.JobStatusFailure && currentStatus != dto.JobStatusCancelled { return nil, fmt.Errorf("cannot retry job with status %s", currentStatus) } if err := s.requeueJob(ctx, job, false); err != nil { return nil, err } if s.dlq != nil { _ = s.dlq.Remove(ctx, jobID) } return s.jobRepository.GetByID(ctx, jobID) } func (s *JobService) UpdateJobProgress(ctx context.Context, jobID string, progress float64) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } now := time.Now() job.Progress = float64Ptr(progress) job.UpdatedAt = &now if err := s.jobRepository.Save(ctx, job); err != nil { return err } return s.publishLog(ctx, jobID, "", progress) } func (s *JobService) ProcessLog(ctx context.Context, jobID string, logData []byte) error { line := string(logData) re := regexp.MustCompile(`out_time_us=(\d+)`) matches := re.FindStringSubmatch(line) var progress float64 if len(matches) > 1 { us, _ := strconv.ParseInt(matches[1], 10, 64) if us > 0 { progress = float64(us) / 1000000.0 } } job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } existingLogs := "" if job.Logs != nil { existingLogs = *job.Logs } newLog := line if !strings.HasSuffix(newLog, "\n") { newLog += "\n" } existingLogs += newLog if len(existingLogs) > 10*1024*1024 { existingLogs = existingLogs[len(existingLogs)-8*1024*1024:] } now := time.Now() job.Logs = &existingLogs if progress > 0 { job.Progress = float64Ptr(progress) } job.UpdatedAt = &now if err := s.jobRepository.Save(ctx, job); err != nil { return err } return s.publishLog(ctx, jobID, line, progress) } func syncVideoStatus(ctx context.Context, jobRepository JobRepository, videoID string, status dto.JobStatus) error { videoID = strings.TrimSpace(videoID) if videoID == "" { return nil } statusValue := "processing" processingStatus := "PROCESSING" switch status { case dto.JobStatusSuccess: statusValue = "ready" processingStatus = "READY" case dto.JobStatusFailure, dto.JobStatusCancelled: statusValue = "failed" processingStatus = "FAILED" } return jobRepository.UpdateVideoStatus(ctx, videoID, statusValue, processingStatus) } func (s *JobService) PublishSystemResources(ctx context.Context, agentID string, data []byte) error { if s.pubsub == nil { return errors.New("job pubsub is unavailable") } return s.pubsub.PublishResource(ctx, agentID, data) } func (s *JobService) StartInflightReclaimLoop(ctx context.Context, interval time.Duration, batchSize int64) { if interval <= 0 { interval = 30 * time.Second } if s.logger != nil { s.logger.Info("started inflight reclaim loop", "interval", interval.String(), "batch_size", batchSize) } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): if s.logger != nil { s.logger.Info("stopped inflight reclaim loop") } return case <-ticker.C: _ = s.reclaimExpiredOnce(ctx, batchSize) } } } func (s *JobService) reclaimExpiredOnce(ctx context.Context, batchSize int64) error { type expirable interface { ListExpiredInflight(ctx context.Context, now time.Time, limit int64) ([]string, error) } queue, ok := s.queue.(expirable) if !ok { return nil } startedAt := time.Now() jobIDs, err := queue.ListExpiredInflight(ctx, time.Now(), batchSize) if err != nil { if s.logger != nil { s.logger.Error("failed to list expired inflight jobs", "error", err) } return err } for _, jobID := range jobIDs { _ = s.handleExpiredInflightJob(ctx, jobID) } if s.logger != nil && len(jobIDs) > 0 { s.logger.Info("completed inflight reclaim batch", "expired_count", len(jobIDs), "duration_ms", time.Since(startedAt).Milliseconds()) } return nil } func (s *JobService) handleExpiredInflightJob(ctx context.Context, jobID string) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { if s.logger != nil { s.logger.Warn("failed to load expired inflight job", "job_id", jobID, "error", err) } return err } status := s.jobStatus(job) if s.logger != nil { s.logger.Warn("processing expired inflight job", "job_id", jobID, "current_status", status) } switch status { case dto.JobStatusRunning: return s.HandleJobFailure(ctx, jobID, "lease_expired") case dto.JobStatusPending, dto.JobStatusSuccess, dto.JobStatusFailure, dto.JobStatusCancelled: return s.removeFromQueue(ctx, jobID) default: return s.removeFromQueue(ctx, jobID) } } func (s *JobService) RenewJobLease(ctx context.Context, jobID string) error { type touchable interface { TouchInflight(ctx context.Context, jobID string, ttl time.Duration) error } queue, ok := s.queue.(touchable) if !ok { return nil } return queue.TouchInflight(ctx, jobID, 15*time.Minute) } func (s *JobService) HandleDispatchFailure(ctx context.Context, jobID string, reason string, retryable bool) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } if !retryable { return s.moveJobToDLQ(ctx, job, reason) } return s.requeueOrDLQ(ctx, job, reason) } func (s *JobService) HandleJobFailure(ctx context.Context, jobID string, reason string) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } return s.requeueOrDLQ(ctx, job, reason) } func (s *JobService) HandleAgentDisconnect(ctx context.Context, jobID string) error { job, err := s.jobRepository.GetByID(ctx, jobID) if err != nil { return err } return s.requeueOrDLQ(ctx, job, "agent_unregistered") } func (s *JobService) requeueOrDLQ(ctx context.Context, job *model.Job, reason string) error { if job == nil { return nil } willRetry := s.canAutoRetry(job) if s.logger != nil { s.logger.Warn("evaluating retry vs dlq", "job_id", job.ID, "reason", reason, "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "will_retry", willRetry) } if !willRetry { return s.moveJobToDLQ(ctx, job, reason) } return s.requeueJob(ctx, job, true) } func (s *JobService) requeueJob(ctx context.Context, job *model.Job, incrementRetry bool) error { if job == nil { return nil } if s.queue == nil { return errors.New("job queue is unavailable") } pending := string(dto.JobStatusPending) cancelled := false progress := 0.0 now := time.Now() job.Status = &pending job.Cancelled = &cancelled job.Progress = &progress job.AgentID = nil job.UpdatedAt = &now if incrementRetry { job.RetryCount = int64Ptr(s.retryCount(job) + 1) job.Logs = appendSystemLog(job.Logs, fmt.Sprintf("[SYSTEM] Auto-retry scheduled at %s", now.UTC().Format(time.RFC3339))) } if err := s.jobRepository.Save(ctx, job); err != nil { return err } cfg := parseJobConfig(job.Config) if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusPending); err != nil { return err } if err := s.queue.Enqueue(ctx, job); err != nil { return err } if s.logger != nil { s.logger.Info("requeued job", "job_id", job.ID, "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "action", "requeue") } return s.publishJobUpdate(ctx, job.ID, pending, cfg.VideoID) } func (s *JobService) moveJobToDLQ(ctx context.Context, job *model.Job, reason string) error { if job == nil { return nil } failure := string(dto.JobStatusFailure) now := time.Now() job.Status = &failure job.AgentID = nil job.UpdatedAt = &now job.Logs = appendSystemLog(job.Logs, fmt.Sprintf("[SYSTEM] Sent to DLQ: %s", strings.TrimSpace(reason))) if err := s.jobRepository.Save(ctx, job); err != nil { return err } cfg := parseJobConfig(job.Config) if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusFailure); err != nil { return err } if err := s.removeFromQueue(ctx, job.ID); err != nil { return err } if s.dlq != nil { if err := s.dlq.Add(ctx, job, strings.TrimSpace(reason)); err != nil { return err } if s.logger != nil { dlqCount, _ := s.dlq.Count(ctx) s.logger.Warn("moved job to dlq", "job_id", job.ID, "reason", strings.TrimSpace(reason), "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "dlq_size", dlqCount) } } return s.publishJobUpdate(ctx, job.ID, failure, cfg.VideoID) } func (s *JobService) canDispatchJob(job *model.Job) bool { if job == nil { return false } if job.Cancelled != nil && *job.Cancelled { return false } return s.jobStatus(job) == dto.JobStatusPending } func (s *JobService) canAutoRetry(job *model.Job) bool { return s.retryCount(job) < s.maxRetries(job) } func (s *JobService) retryCount(job *model.Job) int64 { if job == nil || job.RetryCount == nil { return 0 } return *job.RetryCount } func (s *JobService) maxRetries(job *model.Job) int64 { if job == nil || job.MaxRetries == nil || *job.MaxRetries <= 0 { return 3 } return *job.MaxRetries } func (s *JobService) jobStatus(job *model.Job) dto.JobStatus { if job == nil || job.Status == nil { return dto.JobStatusPending } return dto.JobStatus(strings.TrimSpace(*job.Status)) } func (s *JobService) publishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error { if s.pubsub == nil { return nil } return s.pubsub.PublishJobUpdate(ctx, jobID, status, videoID) } func (s *JobService) publishLog(ctx context.Context, jobID string, logLine string, progress float64) error { if s.pubsub == nil { return nil } return s.pubsub.Publish(ctx, jobID, logLine, progress) } func (s *JobService) ListDLQ(ctx context.Context, offset, limit int) ([]*dto.DLQEntry, int64, error) { if s.dlq == nil { return []*dto.DLQEntry{}, 0, nil } if offset < 0 { offset = 0 } if limit <= 0 { limit = 20 } entries, err := s.dlq.List(ctx, int64(offset), int64(limit)) if err != nil { return nil, 0, err } count, err := s.dlq.Count(ctx) if err != nil { return nil, 0, err } items := make([]*dto.DLQEntry, 0, len(entries)) for _, entry := range entries { items = append(items, &dto.DLQEntry{Job: entry.Job, FailureTime: entry.FailureTime.Unix(), Reason: entry.Reason, RetryCount: entry.RetryCount}) } return items, count, nil } func (s *JobService) GetDLQ(ctx context.Context, id string) (*dto.DLQEntry, error) { if s.dlq == nil { return nil, fmt.Errorf("job not found in DLQ") } entry, err := s.dlq.Get(ctx, strings.TrimSpace(id)) if err != nil { return nil, err } return &dto.DLQEntry{Job: entry.Job, FailureTime: entry.FailureTime.Unix(), Reason: entry.Reason, RetryCount: entry.RetryCount}, nil } func (s *JobService) RetryDLQ(ctx context.Context, id string) (*model.Job, error) { if s.dlq == nil { return nil, fmt.Errorf("job not found in DLQ") } job, err := s.dlq.Retry(ctx, strings.TrimSpace(id)) if err != nil { return nil, err } if job == nil { return nil, fmt.Errorf("job not found in DLQ") } if err := s.requeueJob(ctx, job, false); err != nil { return nil, err } if s.logger != nil { count, _ := s.dlq.Count(ctx) s.logger.Info("retried job from dlq", "job_id", job.ID, "dlq_size", count) } return s.jobRepository.GetByID(ctx, job.ID) } func (s *JobService) RemoveDLQ(ctx context.Context, id string) error { if s.dlq == nil { return fmt.Errorf("job not found in DLQ") } jobID := strings.TrimSpace(id) if err := s.dlq.Remove(ctx, jobID); err != nil { return err } if s.logger != nil { count, _ := s.dlq.Count(ctx) s.logger.Info("removed job from dlq", "job_id", jobID, "dlq_size", count) } return nil } func (s *JobService) removeFromQueue(ctx context.Context, jobID string) error { type ackable interface { Ack(ctx context.Context, jobID string) error } if queue, ok := s.queue.(ackable); ok { return queue.Ack(ctx, jobID) } return nil } func appendSystemLog(logs *string, line string) *string { line = strings.TrimSpace(line) if line == "" { return logs } existing := "" if logs != nil { existing = *logs } if existing != "" && !strings.HasSuffix(existing, "\n") { existing += "\n" } existing += line + "\n" return &existing }