837 lines
24 KiB
Go
837 lines
24 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
redisadapter "stream.api/internal/adapters/redis"
|
|
"stream.api/internal/database/model"
|
|
"stream.api/internal/dto"
|
|
"stream.api/internal/repository"
|
|
"stream.api/pkg/logger"
|
|
)
|
|
|
|
type JobQueue interface {
|
|
Enqueue(ctx context.Context, job *model.Job) error
|
|
Dequeue(ctx context.Context) (*model.Job, error)
|
|
}
|
|
|
|
type LogPubSub interface {
|
|
Publish(ctx context.Context, jobID string, logLine string, progress float64) error
|
|
PublishResource(ctx context.Context, agentID string, data []byte) error
|
|
PublishCancel(ctx context.Context, agentID string, jobID string) error
|
|
PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error
|
|
Subscribe(ctx context.Context, jobID string) (<-chan dto.LogEntry, error)
|
|
SubscribeResources(ctx context.Context) (<-chan dto.SystemResource, error)
|
|
SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error)
|
|
SubscribeJobUpdates(ctx context.Context) (<-chan string, error)
|
|
}
|
|
|
|
type DeadLetterQueue interface {
|
|
Add(ctx context.Context, job *model.Job, reason string) error
|
|
Get(ctx context.Context, jobID string) (*redisadapter.DLQEntry, error)
|
|
List(ctx context.Context, offset, limit int64) ([]*redisadapter.DLQEntry, error)
|
|
Count(ctx context.Context) (int64, error)
|
|
Remove(ctx context.Context, jobID string) error
|
|
Retry(ctx context.Context, jobID string) (*model.Job, error)
|
|
}
|
|
|
|
type JobService struct {
|
|
queue JobQueue
|
|
pubsub LogPubSub
|
|
dlq DeadLetterQueue
|
|
jobRepository JobRepository
|
|
logger logger.Logger
|
|
}
|
|
|
|
func NewJobService(db *gorm.DB, queue JobQueue, pubsub LogPubSub, dlq DeadLetterQueue) *JobService {
|
|
return &JobService{
|
|
queue: queue,
|
|
pubsub: pubsub,
|
|
dlq: dlq,
|
|
jobRepository: repository.NewJobRepository(db),
|
|
}
|
|
}
|
|
|
|
func (s *JobService) SetLogger(l logger.Logger) {
|
|
s.logger = l
|
|
}
|
|
|
|
// ErrInvalidJobCursor is returned when a job-list cursor cannot be decoded,
// fails validation, or was issued for a different agent filter.
var ErrInvalidJobCursor = errors.New("invalid job cursor")
|
|
|
|
// strPtr returns a pointer to a fresh copy of v.
func strPtr(v string) *string {
	p := new(string)
	*p = v
	return p
}

// int64Ptr returns a pointer to a fresh copy of v.
func int64Ptr(v int64) *int64 {
	p := new(int64)
	*p = v
	return p
}

// boolPtr returns a pointer to a fresh copy of v.
func boolPtr(v bool) *bool {
	p := new(bool)
	*p = v
	return p
}

// float64Ptr returns a pointer to a fresh copy of v.
func float64Ptr(v float64) *float64 {
	p := new(float64)
	*p = v
	return p
}

// timePtr returns a pointer to a fresh copy of v.
func timePtr(v time.Time) *time.Time {
	p := new(time.Time)
	*p = v
	return p
}
|
|
|
|
func parseJobConfig(raw *string) dto.JobConfigEnvelope {
|
|
if raw == nil || strings.TrimSpace(*raw) == "" {
|
|
return dto.JobConfigEnvelope{}
|
|
}
|
|
var cfg dto.JobConfigEnvelope
|
|
_ = json.Unmarshal([]byte(*raw), &cfg)
|
|
return cfg
|
|
}
|
|
|
|
func encodeJobConfig(raw []byte, name, userID, videoID string, timeLimit int64) string {
|
|
cfg := parseJobConfig(strPtr(string(raw)))
|
|
if name != "" {
|
|
cfg.Name = name
|
|
}
|
|
if userID != "" {
|
|
cfg.UserID = userID
|
|
}
|
|
if videoID != "" {
|
|
cfg.VideoID = videoID
|
|
}
|
|
if timeLimit > 0 {
|
|
cfg.TimeLimit = timeLimit
|
|
}
|
|
encoded, _ := json.Marshal(cfg)
|
|
return string(encoded)
|
|
}
|
|
|
|
func normalizeJobPageSize(pageSize int) int {
|
|
if pageSize <= 0 {
|
|
return dto.DefaultJobPageSize
|
|
}
|
|
if pageSize > dto.MaxJobPageSize {
|
|
return dto.MaxJobPageSize
|
|
}
|
|
return pageSize
|
|
}
|
|
|
|
func encodeJobListCursor(cursor dto.JobListCursor) (string, error) {
|
|
payload, err := json.Marshal(cursor)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return base64.RawURLEncoding.EncodeToString(payload), nil
|
|
}
|
|
|
|
func decodeJobListCursor(raw string) (*dto.JobListCursor, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return nil, nil
|
|
}
|
|
payload, err := base64.RawURLEncoding.DecodeString(raw)
|
|
if err != nil {
|
|
return nil, ErrInvalidJobCursor
|
|
}
|
|
var cursor dto.JobListCursor
|
|
if err := json.Unmarshal(payload, &cursor); err != nil {
|
|
return nil, ErrInvalidJobCursor
|
|
}
|
|
if cursor.Version != dto.JobCursorVersion || cursor.CreatedAtUnixNano <= 0 || strings.TrimSpace(cursor.ID) == "" {
|
|
return nil, ErrInvalidJobCursor
|
|
}
|
|
cursor.ID = strings.TrimSpace(cursor.ID)
|
|
cursor.AgentID = strings.TrimSpace(cursor.AgentID)
|
|
return &cursor, nil
|
|
}
|
|
|
|
func buildJobListCursor(job *model.Job, agentID string) (string, error) {
|
|
if job == nil || job.CreatedAt == nil {
|
|
return "", nil
|
|
}
|
|
return encodeJobListCursor(dto.JobListCursor{
|
|
Version: dto.JobCursorVersion,
|
|
CreatedAtUnixNano: job.CreatedAt.UTC().UnixNano(),
|
|
ID: strings.TrimSpace(job.ID),
|
|
AgentID: strings.TrimSpace(agentID),
|
|
})
|
|
}
|
|
|
|
func listJobsByOffset(ctx context.Context, jobRepository JobRepository, agentID string, offset, limit int) (*dto.PaginatedJobs, error) {
|
|
if offset < 0 {
|
|
offset = 0
|
|
}
|
|
limit = normalizeJobPageSize(limit)
|
|
jobs, total, err := jobRepository.ListByOffset(ctx, agentID, offset, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &dto.PaginatedJobs{Jobs: jobs, Total: total, Offset: offset, Limit: limit, PageSize: limit, HasMore: offset+len(jobs) < int(total)}, nil
|
|
}
|
|
|
|
func (s *JobService) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error) {
|
|
status := string(dto.JobStatusPending)
|
|
now := time.Now()
|
|
job := &model.Job{
|
|
ID: fmt.Sprintf("job-%d", now.UnixNano()),
|
|
Status: strPtr(status),
|
|
Priority: int64Ptr(int64(priority)),
|
|
Config: strPtr(encodeJobConfig(config, name, userID, videoID, timeLimit)),
|
|
Cancelled: boolPtr(false),
|
|
RetryCount: int64Ptr(0),
|
|
MaxRetries: int64Ptr(3),
|
|
CreatedAt: timePtr(now),
|
|
UpdatedAt: timePtr(now),
|
|
}
|
|
if err := s.jobRepository.Create(ctx, job); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := syncVideoStatus(ctx, s.jobRepository, videoID, dto.JobStatusPending); err != nil {
|
|
return nil, err
|
|
}
|
|
if s.queue == nil {
|
|
return nil, errors.New("job queue is unavailable")
|
|
}
|
|
if err := s.removeFromQueue(ctx, job.ID); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := s.queue.Enqueue(ctx, job); err != nil {
|
|
return nil, err
|
|
}
|
|
_ = s.publishJobUpdate(ctx, job.ID, status, videoID)
|
|
return job, nil
|
|
}
|
|
|
|
func (s *JobService) ListJobs(ctx context.Context, offset, limit int) (*dto.PaginatedJobs, error) {
|
|
return listJobsByOffset(ctx, s.jobRepository, "", offset, limit)
|
|
}
|
|
|
|
func (s *JobService) ListJobsByAgent(ctx context.Context, agentID string, offset, limit int) (*dto.PaginatedJobs, error) {
|
|
return listJobsByOffset(ctx, s.jobRepository, strings.TrimSpace(agentID), offset, limit)
|
|
}
|
|
|
|
func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, cursor string, pageSize int) (*dto.PaginatedJobs, error) {
|
|
agentID = strings.TrimSpace(agentID)
|
|
pageSize = normalizeJobPageSize(pageSize)
|
|
|
|
decodedCursor, err := decodeJobListCursor(cursor)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if decodedCursor != nil && decodedCursor.AgentID != agentID {
|
|
return nil, ErrInvalidJobCursor
|
|
}
|
|
|
|
var cursorTime time.Time
|
|
if decodedCursor != nil {
|
|
cursorTime = time.Unix(0, decodedCursor.CreatedAtUnixNano).UTC()
|
|
}
|
|
cursorID := ""
|
|
if decodedCursor != nil {
|
|
cursorID = decodedCursor.ID
|
|
}
|
|
jobs, hasMore, err := s.jobRepository.ListByCursor(ctx, agentID, cursorTime, cursorID, pageSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nextCursor := ""
|
|
if hasMore && len(jobs) > 0 {
|
|
nextCursor, err = buildJobListCursor(jobs[len(jobs)-1], agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &dto.PaginatedJobs{
|
|
Jobs: jobs,
|
|
Total: 0,
|
|
Limit: pageSize,
|
|
PageSize: pageSize,
|
|
HasMore: hasMore,
|
|
NextCursor: nextCursor,
|
|
}, nil
|
|
}
|
|
|
|
func (s *JobService) GetJob(ctx context.Context, id string) (*model.Job, error) {
|
|
job, err := s.jobRepository.GetByID(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return job, nil
|
|
}
|
|
|
|
func (s *JobService) GetNextJob(ctx context.Context) (*model.Job, error) {
|
|
if s.queue == nil {
|
|
return nil, errors.New("job queue is unavailable")
|
|
}
|
|
|
|
for {
|
|
job, err := s.queue.Dequeue(ctx)
|
|
if err != nil || job == nil {
|
|
return job, err
|
|
}
|
|
|
|
fresh, err := s.jobRepository.GetByID(ctx, job.ID)
|
|
if err != nil {
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
if !s.canDispatchJob(fresh) {
|
|
continue
|
|
}
|
|
return fresh, nil
|
|
}
|
|
}
|
|
|
|
func (s *JobService) SubscribeSystemResources(ctx context.Context) (<-chan dto.SystemResource, error) {
|
|
if s.pubsub == nil {
|
|
return nil, errors.New("job pubsub is unavailable")
|
|
}
|
|
return s.pubsub.SubscribeResources(ctx)
|
|
}
|
|
|
|
func (s *JobService) SubscribeJobLogs(ctx context.Context, jobID string) (<-chan dto.LogEntry, error) {
|
|
if s.pubsub == nil {
|
|
return nil, errors.New("job pubsub is unavailable")
|
|
}
|
|
return s.pubsub.Subscribe(ctx, jobID)
|
|
}
|
|
|
|
func (s *JobService) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) {
|
|
if s.pubsub == nil {
|
|
return nil, errors.New("job pubsub is unavailable")
|
|
}
|
|
return s.pubsub.SubscribeCancel(ctx, agentID)
|
|
}
|
|
|
|
func (s *JobService) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) {
|
|
if s.pubsub == nil {
|
|
return nil, errors.New("job pubsub is unavailable")
|
|
}
|
|
return s.pubsub.SubscribeJobUpdates(ctx)
|
|
}
|
|
|
|
func (s *JobService) UpdateJobStatus(ctx context.Context, jobID string, status dto.JobStatus) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
currentStatus := s.jobStatus(job)
|
|
if (currentStatus == dto.JobStatusCancelled || currentStatus == dto.JobStatusSuccess) && currentStatus != status {
|
|
return nil
|
|
}
|
|
if currentStatus == status {
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
updated := false
|
|
switch status {
|
|
case dto.JobStatusRunning:
|
|
updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(dto.JobStatusPending)}, string(status), now, false)
|
|
case dto.JobStatusSuccess:
|
|
updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(dto.JobStatusRunning)}, string(status), now, true)
|
|
if err == nil && updated {
|
|
err = s.removeFromQueue(ctx, jobID)
|
|
}
|
|
default:
|
|
updated, err = s.jobRepository.MarkJobStatusIfCurrent(ctx, jobID, []string{string(currentStatus)}, string(status), now, status == dto.JobStatusFailure || status == dto.JobStatusCancelled)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !updated {
|
|
return nil
|
|
}
|
|
cfg := parseJobConfig(job.Config)
|
|
if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, status); err != nil {
|
|
return err
|
|
}
|
|
return s.publishJobUpdate(ctx, jobID, string(status), cfg.VideoID)
|
|
}
|
|
|
|
func (s *JobService) AssignJob(ctx context.Context, jobID string, agentID string) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !s.canDispatchJob(job) {
|
|
return fmt.Errorf("job %s is not dispatchable", jobID)
|
|
}
|
|
|
|
agentNumeric, err := strconv.ParseInt(agentID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
updated, err := s.jobRepository.AssignPendingJob(ctx, jobID, agentNumeric, time.Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !updated {
|
|
return fmt.Errorf("job %s is not dispatchable", jobID)
|
|
}
|
|
cfg := parseJobConfig(job.Config)
|
|
if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusRunning); err != nil {
|
|
return err
|
|
}
|
|
return s.publishJobUpdate(ctx, jobID, string(dto.JobStatusRunning), cfg.VideoID)
|
|
}
|
|
|
|
func (s *JobService) CancelJob(ctx context.Context, jobID string) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return fmt.Errorf("job not found: %w", err)
|
|
}
|
|
updated, err := s.jobRepository.CancelJobIfActive(ctx, jobID, time.Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !updated {
|
|
return fmt.Errorf("cannot cancel job with status %s", s.jobStatus(job))
|
|
}
|
|
cfg := parseJobConfig(job.Config)
|
|
if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusCancelled); err != nil {
|
|
return err
|
|
}
|
|
_ = s.publishJobUpdate(ctx, jobID, string(dto.JobStatusCancelled), cfg.VideoID)
|
|
if err := s.removeFromQueue(ctx, job.ID); err != nil {
|
|
return err
|
|
}
|
|
if job.AgentID != nil && s.pubsub != nil {
|
|
_ = s.pubsub.PublishCancel(ctx, strconv.FormatInt(*job.AgentID, 10), job.ID)
|
|
}
|
|
return s.publishLog(ctx, jobID, "[SYSTEM] Job cancelled by admin", -1)
|
|
}
|
|
|
|
func (s *JobService) RetryJob(ctx context.Context, jobID string) (*model.Job, error) {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("job not found: %w", err)
|
|
}
|
|
currentStatus := s.jobStatus(job)
|
|
if currentStatus != dto.JobStatusFailure && currentStatus != dto.JobStatusCancelled {
|
|
return nil, fmt.Errorf("cannot retry job with status %s", currentStatus)
|
|
}
|
|
if err := s.requeueJob(ctx, job, false); err != nil {
|
|
return nil, err
|
|
}
|
|
if s.dlq != nil {
|
|
_ = s.dlq.Remove(ctx, jobID)
|
|
}
|
|
return s.jobRepository.GetByID(ctx, jobID)
|
|
}
|
|
|
|
func (s *JobService) UpdateJobProgress(ctx context.Context, jobID string, progress float64) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
now := time.Now()
|
|
job.Progress = float64Ptr(progress)
|
|
job.UpdatedAt = &now
|
|
if err := s.jobRepository.Save(ctx, job); err != nil {
|
|
return err
|
|
}
|
|
return s.publishLog(ctx, jobID, "", progress)
|
|
}
|
|
|
|
func (s *JobService) ProcessLog(ctx context.Context, jobID string, logData []byte) error {
|
|
line := string(logData)
|
|
re := regexp.MustCompile(`out_time_us=(\d+)`)
|
|
matches := re.FindStringSubmatch(line)
|
|
var progress float64
|
|
if len(matches) > 1 {
|
|
us, _ := strconv.ParseInt(matches[1], 10, 64)
|
|
if us > 0 {
|
|
progress = float64(us) / 1000000.0
|
|
}
|
|
}
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
existingLogs := ""
|
|
if job.Logs != nil {
|
|
existingLogs = *job.Logs
|
|
}
|
|
newLog := line
|
|
if !strings.HasSuffix(newLog, "\n") {
|
|
newLog += "\n"
|
|
}
|
|
existingLogs += newLog
|
|
if len(existingLogs) > 10*1024*1024 {
|
|
existingLogs = existingLogs[len(existingLogs)-8*1024*1024:]
|
|
}
|
|
now := time.Now()
|
|
job.Logs = &existingLogs
|
|
if progress > 0 {
|
|
job.Progress = float64Ptr(progress)
|
|
}
|
|
job.UpdatedAt = &now
|
|
if err := s.jobRepository.Save(ctx, job); err != nil {
|
|
return err
|
|
}
|
|
return s.publishLog(ctx, jobID, line, progress)
|
|
}
|
|
|
|
func syncVideoStatus(ctx context.Context, jobRepository JobRepository, videoID string, status dto.JobStatus) error {
|
|
videoID = strings.TrimSpace(videoID)
|
|
if videoID == "" {
|
|
return nil
|
|
}
|
|
|
|
statusValue := "processing"
|
|
processingStatus := "PROCESSING"
|
|
switch status {
|
|
case dto.JobStatusSuccess:
|
|
statusValue = "ready"
|
|
processingStatus = "READY"
|
|
case dto.JobStatusFailure, dto.JobStatusCancelled:
|
|
statusValue = "failed"
|
|
processingStatus = "FAILED"
|
|
}
|
|
|
|
return jobRepository.UpdateVideoStatus(ctx, videoID, statusValue, processingStatus)
|
|
}
|
|
|
|
func (s *JobService) PublishSystemResources(ctx context.Context, agentID string, data []byte) error {
|
|
if s.pubsub == nil {
|
|
return errors.New("job pubsub is unavailable")
|
|
}
|
|
return s.pubsub.PublishResource(ctx, agentID, data)
|
|
}
|
|
|
|
func (s *JobService) StartInflightReclaimLoop(ctx context.Context, interval time.Duration, batchSize int64) {
|
|
if interval <= 0 {
|
|
interval = 30 * time.Second
|
|
}
|
|
if s.logger != nil {
|
|
s.logger.Info("started inflight reclaim loop", "interval", interval.String(), "batch_size", batchSize)
|
|
}
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
if s.logger != nil {
|
|
s.logger.Info("stopped inflight reclaim loop")
|
|
}
|
|
return
|
|
case <-ticker.C:
|
|
_ = s.reclaimExpiredOnce(ctx, batchSize)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *JobService) reclaimExpiredOnce(ctx context.Context, batchSize int64) error {
|
|
type expirable interface {
|
|
ListExpiredInflight(ctx context.Context, now time.Time, limit int64) ([]string, error)
|
|
}
|
|
queue, ok := s.queue.(expirable)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
startedAt := time.Now()
|
|
jobIDs, err := queue.ListExpiredInflight(ctx, time.Now(), batchSize)
|
|
if err != nil {
|
|
if s.logger != nil {
|
|
s.logger.Error("failed to list expired inflight jobs", "error", err)
|
|
}
|
|
return err
|
|
}
|
|
for _, jobID := range jobIDs {
|
|
_ = s.handleExpiredInflightJob(ctx, jobID)
|
|
}
|
|
if s.logger != nil && len(jobIDs) > 0 {
|
|
s.logger.Info("completed inflight reclaim batch", "expired_count", len(jobIDs), "duration_ms", time.Since(startedAt).Milliseconds())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *JobService) handleExpiredInflightJob(ctx context.Context, jobID string) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
if s.logger != nil {
|
|
s.logger.Warn("failed to load expired inflight job", "job_id", jobID, "error", err)
|
|
}
|
|
return err
|
|
}
|
|
status := s.jobStatus(job)
|
|
if s.logger != nil {
|
|
s.logger.Warn("processing expired inflight job", "job_id", jobID, "current_status", status)
|
|
}
|
|
switch status {
|
|
case dto.JobStatusRunning:
|
|
return s.HandleJobFailure(ctx, jobID, "lease_expired")
|
|
case dto.JobStatusPending, dto.JobStatusSuccess, dto.JobStatusFailure, dto.JobStatusCancelled:
|
|
return s.removeFromQueue(ctx, jobID)
|
|
default:
|
|
return s.removeFromQueue(ctx, jobID)
|
|
}
|
|
}
|
|
|
|
func (s *JobService) RenewJobLease(ctx context.Context, jobID string) error {
|
|
type touchable interface {
|
|
TouchInflight(ctx context.Context, jobID string, ttl time.Duration) error
|
|
}
|
|
queue, ok := s.queue.(touchable)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return queue.TouchInflight(ctx, jobID, 15*time.Minute)
|
|
}
|
|
|
|
func (s *JobService) HandleDispatchFailure(ctx context.Context, jobID string, reason string, retryable bool) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !retryable {
|
|
return s.moveJobToDLQ(ctx, job, reason)
|
|
}
|
|
return s.requeueOrDLQ(ctx, job, reason)
|
|
}
|
|
|
|
func (s *JobService) HandleJobFailure(ctx context.Context, jobID string, reason string) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.requeueOrDLQ(ctx, job, reason)
|
|
}
|
|
|
|
func (s *JobService) HandleAgentDisconnect(ctx context.Context, jobID string) error {
|
|
job, err := s.jobRepository.GetByID(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.requeueOrDLQ(ctx, job, "agent_unregistered")
|
|
}
|
|
|
|
func (s *JobService) requeueOrDLQ(ctx context.Context, job *model.Job, reason string) error {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
willRetry := s.canAutoRetry(job)
|
|
if s.logger != nil {
|
|
s.logger.Warn("evaluating retry vs dlq", "job_id", job.ID, "reason", reason, "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "will_retry", willRetry)
|
|
}
|
|
if !willRetry {
|
|
return s.moveJobToDLQ(ctx, job, reason)
|
|
}
|
|
return s.requeueJob(ctx, job, true)
|
|
}
|
|
|
|
func (s *JobService) requeueJob(ctx context.Context, job *model.Job, incrementRetry bool) error {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
if s.queue == nil {
|
|
return errors.New("job queue is unavailable")
|
|
}
|
|
|
|
pending := string(dto.JobStatusPending)
|
|
cancelled := false
|
|
progress := 0.0
|
|
now := time.Now()
|
|
job.Status = &pending
|
|
job.Cancelled = &cancelled
|
|
job.Progress = &progress
|
|
job.AgentID = nil
|
|
job.UpdatedAt = &now
|
|
if incrementRetry {
|
|
job.RetryCount = int64Ptr(s.retryCount(job) + 1)
|
|
job.Logs = appendSystemLog(job.Logs, fmt.Sprintf("[SYSTEM] Auto-retry scheduled at %s", now.UTC().Format(time.RFC3339)))
|
|
}
|
|
if err := s.jobRepository.Save(ctx, job); err != nil {
|
|
return err
|
|
}
|
|
cfg := parseJobConfig(job.Config)
|
|
if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusPending); err != nil {
|
|
return err
|
|
}
|
|
if err := s.queue.Enqueue(ctx, job); err != nil {
|
|
return err
|
|
}
|
|
if s.logger != nil {
|
|
s.logger.Info("requeued job", "job_id", job.ID, "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "action", "requeue")
|
|
}
|
|
return s.publishJobUpdate(ctx, job.ID, pending, cfg.VideoID)
|
|
}
|
|
|
|
func (s *JobService) moveJobToDLQ(ctx context.Context, job *model.Job, reason string) error {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
failure := string(dto.JobStatusFailure)
|
|
now := time.Now()
|
|
job.Status = &failure
|
|
job.AgentID = nil
|
|
job.UpdatedAt = &now
|
|
job.Logs = appendSystemLog(job.Logs, fmt.Sprintf("[SYSTEM] Sent to DLQ: %s", strings.TrimSpace(reason)))
|
|
if err := s.jobRepository.Save(ctx, job); err != nil {
|
|
return err
|
|
}
|
|
cfg := parseJobConfig(job.Config)
|
|
if err := syncVideoStatus(ctx, s.jobRepository, cfg.VideoID, dto.JobStatusFailure); err != nil {
|
|
return err
|
|
}
|
|
if err := s.removeFromQueue(ctx, job.ID); err != nil {
|
|
return err
|
|
}
|
|
if s.dlq != nil {
|
|
if err := s.dlq.Add(ctx, job, strings.TrimSpace(reason)); err != nil {
|
|
return err
|
|
}
|
|
if s.logger != nil {
|
|
dlqCount, _ := s.dlq.Count(ctx)
|
|
s.logger.Warn("moved job to dlq", "job_id", job.ID, "reason", strings.TrimSpace(reason), "retry_count", s.retryCount(job), "max_retries", s.maxRetries(job), "dlq_size", dlqCount)
|
|
}
|
|
}
|
|
return s.publishJobUpdate(ctx, job.ID, failure, cfg.VideoID)
|
|
}
|
|
|
|
func (s *JobService) canDispatchJob(job *model.Job) bool {
|
|
if job == nil {
|
|
return false
|
|
}
|
|
if job.Cancelled != nil && *job.Cancelled {
|
|
return false
|
|
}
|
|
return s.jobStatus(job) == dto.JobStatusPending
|
|
}
|
|
|
|
func (s *JobService) canAutoRetry(job *model.Job) bool {
|
|
return s.retryCount(job) < s.maxRetries(job)
|
|
}
|
|
|
|
func (s *JobService) retryCount(job *model.Job) int64 {
|
|
if job == nil || job.RetryCount == nil {
|
|
return 0
|
|
}
|
|
return *job.RetryCount
|
|
}
|
|
|
|
func (s *JobService) maxRetries(job *model.Job) int64 {
|
|
if job == nil || job.MaxRetries == nil || *job.MaxRetries <= 0 {
|
|
return 3
|
|
}
|
|
return *job.MaxRetries
|
|
}
|
|
|
|
func (s *JobService) jobStatus(job *model.Job) dto.JobStatus {
|
|
if job == nil || job.Status == nil {
|
|
return dto.JobStatusPending
|
|
}
|
|
return dto.JobStatus(strings.TrimSpace(*job.Status))
|
|
}
|
|
|
|
func (s *JobService) publishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error {
|
|
if s.pubsub == nil {
|
|
return nil
|
|
}
|
|
return s.pubsub.PublishJobUpdate(ctx, jobID, status, videoID)
|
|
}
|
|
|
|
func (s *JobService) publishLog(ctx context.Context, jobID string, logLine string, progress float64) error {
|
|
if s.pubsub == nil {
|
|
return nil
|
|
}
|
|
return s.pubsub.Publish(ctx, jobID, logLine, progress)
|
|
}
|
|
|
|
func (s *JobService) ListDLQ(ctx context.Context, offset, limit int) ([]*dto.DLQEntry, int64, error) {
|
|
if s.dlq == nil {
|
|
return []*dto.DLQEntry{}, 0, nil
|
|
}
|
|
if offset < 0 {
|
|
offset = 0
|
|
}
|
|
if limit <= 0 {
|
|
limit = 20
|
|
}
|
|
entries, err := s.dlq.List(ctx, int64(offset), int64(limit))
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
count, err := s.dlq.Count(ctx)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
items := make([]*dto.DLQEntry, 0, len(entries))
|
|
for _, entry := range entries {
|
|
items = append(items, &dto.DLQEntry{Job: entry.Job, FailureTime: entry.FailureTime.Unix(), Reason: entry.Reason, RetryCount: entry.RetryCount})
|
|
}
|
|
return items, count, nil
|
|
}
|
|
|
|
func (s *JobService) GetDLQ(ctx context.Context, id string) (*dto.DLQEntry, error) {
|
|
if s.dlq == nil {
|
|
return nil, fmt.Errorf("job not found in DLQ")
|
|
}
|
|
entry, err := s.dlq.Get(ctx, strings.TrimSpace(id))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &dto.DLQEntry{Job: entry.Job, FailureTime: entry.FailureTime.Unix(), Reason: entry.Reason, RetryCount: entry.RetryCount}, nil
|
|
}
|
|
|
|
func (s *JobService) RetryDLQ(ctx context.Context, id string) (*model.Job, error) {
|
|
if s.dlq == nil {
|
|
return nil, fmt.Errorf("job not found in DLQ")
|
|
}
|
|
job, err := s.dlq.Retry(ctx, strings.TrimSpace(id))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if job == nil {
|
|
return nil, fmt.Errorf("job not found in DLQ")
|
|
}
|
|
if err := s.requeueJob(ctx, job, false); err != nil {
|
|
return nil, err
|
|
}
|
|
if s.logger != nil {
|
|
count, _ := s.dlq.Count(ctx)
|
|
s.logger.Info("retried job from dlq", "job_id", job.ID, "dlq_size", count)
|
|
}
|
|
return s.jobRepository.GetByID(ctx, job.ID)
|
|
}
|
|
|
|
func (s *JobService) RemoveDLQ(ctx context.Context, id string) error {
|
|
if s.dlq == nil {
|
|
return fmt.Errorf("job not found in DLQ")
|
|
}
|
|
jobID := strings.TrimSpace(id)
|
|
if err := s.dlq.Remove(ctx, jobID); err != nil {
|
|
return err
|
|
}
|
|
if s.logger != nil {
|
|
count, _ := s.dlq.Count(ctx)
|
|
s.logger.Info("removed job from dlq", "job_id", jobID, "dlq_size", count)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *JobService) removeFromQueue(ctx context.Context, jobID string) error {
|
|
type ackable interface {
|
|
Ack(ctx context.Context, jobID string) error
|
|
}
|
|
if queue, ok := s.queue.(ackable); ok {
|
|
return queue.Ack(ctx, jobID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// appendSystemLog returns a pointer to the existing logs with line appended
// as its own newline-terminated entry.
//
// line is trimmed of surrounding whitespace; if nothing remains, logs is
// returned unchanged (same pointer). A newline is inserted before the new
// entry when the existing text is non-empty and does not already end in one.
// The string that logs points to is never modified.
func appendSystemLog(logs *string, line string) *string {
	entry := strings.TrimSpace(line)
	if entry == "" {
		return logs
	}

	var prior string
	if logs != nil {
		prior = *logs
	}

	separator := ""
	if prior != "" && !strings.HasSuffix(prior, "\n") {
		separator = "\n"
	}

	combined := prior + separator + entry + "\n"
	return &combined
}
|