feat: Add player_configs feature and migrate user preferences
- Implemented player_configs table to store multiple player configurations per user. - Migrated existing player settings from user_preferences to player_configs. - Removed player-related columns from user_preferences. - Added referral state fields to user for tracking referral rewards. - Created migration scripts for database changes and data migration. - Added test cases for app services and usage helpers. - Introduced video job service interfaces and implementations.
This commit is contained in:
@@ -2,7 +2,9 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
@@ -23,7 +25,7 @@ type LogPubSub interface {
|
||||
Publish(ctx context.Context, jobID string, logLine string, progress float64) error
|
||||
PublishResource(ctx context.Context, agentID string, data []byte) error
|
||||
PublishCancel(ctx context.Context, agentID string, jobID string) error
|
||||
PublishJobUpdate(ctx context.Context, jobID string, status string) error
|
||||
PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error
|
||||
Subscribe(ctx context.Context, jobID string) (<-chan domain.LogEntry, error)
|
||||
SubscribeResources(ctx context.Context) (<-chan domain.SystemResource, error)
|
||||
SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error)
|
||||
@@ -39,12 +41,29 @@ func NewJobService(queue JobQueue, pubsub LogPubSub) *JobService {
|
||||
return &JobService{queue: queue, pubsub: pubsub}
|
||||
}
|
||||
|
||||
var ErrInvalidJobCursor = errors.New("invalid job cursor")
|
||||
|
||||
const (
|
||||
defaultJobPageSize = 20
|
||||
maxJobPageSize = 100
|
||||
jobCursorVersion = 1
|
||||
)
|
||||
|
||||
type PaginatedJobs struct {
|
||||
Jobs []*domain.Job `json:"jobs"`
|
||||
Total int64 `json:"total"`
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
HasMore bool `json:"has_more"`
|
||||
Jobs []*domain.Job `json:"jobs"`
|
||||
Total int64 `json:"total"`
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
HasMore bool `json:"has_more"`
|
||||
NextCursor string `json:"next_cursor,omitempty"`
|
||||
PageSize int `json:"page_size"`
|
||||
}
|
||||
|
||||
type jobListCursor struct {
|
||||
Version int `json:"v"`
|
||||
CreatedAtUnixNano int64 `json:"created_at_unix_nano"`
|
||||
ID string `json:"id"`
|
||||
AgentID string `json:"agent_id,omitempty"`
|
||||
}
|
||||
|
||||
type jobConfigEnvelope struct {
|
||||
@@ -53,6 +72,7 @@ type jobConfigEnvelope struct {
|
||||
Environment map[string]string `json:"environment,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
UserID string `json:"user_id,omitempty"`
|
||||
VideoID string `json:"video_id,omitempty"`
|
||||
TimeLimit int64 `json:"time_limit,omitempty"`
|
||||
}
|
||||
|
||||
@@ -60,7 +80,6 @@ func strPtr(v string) *string { return &v }
|
||||
func int64Ptr(v int64) *int64 { return &v }
|
||||
func boolPtr(v bool) *bool { return &v }
|
||||
func float64Ptr(v float64) *float64 { return &v }
|
||||
func int32Ptr(v int32) *int32 { return &v }
|
||||
func timePtr(v time.Time) *time.Time { return &v }
|
||||
|
||||
func parseJobConfig(raw *string) jobConfigEnvelope {
|
||||
@@ -72,7 +91,7 @@ func parseJobConfig(raw *string) jobConfigEnvelope {
|
||||
return cfg
|
||||
}
|
||||
|
||||
func encodeJobConfig(raw []byte, name, userID string, timeLimit int64) string {
|
||||
func encodeJobConfig(raw []byte, name, userID, videoID string, timeLimit int64) string {
|
||||
cfg := parseJobConfig(strPtr(string(raw)))
|
||||
if name != "" {
|
||||
cfg.Name = name
|
||||
@@ -80,6 +99,9 @@ func encodeJobConfig(raw []byte, name, userID string, timeLimit int64) string {
|
||||
if userID != "" {
|
||||
cfg.UserID = userID
|
||||
}
|
||||
if videoID != "" {
|
||||
cfg.VideoID = videoID
|
||||
}
|
||||
if timeLimit > 0 {
|
||||
cfg.TimeLimit = timeLimit
|
||||
}
|
||||
@@ -87,12 +109,87 @@ func encodeJobConfig(raw []byte, name, userID string, timeLimit int64) string {
|
||||
return string(encoded)
|
||||
}
|
||||
|
||||
func normalizeJobPageSize(pageSize int) int {
|
||||
if pageSize <= 0 {
|
||||
return defaultJobPageSize
|
||||
}
|
||||
if pageSize > maxJobPageSize {
|
||||
return maxJobPageSize
|
||||
}
|
||||
return pageSize
|
||||
}
|
||||
|
||||
func encodeJobListCursor(cursor jobListCursor) (string, error) {
|
||||
payload, err := json.Marshal(cursor)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return base64.RawURLEncoding.EncodeToString(payload), nil
|
||||
}
|
||||
|
||||
func decodeJobListCursor(raw string) (*jobListCursor, error) {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return nil, nil
|
||||
}
|
||||
payload, err := base64.RawURLEncoding.DecodeString(raw)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidJobCursor
|
||||
}
|
||||
var cursor jobListCursor
|
||||
if err := json.Unmarshal(payload, &cursor); err != nil {
|
||||
return nil, ErrInvalidJobCursor
|
||||
}
|
||||
if cursor.Version != jobCursorVersion || cursor.CreatedAtUnixNano <= 0 || strings.TrimSpace(cursor.ID) == "" {
|
||||
return nil, ErrInvalidJobCursor
|
||||
}
|
||||
cursor.ID = strings.TrimSpace(cursor.ID)
|
||||
cursor.AgentID = strings.TrimSpace(cursor.AgentID)
|
||||
return &cursor, nil
|
||||
}
|
||||
|
||||
func buildJobListCursor(job *model.Job, agentID string) (string, error) {
|
||||
if job == nil || job.CreatedAt == nil {
|
||||
return "", nil
|
||||
}
|
||||
return encodeJobListCursor(jobListCursor{
|
||||
Version: jobCursorVersion,
|
||||
CreatedAtUnixNano: job.CreatedAt.UTC().UnixNano(),
|
||||
ID: strings.TrimSpace(job.ID),
|
||||
AgentID: strings.TrimSpace(agentID),
|
||||
})
|
||||
}
|
||||
|
||||
func listJobsByOffset(ctx context.Context, agentID string, offset, limit int) (*PaginatedJobs, error) {
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
limit = normalizeJobPageSize(limit)
|
||||
q := query.Job.WithContext(ctx).Order(query.Job.CreatedAt.Desc(), query.Job.ID.Desc())
|
||||
if agentID != "" {
|
||||
agentNumeric, err := strconv.ParseInt(agentID, 10, 64)
|
||||
if err != nil {
|
||||
return &PaginatedJobs{Jobs: []*domain.Job{}, Total: 0, Offset: offset, Limit: limit, PageSize: limit, HasMore: false}, nil
|
||||
}
|
||||
q = q.Where(query.Job.AgentID.Eq(agentNumeric))
|
||||
}
|
||||
jobs, total, err := q.FindByPage(offset, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items := make([]*domain.Job, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
items = append(items, toDomainJob(job))
|
||||
}
|
||||
return &PaginatedJobs{Jobs: items, Total: total, Offset: offset, Limit: limit, PageSize: limit, HasMore: offset+len(items) < int(total)}, nil
|
||||
}
|
||||
|
||||
func toDomainJob(job *model.Job) *domain.Job {
|
||||
if job == nil {
|
||||
return nil
|
||||
}
|
||||
cfg := parseJobConfig(job.Config)
|
||||
result := &domain.Job{ID: job.ID, Name: cfg.Name, UserID: cfg.UserID, TimeLimit: cfg.TimeLimit}
|
||||
result := &domain.Job{ID: job.ID, Name: cfg.Name, UserID: cfg.UserID, VideoID: cfg.VideoID, TimeLimit: cfg.TimeLimit}
|
||||
if job.Status != nil {
|
||||
result.Status = domain.JobStatus(*job.Status)
|
||||
}
|
||||
@@ -142,14 +239,14 @@ func toDomainJob(job *model.Job) *domain.Job {
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *JobService) CreateJob(ctx context.Context, userID string, name string, config []byte, priority int, timeLimit int64) (*domain.Job, error) {
|
||||
func (s *JobService) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*domain.Job, error) {
|
||||
status := string(domain.JobStatusPending)
|
||||
now := time.Now()
|
||||
job := &model.Job{
|
||||
ID: fmt.Sprintf("job-%d", now.UnixNano()),
|
||||
Status: strPtr(status),
|
||||
Priority: int64Ptr(int64(priority)),
|
||||
Config: strPtr(encodeJobConfig(config, name, userID, timeLimit)),
|
||||
Config: strPtr(encodeJobConfig(config, name, userID, videoID, timeLimit)),
|
||||
Cancelled: boolPtr(false),
|
||||
RetryCount: int64Ptr(0),
|
||||
MaxRetries: int64Ptr(3),
|
||||
@@ -159,52 +256,86 @@ func (s *JobService) CreateJob(ctx context.Context, userID string, name string,
|
||||
if err := query.Job.WithContext(ctx).Create(job); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := syncVideoStatus(ctx, videoID, domain.JobStatusPending); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
domainJob := toDomainJob(job)
|
||||
if err := s.queue.Enqueue(ctx, domainJob); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_ = s.pubsub.PublishJobUpdate(ctx, job.ID, status, videoID)
|
||||
return domainJob, nil
|
||||
}
|
||||
|
||||
func (s *JobService) ListJobs(ctx context.Context, offset, limit int) (*PaginatedJobs, error) {
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
if limit <= 0 || limit > 100 {
|
||||
limit = 20
|
||||
}
|
||||
jobs, total, err := query.Job.WithContext(ctx).Order(query.Job.CreatedAt.Desc()).FindByPage(offset, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items := make([]*domain.Job, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
items = append(items, toDomainJob(job))
|
||||
}
|
||||
return &PaginatedJobs{Jobs: items, Total: total, Offset: offset, Limit: limit, HasMore: offset+len(items) < int(total)}, nil
|
||||
return listJobsByOffset(ctx, "", offset, limit)
|
||||
}
|
||||
|
||||
func (s *JobService) ListJobsByAgent(ctx context.Context, agentID string, offset, limit int) (*PaginatedJobs, error) {
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
if limit <= 0 || limit > 100 {
|
||||
limit = 20
|
||||
}
|
||||
agentNumeric, err := strconv.ParseInt(agentID, 10, 64)
|
||||
if err != nil {
|
||||
return &PaginatedJobs{Jobs: []*domain.Job{}, Total: 0, Offset: offset, Limit: limit, HasMore: false}, nil
|
||||
}
|
||||
q := query.Job.WithContext(ctx).Where(query.Job.AgentID.Eq(agentNumeric)).Order(query.Job.CreatedAt.Desc())
|
||||
jobs, total, err := q.FindByPage(offset, limit)
|
||||
return listJobsByOffset(ctx, strings.TrimSpace(agentID), offset, limit)
|
||||
}
|
||||
|
||||
func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, cursor string, pageSize int) (*PaginatedJobs, error) {
|
||||
agentID = strings.TrimSpace(agentID)
|
||||
pageSize = normalizeJobPageSize(pageSize)
|
||||
|
||||
decodedCursor, err := decodeJobListCursor(cursor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if decodedCursor != nil && decodedCursor.AgentID != agentID {
|
||||
return nil, ErrInvalidJobCursor
|
||||
}
|
||||
|
||||
q := query.Job.WithContext(ctx).Order(query.Job.CreatedAt.Desc(), query.Job.ID.Desc())
|
||||
if agentID != "" {
|
||||
agentNumeric, err := strconv.ParseInt(agentID, 10, 64)
|
||||
if err != nil {
|
||||
return &PaginatedJobs{Jobs: []*domain.Job{}, Total: 0, Limit: pageSize, PageSize: pageSize, HasMore: false}, nil
|
||||
}
|
||||
q = q.Where(query.Job.AgentID.Eq(agentNumeric))
|
||||
}
|
||||
var cursorTime time.Time
|
||||
if decodedCursor != nil {
|
||||
cursorTime = time.Unix(0, decodedCursor.CreatedAtUnixNano).UTC()
|
||||
}
|
||||
|
||||
queryDB := q.UnderlyingDB()
|
||||
if decodedCursor != nil {
|
||||
queryDB = queryDB.Where("(created_at < ?) OR (created_at = ? AND id < ?)", cursorTime, cursorTime, decodedCursor.ID)
|
||||
}
|
||||
|
||||
var jobs []*model.Job
|
||||
if err := queryDB.Limit(pageSize + 1).Find(&jobs).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasMore := len(jobs) > pageSize
|
||||
if hasMore {
|
||||
jobs = jobs[:pageSize]
|
||||
}
|
||||
|
||||
items := make([]*domain.Job, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
items = append(items, toDomainJob(job))
|
||||
}
|
||||
return &PaginatedJobs{Jobs: items, Total: total, Offset: offset, Limit: limit, HasMore: offset+len(items) < int(total)}, nil
|
||||
|
||||
nextCursor := ""
|
||||
if hasMore && len(jobs) > 0 {
|
||||
nextCursor, err = buildJobListCursor(jobs[len(jobs)-1], agentID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &PaginatedJobs{
|
||||
Jobs: items,
|
||||
Total: 0,
|
||||
Limit: pageSize,
|
||||
PageSize: pageSize,
|
||||
HasMore: hasMore,
|
||||
NextCursor: nextCursor,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *JobService) GetJob(ctx context.Context, id string) (*domain.Job, error) {
|
||||
@@ -242,7 +373,11 @@ func (s *JobService) UpdateJobStatus(ctx context.Context, jobID string, status d
|
||||
if err := query.Job.WithContext(ctx).Save(job); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.pubsub.PublishJobUpdate(ctx, jobID, string(status))
|
||||
cfg := parseJobConfig(job.Config)
|
||||
if err := syncVideoStatus(ctx, cfg.VideoID, status); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.pubsub.PublishJobUpdate(ctx, jobID, string(status), cfg.VideoID)
|
||||
}
|
||||
|
||||
func (s *JobService) AssignJob(ctx context.Context, jobID string, agentID string) error {
|
||||
@@ -262,7 +397,11 @@ func (s *JobService) AssignJob(ctx context.Context, jobID string, agentID string
|
||||
if err := query.Job.WithContext(ctx).Save(job); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.pubsub.PublishJobUpdate(ctx, jobID, status)
|
||||
cfg := parseJobConfig(job.Config)
|
||||
if err := syncVideoStatus(ctx, cfg.VideoID, domain.JobStatusRunning); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.pubsub.PublishJobUpdate(ctx, jobID, status, cfg.VideoID)
|
||||
}
|
||||
|
||||
func (s *JobService) CancelJob(ctx context.Context, jobID string) error {
|
||||
@@ -286,7 +425,11 @@ func (s *JobService) CancelJob(ctx context.Context, jobID string) error {
|
||||
if err := query.Job.WithContext(ctx).Save(job); err != nil {
|
||||
return err
|
||||
}
|
||||
_ = s.pubsub.PublishJobUpdate(ctx, jobID, status)
|
||||
cfg := parseJobConfig(job.Config)
|
||||
if err := syncVideoStatus(ctx, cfg.VideoID, domain.JobStatusCancelled); err != nil {
|
||||
return err
|
||||
}
|
||||
_ = s.pubsub.PublishJobUpdate(ctx, jobID, status, cfg.VideoID)
|
||||
if job.AgentID != nil {
|
||||
_ = s.pubsub.PublishCancel(ctx, strconv.FormatInt(*job.AgentID, 10), job.ID)
|
||||
}
|
||||
@@ -329,10 +472,15 @@ func (s *JobService) RetryJob(ctx context.Context, jobID string) (*domain.Job, e
|
||||
if err := query.Job.WithContext(ctx).Save(job); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg := parseJobConfig(job.Config)
|
||||
if err := syncVideoStatus(ctx, cfg.VideoID, domain.JobStatusPending); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
domainJob := toDomainJob(job)
|
||||
if err := s.queue.Enqueue(ctx, domainJob); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_ = s.pubsub.PublishJobUpdate(ctx, jobID, pending, cfg.VideoID)
|
||||
return domainJob, nil
|
||||
}
|
||||
|
||||
@@ -389,6 +537,29 @@ func (s *JobService) ProcessLog(ctx context.Context, jobID string, logData []byt
|
||||
return s.pubsub.Publish(ctx, jobID, line, progress)
|
||||
}
|
||||
|
||||
func syncVideoStatus(ctx context.Context, videoID string, status domain.JobStatus) error {
|
||||
videoID = strings.TrimSpace(videoID)
|
||||
if videoID == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
statusValue := "processing"
|
||||
processingStatus := "PROCESSING"
|
||||
switch status {
|
||||
case domain.JobStatusSuccess:
|
||||
statusValue = "ready"
|
||||
processingStatus = "READY"
|
||||
case domain.JobStatusFailure, domain.JobStatusCancelled:
|
||||
statusValue = "failed"
|
||||
processingStatus = "FAILED"
|
||||
}
|
||||
|
||||
_, err := query.Video.WithContext(ctx).
|
||||
Where(query.Video.ID.Eq(videoID)).
|
||||
Updates(map[string]any{"status": statusValue, "processing_status": processingStatus})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *JobService) PublishSystemResources(ctx context.Context, agentID string, data []byte) error {
|
||||
return s.pubsub.PublishResource(ctx, agentID, data)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user