247 lines
6.9 KiB
Go
247 lines
6.9 KiB
Go
package redis
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
goredis "github.com/redis/go-redis/v9"
|
|
"stream.api/internal/database/model"
|
|
"stream.api/internal/dto"
|
|
)
|
|
|
|
// Redis keys backing the render job queue.
const (
	// JobQueueKey is the sorted set of job IDs waiting to be dequeued.
	JobQueueKey = "render:jobs:queue:v2"
	// JobInflightKey is the sorted set of claimed job IDs, scored by the Unix
	// time (seconds) at which their lease expires.
	JobInflightKey = "render:jobs:inflight"
	// JobInflightMetaKey is the hash mapping a claimed job ID to its
	// JSON-encoded inflightMeta.
	JobInflightMetaKey = "render:jobs:inflight:meta"
	// JobSequenceKey is the counter incremented once per Enqueue call.
	JobSequenceKey = "render:jobs:queue:seq"
)

// Pub/sub channel names.
const (
	// LogChannel carries JSON-encoded dto.LogEntry messages.
	LogChannel = "render:jobs:logs"
	// ResourceChannel carries JSON-encoded dto.SystemResource messages.
	ResourceChannel = "render:agents:resources"
	// JobUpdateChannel carries JSON-encoded job status updates.
	JobUpdateChannel = "render:jobs:updates"
)

// Timing defaults.
const (
	// defaultQueuePoll is how long Dequeue waits before polling an empty
	// queue again.
	defaultQueuePoll = time.Second
	// defaultInflightTTL is the lease length given to a dequeued job and the
	// fallback used by TouchInflight when no positive ttl is supplied.
	defaultInflightTTL = 15 * time.Minute
)
|
|
|
|
// RedisAdapter wraps a go-redis client and exposes the job-queue, pub/sub and
// key/value helpers defined in this file.
type RedisAdapter struct {
	client *goredis.Client
}
|
|
|
|
// inflightMeta is the JSON document stored in the JobInflightMetaKey hash for
// a job that has been claimed by Dequeue.
type inflightMeta struct {
	// ReadyScore is the score the job had in the queue when it was popped.
	ReadyScore float64 `json:"ready_score"`
	// ClaimedAt is the time the job was claimed, in Unix seconds.
	ClaimedAt int64 `json:"claimed_at"`
}
|
|
|
|
func NewAdapter(addr, password string, db int) (*RedisAdapter, error) {
|
|
client := goredis.NewClient(&goredis.Options{Addr: addr, Password: password, DB: db})
|
|
if err := client.Ping(context.Background()).Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &RedisAdapter{client: client}, nil
|
|
}
|
|
|
|
// Client returns the underlying go-redis client held by the adapter.
func (r *RedisAdapter) Client() *goredis.Client {
	return r.client
}
|
|
|
|
func (r *RedisAdapter) Enqueue(ctx context.Context, job *model.Job) error {
|
|
if job == nil || strings.TrimSpace(job.ID) == "" {
|
|
return errors.New("job id is required")
|
|
}
|
|
priority := int64(0)
|
|
if job.Priority != nil {
|
|
priority = *job.Priority
|
|
}
|
|
seq, err := r.client.Incr(ctx, JobSequenceKey).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
score := float64((-priority * 1_000_000_000_000) + seq)
|
|
jobID := strings.TrimSpace(job.ID)
|
|
if err := r.client.HDel(ctx, JobInflightMetaKey, jobID).Err(); err != nil {
|
|
return err
|
|
}
|
|
if err := r.client.ZRem(ctx, JobInflightKey, jobID).Err(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.ZAdd(ctx, JobQueueKey, goredis.Z{Score: score, Member: jobID}).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) Dequeue(ctx context.Context) (*model.Job, error) {
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
res, err := r.client.ZPopMin(ctx, JobQueueKey, 1).Result()
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
return nil, err
|
|
}
|
|
if len(res) == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-time.After(defaultQueuePoll):
|
|
continue
|
|
}
|
|
}
|
|
jobID := fmt.Sprintf("%v", res[0].Member)
|
|
meta, err := json.Marshal(inflightMeta{ReadyScore: res[0].Score, ClaimedAt: time.Now().Unix()})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
leaseScore := float64(time.Now().Add(defaultInflightTTL).Unix())
|
|
pipe := r.client.TxPipeline()
|
|
pipe.ZAdd(ctx, JobInflightKey, goredis.Z{Score: leaseScore, Member: jobID})
|
|
pipe.HSet(ctx, JobInflightMetaKey, jobID, meta)
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
return &model.Job{ID: jobID}, nil
|
|
}
|
|
}
|
|
|
|
func (r *RedisAdapter) Ack(ctx context.Context, jobID string) error {
|
|
jobID = strings.TrimSpace(jobID)
|
|
if jobID == "" {
|
|
return nil
|
|
}
|
|
pipe := r.client.TxPipeline()
|
|
pipe.ZRem(ctx, JobQueueKey, jobID)
|
|
pipe.ZRem(ctx, JobInflightKey, jobID)
|
|
pipe.HDel(ctx, JobInflightMetaKey, jobID)
|
|
_, err := pipe.Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
func (r *RedisAdapter) ListExpiredInflight(ctx context.Context, now time.Time, limit int64) ([]string, error) {
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
return r.client.ZRangeByScore(ctx, JobInflightKey, &goredis.ZRangeBy{Min: "-inf", Max: fmt.Sprintf("%d", now.Unix()), Offset: 0, Count: limit}).Result()
|
|
}
|
|
|
|
func (r *RedisAdapter) TouchInflight(ctx context.Context, jobID string, ttl time.Duration) error {
|
|
jobID = strings.TrimSpace(jobID)
|
|
if jobID == "" {
|
|
return nil
|
|
}
|
|
if ttl <= 0 {
|
|
ttl = defaultInflightTTL
|
|
}
|
|
return r.client.ZAddXX(ctx, JobInflightKey, goredis.Z{Score: float64(time.Now().Add(ttl).Unix()), Member: jobID}).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) Publish(ctx context.Context, jobID string, logLine string, progress float64) error {
|
|
payload, err := json.Marshal(dto.LogEntry{JobID: jobID, Line: logLine, Progress: progress})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.client.Publish(ctx, LogChannel, payload).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) Subscribe(ctx context.Context, jobID string) (<-chan dto.LogEntry, error) {
|
|
pubsub := r.client.Subscribe(ctx, LogChannel)
|
|
ch := make(chan dto.LogEntry)
|
|
go func() {
|
|
defer close(ch)
|
|
defer pubsub.Close()
|
|
for msg := range pubsub.Channel() {
|
|
var entry dto.LogEntry
|
|
if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
|
|
continue
|
|
}
|
|
if jobID == "" || entry.JobID == jobID {
|
|
ch <- entry
|
|
}
|
|
}
|
|
}()
|
|
return ch, nil
|
|
}
|
|
|
|
func (r *RedisAdapter) PublishResource(ctx context.Context, agentID string, data []byte) error {
|
|
var decoded struct {
|
|
CPU float64 `json:"cpu"`
|
|
RAM float64 `json:"ram"`
|
|
}
|
|
if err := json.Unmarshal(data, &decoded); err != nil {
|
|
return err
|
|
}
|
|
payload, err := json.Marshal(dto.SystemResource{AgentID: agentID, CPU: decoded.CPU, RAM: decoded.RAM})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.client.Publish(ctx, ResourceChannel, payload).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) SubscribeResources(ctx context.Context) (<-chan dto.SystemResource, error) {
|
|
pubsub := r.client.Subscribe(ctx, ResourceChannel)
|
|
ch := make(chan dto.SystemResource)
|
|
go func() {
|
|
defer close(ch)
|
|
defer pubsub.Close()
|
|
for msg := range pubsub.Channel() {
|
|
var entry dto.SystemResource
|
|
if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
|
|
continue
|
|
}
|
|
ch <- entry
|
|
}
|
|
}()
|
|
return ch, nil
|
|
}
|
|
|
|
func (r *RedisAdapter) PublishCancel(ctx context.Context, agentID string, jobID string) error {
|
|
return r.client.Publish(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID), jobID).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) {
|
|
pubsub := r.client.Subscribe(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID))
|
|
ch := make(chan string)
|
|
go func() {
|
|
defer close(ch)
|
|
defer pubsub.Close()
|
|
for msg := range pubsub.Channel() {
|
|
ch <- msg.Payload
|
|
}
|
|
}()
|
|
return ch, nil
|
|
}
|
|
|
|
func (r *RedisAdapter) PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error {
|
|
payload, err := json.Marshal(map[string]string{"job_id": jobID, "status": status, "video_id": videoID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.client.Publish(ctx, JobUpdateChannel, payload).Err()
|
|
}
|
|
|
|
func (r *RedisAdapter) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) {
|
|
pubsub := r.client.Subscribe(ctx, JobUpdateChannel)
|
|
ch := make(chan string)
|
|
go func() {
|
|
defer close(ch)
|
|
defer pubsub.Close()
|
|
for msg := range pubsub.Channel() {
|
|
ch <- msg.Payload
|
|
}
|
|
}()
|
|
return ch, nil
|
|
}
|
|
|
|
func (c *RedisAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
|
|
return c.client.Set(ctx, key, value, expiration).Err()
|
|
}
|
|
|
|
func (c *RedisAdapter) Get(ctx context.Context, key string) (string, error) {
|
|
return c.client.Get(ctx, key).Result()
|
|
}
|
|
|
|
func (c *RedisAdapter) Del(ctx context.Context, key string) error {
|
|
return c.client.Del(ctx, key).Err()
|
|
}
|
|
|
|
func (c *RedisAdapter) Close() error {
|
|
return c.client.Close()
|
|
}
|