package redis import ( "context" "encoding/json" "errors" "fmt" "strings" "time" goredis "github.com/redis/go-redis/v9" "stream.api/internal/database/model" "stream.api/internal/dto" ) const ( JobQueueKey = "render:jobs:queue:v2" JobInflightKey = "render:jobs:inflight" JobInflightMetaKey = "render:jobs:inflight:meta" JobSequenceKey = "render:jobs:queue:seq" LogChannel = "render:jobs:logs" ResourceChannel = "render:agents:resources" JobUpdateChannel = "render:jobs:updates" defaultQueuePoll = time.Second defaultInflightTTL = 15 * time.Minute ) type RedisAdapter struct{ client *goredis.Client } type inflightMeta struct { ReadyScore float64 `json:"ready_score"` ClaimedAt int64 `json:"claimed_at"` } func NewAdapter(addr, password string, db int) (*RedisAdapter, error) { client := goredis.NewClient(&goredis.Options{Addr: addr, Password: password, DB: db}) if err := client.Ping(context.Background()).Err(); err != nil { return nil, err } return &RedisAdapter{client: client}, nil } func (r *RedisAdapter) Client() *goredis.Client { return r.client } func (r *RedisAdapter) Enqueue(ctx context.Context, job *model.Job) error { if job == nil || strings.TrimSpace(job.ID) == "" { return errors.New("job id is required") } priority := int64(0) if job.Priority != nil { priority = *job.Priority } seq, err := r.client.Incr(ctx, JobSequenceKey).Result() if err != nil { return err } score := float64((-priority * 1_000_000_000_000) + seq) jobID := strings.TrimSpace(job.ID) if err := r.client.HDel(ctx, JobInflightMetaKey, jobID).Err(); err != nil { return err } if err := r.client.ZRem(ctx, JobInflightKey, jobID).Err(); err != nil { return err } return r.client.ZAdd(ctx, JobQueueKey, goredis.Z{Score: score, Member: jobID}).Err() } func (r *RedisAdapter) Dequeue(ctx context.Context) (*model.Job, error) { for { if ctx.Err() != nil { return nil, ctx.Err() } res, err := r.client.ZPopMin(ctx, JobQueueKey, 1).Result() if err != nil { if ctx.Err() != nil { return nil, ctx.Err() } return nil, err } if len(res) == 0 { select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(defaultQueuePoll): continue } } jobID := fmt.Sprintf("%v", res[0].Member) meta, err := json.Marshal(inflightMeta{ReadyScore: res[0].Score, ClaimedAt: time.Now().Unix()}) if err != nil { return nil, err } leaseScore := float64(time.Now().Add(defaultInflightTTL).Unix()) pipe := r.client.TxPipeline() pipe.ZAdd(ctx, JobInflightKey, goredis.Z{Score: leaseScore, Member: jobID}) pipe.HSet(ctx, JobInflightMetaKey, jobID, meta) if _, err := pipe.Exec(ctx); err != nil { return nil, err } return &model.Job{ID: jobID}, nil } } func (r *RedisAdapter) Ack(ctx context.Context, jobID string) error { jobID = strings.TrimSpace(jobID) if jobID == "" { return nil } pipe := r.client.TxPipeline() pipe.ZRem(ctx, JobQueueKey, jobID) pipe.ZRem(ctx, JobInflightKey, jobID) pipe.HDel(ctx, JobInflightMetaKey, jobID) _, err := pipe.Exec(ctx) return err } func (r *RedisAdapter) ListExpiredInflight(ctx context.Context, now time.Time, limit int64) ([]string, error) { if limit <= 0 { limit = 100 } return r.client.ZRangeByScore(ctx, JobInflightKey, &goredis.ZRangeBy{Min: "-inf", Max: fmt.Sprintf("%d", now.Unix()), Offset: 0, Count: limit}).Result() } func (r *RedisAdapter) TouchInflight(ctx context.Context, jobID string, ttl time.Duration) error { jobID = strings.TrimSpace(jobID) if jobID == "" { return nil } if ttl <= 0 { ttl = defaultInflightTTL } return r.client.ZAddXX(ctx, JobInflightKey, goredis.Z{Score: float64(time.Now().Add(ttl).Unix()), Member: jobID}).Err() } func (r *RedisAdapter) Publish(ctx context.Context, jobID string, logLine string, progress float64) error { payload, err := json.Marshal(dto.LogEntry{JobID: jobID, Line: logLine, Progress: progress}) if err != nil { return err } return r.client.Publish(ctx, LogChannel, payload).Err() } func (r *RedisAdapter) Subscribe(ctx context.Context, jobID string) (<-chan dto.LogEntry, error) { pubsub := r.client.Subscribe(ctx, LogChannel) ch := make(chan dto.LogEntry) go func() { defer close(ch) defer pubsub.Close() for msg := range pubsub.Channel() { var entry dto.LogEntry if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil { continue } if jobID == "" || entry.JobID == jobID { ch <- entry } } }() return ch, nil } func (r *RedisAdapter) PublishResource(ctx context.Context, agentID string, data []byte) error { var decoded struct { CPU float64 `json:"cpu"` RAM float64 `json:"ram"` } if err := json.Unmarshal(data, &decoded); err != nil { return err } payload, err := json.Marshal(dto.SystemResource{AgentID: agentID, CPU: decoded.CPU, RAM: decoded.RAM}) if err != nil { return err } return r.client.Publish(ctx, ResourceChannel, payload).Err() } func (r *RedisAdapter) SubscribeResources(ctx context.Context) (<-chan dto.SystemResource, error) { pubsub := r.client.Subscribe(ctx, ResourceChannel) ch := make(chan dto.SystemResource) go func() { defer close(ch) defer pubsub.Close() for msg := range pubsub.Channel() { var entry dto.SystemResource if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil { continue } ch <- entry } }() return ch, nil } func (r *RedisAdapter) PublishCancel(ctx context.Context, agentID string, jobID string) error { return r.client.Publish(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID), jobID).Err() } func (r *RedisAdapter) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) { pubsub := r.client.Subscribe(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID)) ch := make(chan string) go func() { defer close(ch) defer pubsub.Close() for msg := range pubsub.Channel() { ch <- msg.Payload } }() return ch, nil } func (r *RedisAdapter) PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error { payload, err := json.Marshal(map[string]string{"job_id": jobID, "status": status, "video_id": videoID}) if err != nil { return err } return r.client.Publish(ctx, JobUpdateChannel, payload).Err() } func (r *RedisAdapter) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) { pubsub := r.client.Subscribe(ctx, JobUpdateChannel) ch := make(chan string) go func() { defer close(ch) defer pubsub.Close() for msg := range pubsub.Channel() { ch <- msg.Payload } }() return ch, nil } func (c *RedisAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { return c.client.Set(ctx, key, value, expiration).Err() } func (c *RedisAdapter) Get(ctx context.Context, key string) (string, error) { return c.client.Get(ctx, key).Result() } func (c *RedisAdapter) Del(ctx context.Context, key string) error { return c.client.Del(ctx, key).Err() } func (c *RedisAdapter) Close() error { return c.client.Close() }