Files
stream.api/internal/workflow/render/workflow.go
2026-04-02 11:01:30 +00:00

261 lines
7.6 KiB
Go

package render
import (
"context"
"encoding/json"
"errors"
"net/url"
"strings"
"github.com/google/uuid"
"gorm.io/gorm"
"stream.api/internal/database/model"
"stream.api/internal/dto"
"stream.api/internal/repository"
)
// Sentinel errors returned by Workflow. Compare against them with errors.Is.
var (
	// ErrUserNotFound is returned by CreateVideo when the user ID is blank
	// or the user lookup reports gorm.ErrRecordNotFound.
	ErrUserNotFound = errors.New("user not found")

	// ErrAdTemplateNotFound is returned by CreateVideo when persisting the
	// video reports gorm.ErrRecordNotFound.
	ErrAdTemplateNotFound = errors.New("ad template not found")

	// ErrJobServiceUnavailable is returned when no JobService is configured
	// on the Workflow (or the Workflow itself is nil).
	ErrJobServiceUnavailable = errors.New("job service is unavailable")
)
// JobService is the set of job operations that Workflow forwards to. Every
// exported job/DLQ method on Workflow calls the method of the same name here
// with the arguments it received.
type JobService interface {
	// Job listing and lookup.
	ListJobs(ctx context.Context, offset, limit int) (*dto.PaginatedJobs, error)
	ListJobsByAgent(ctx context.Context, agentID string, offset, limit int) (*dto.PaginatedJobs, error)
	ListJobsByCursor(ctx context.Context, agentID string, cursor string, pageSize int) (*dto.PaginatedJobs, error)
	GetJob(ctx context.Context, id string) (*model.Job, error)

	// Job lifecycle.
	CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error)
	CancelJob(ctx context.Context, id string) error
	RetryJob(ctx context.Context, id string) (*model.Job, error)

	// DLQ operations (presumably a dead-letter queue of failed jobs; confirm
	// against the implementation).
	ListDLQ(ctx context.Context, offset, limit int) ([]*dto.DLQEntry, int64, error)
	GetDLQ(ctx context.Context, id string) (*dto.DLQEntry, error)
	RetryDLQ(ctx context.Context, id string) (*model.Job, error)
	RemoveDLQ(ctx context.Context, id string) error
}
// Workflow ties video creation to job scheduling. It persists videos through
// its workflow repository and forwards job operations to a JobService.
type Workflow struct {
	// db is the database handle the repositories are built from in New;
	// CreateVideo refuses to run when it is nil.
	db *gorm.DB
	// jobService may be nil, in which case job operations return
	// ErrJobServiceUnavailable.
	jobService JobService
	// userRepository is populated by New.
	userRepository userRepository
	// workflowRepository backs the user lookup and video persistence done
	// by CreateVideo.
	workflowRepository videoWorkflowRepository
}
// userRepository is the user lookup dependency held by Workflow.
type userRepository interface {
	// GetByID loads the user identified by userID.
	GetByID(ctx context.Context, userID string) (*model.User, error)
}
// videoWorkflowRepository is the persistence dependency used by
// Workflow.CreateVideo.
type videoWorkflowRepository interface {
	// GetUserByID loads the user that will own the video. CreateVideo maps
	// gorm.ErrRecordNotFound from this call to ErrUserNotFound.
	GetUserByID(ctx context.Context, userID string) (*model.User, error)
	// CreateVideoWithStorageAndAd persists the video for userID with an
	// optional ad template. CreateVideo maps gorm.ErrRecordNotFound from
	// this call to ErrAdTemplateNotFound.
	CreateVideoWithStorageAndAd(ctx context.Context, video *model.Video, userID string, adTemplateID *string) error
	// MarkVideoJobFailed is called by CreateVideo when a job could not be
	// scheduled for an already persisted video.
	MarkVideoJobFailed(ctx context.Context, videoID string) error
}
// CreateVideoInput carries the caller-supplied values for
// Workflow.CreateVideo. String fields are whitespace-trimmed by CreateVideo
// before use.
type CreateVideoInput struct {
	// UserID identifies the owner; a blank value yields ErrUserNotFound.
	UserID string
	// Title is stored as both the video's Name and Title and is used as the
	// job name.
	Title string
	// Description is optional; nil or whitespace-only is stored as nil.
	Description *string
	// URL is the video location; it also determines the storage type.
	URL string
	// Size is copied to the video record unchanged.
	Size int64
	// Duration is copied to the video record unchanged.
	Duration int32
	// Format is stored on the video and included in the job payload.
	Format string
	// AdTemplateID is optional and passed through to the repository.
	AdTemplateID *string
}
// CreateVideoResult is what Workflow.CreateVideo returns on success.
type CreateVideoResult struct {
	// Video is the record that was persisted.
	Video *model.Video
	// Job is a copy of the job returned by the JobService for that video.
	Job model.Job
}
// New builds a Workflow around db and jobService. Both repositories are
// created from db via the repository package. jobService may be nil; job
// operations then report ErrJobServiceUnavailable.
func New(db *gorm.DB, jobService JobService) *Workflow {
	w := new(Workflow)
	w.db = db
	w.jobService = jobService
	w.userRepository = repository.NewUserRepository(db)
	w.workflowRepository = repository.NewVideoWorkflowRepository(db)
	return w
}
// CreateVideo persists a new video for the user named in input and then
// schedules a job for it through the JobService.
//
// The steps are:
//  1. Look up the user (blank ID or a missing record yields ErrUserNotFound).
//  2. Persist the video with Status "processing" and ProcessingStatus
//     "PENDING"; gorm.ErrRecordNotFound from this step is reported as
//     ErrAdTemplateNotFound.
//  3. Build the job payload and call JobService.CreateJob with the title as
//     the job name and 0 for both priority and time limit.
//
// If anything fails after the video has been persisted, the video is marked
// as job-failed on a best-effort basis and the original error is returned.
//
// It returns gorm.ErrInvalidDB when the Workflow, its database handle, or its
// workflow repository is nil, and ErrJobServiceUnavailable when no
// JobService is configured or the service returns neither a job nor an error.
func (w *Workflow) CreateVideo(ctx context.Context, input CreateVideoInput) (*CreateVideoResult, error) {
	// The repository is what this method actually calls, so guard it too
	// rather than panicking on a partially constructed Workflow.
	if w == nil || w.db == nil || w.workflowRepository == nil {
		return nil, gorm.ErrInvalidDB
	}

	userID := strings.TrimSpace(input.UserID)
	if userID == "" {
		return nil, ErrUserNotFound
	}
	user, err := w.workflowRepository.GetUserByID(ctx, userID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, ErrUserNotFound
		}
		return nil, err
	}
	if user == nil {
		// A repository returning (nil, nil) would otherwise panic on user.ID.
		return nil, ErrUserNotFound
	}

	title := strings.TrimSpace(input.Title)
	videoURL := strings.TrimSpace(input.URL)
	format := strings.TrimSpace(input.Format)
	statusValue := "processing"
	processingStatus := "PENDING"
	storageType := detectStorageType(videoURL)

	video := &model.Video{
		ID:               uuid.NewString(),
		UserID:           user.ID,
		Name:             title,
		Title:            title,
		Description:      nullableTrimmedString(input.Description),
		URL:              videoURL,
		Size:             input.Size,
		Duration:         input.Duration,
		Format:           format,
		Status:           model.StringPtr(statusValue),
		ProcessingStatus: model.StringPtr(processingStatus),
		StorageType:      model.StringPtr(storageType),
	}
	if err := w.workflowRepository.CreateVideoWithStorageAndAd(ctx, video, user.ID, input.AdTemplateID); err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, ErrAdTemplateNotFound
		}
		return nil, err
	}

	// From here on the video row exists. Every failure path below marks it
	// as failed. That update is best-effort: its error is deliberately
	// dropped so the caller sees the failure that actually aborted the
	// workflow.
	if w.jobService == nil {
		_ = w.workflowRepository.MarkVideoJobFailed(ctx, video.ID)
		return nil, ErrJobServiceUnavailable
	}
	jobPayload, err := buildJobPayload(video.ID, user.ID, videoURL, format)
	if err != nil {
		_ = w.workflowRepository.MarkVideoJobFailed(ctx, video.ID)
		return nil, err
	}
	job, err := w.jobService.CreateJob(ctx, user.ID, video.ID, title, jobPayload, 0, 0)
	if err != nil {
		_ = w.workflowRepository.MarkVideoJobFailed(ctx, video.ID)
		return nil, err
	}
	if job == nil {
		// A service returning (nil, nil) would otherwise panic on *job
		// below; treat it as the service failing to provide a job.
		_ = w.workflowRepository.MarkVideoJobFailed(ctx, video.ID)
		return nil, ErrJobServiceUnavailable
	}
	return &CreateVideoResult{Video: video, Job: *job}, nil
}
// ListJobs forwards to JobService.ListJobs with the same offset and limit.
// It returns ErrJobServiceUnavailable when the Workflow or its JobService is
// nil.
func (w *Workflow) ListJobs(ctx context.Context, offset, limit int) (*dto.PaginatedJobs, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.ListJobs(ctx, offset, limit)
}
// ListJobsByAgent forwards to JobService.ListJobsByAgent with the same
// agentID, offset and limit. It returns ErrJobServiceUnavailable when the
// Workflow or its JobService is nil.
func (w *Workflow) ListJobsByAgent(ctx context.Context, agentID string, offset, limit int) (*dto.PaginatedJobs, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.ListJobsByAgent(ctx, agentID, offset, limit)
}
// ListJobsByCursor forwards to JobService.ListJobsByCursor with the same
// agentID, cursor and pageSize. It returns ErrJobServiceUnavailable when the
// Workflow or its JobService is nil.
func (w *Workflow) ListJobsByCursor(ctx context.Context, agentID string, cursor string, pageSize int) (*dto.PaginatedJobs, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.ListJobsByCursor(ctx, agentID, cursor, pageSize)
}
// GetJob forwards to JobService.GetJob for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) GetJob(ctx context.Context, id string) (*model.Job, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.GetJob(ctx, id)
}
// CreateJob forwards to JobService.CreateJob, passing every argument through
// unchanged. It returns ErrJobServiceUnavailable when the Workflow or its
// JobService is nil.
func (w *Workflow) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.CreateJob(ctx, userID, videoID, name, config, priority, timeLimit)
}
// CancelJob forwards to JobService.CancelJob for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) CancelJob(ctx context.Context, id string) error {
	if w == nil {
		return ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return ErrJobServiceUnavailable
	}
	return svc.CancelJob(ctx, id)
}
// RetryJob forwards to JobService.RetryJob for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) RetryJob(ctx context.Context, id string) (*model.Job, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.RetryJob(ctx, id)
}
// ListDLQ forwards to JobService.ListDLQ with the same offset and limit,
// returning its entries and count as-is. It returns (nil, 0,
// ErrJobServiceUnavailable) when the Workflow or its JobService is nil.
func (w *Workflow) ListDLQ(ctx context.Context, offset, limit int) ([]*dto.DLQEntry, int64, error) {
	if w == nil {
		return nil, 0, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, 0, ErrJobServiceUnavailable
	}
	return svc.ListDLQ(ctx, offset, limit)
}
// GetDLQ forwards to JobService.GetDLQ for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) GetDLQ(ctx context.Context, id string) (*dto.DLQEntry, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.GetDLQ(ctx, id)
}
// RetryDLQ forwards to JobService.RetryDLQ for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) RetryDLQ(ctx context.Context, id string) (*model.Job, error) {
	if w == nil {
		return nil, ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return nil, ErrJobServiceUnavailable
	}
	return svc.RetryDLQ(ctx, id)
}
// RemoveDLQ forwards to JobService.RemoveDLQ for the given id. It returns
// ErrJobServiceUnavailable when the Workflow or its JobService is nil.
func (w *Workflow) RemoveDLQ(ctx context.Context, id string) error {
	if w == nil {
		return ErrJobServiceUnavailable
	}
	svc := w.jobService
	if svc == nil {
		return ErrJobServiceUnavailable
	}
	return svc.RemoveDLQ(ctx, id)
}
// buildJobPayload encodes the job configuration as a JSON object with the
// keys "format", "input_url", "source_url", "user_id" and "video_id".
// videoURL is written to both "input_url" and "source_url".
func buildJobPayload(videoID, userID, videoURL, format string) ([]byte, error) {
	// Fields are declared in lexical key order so the encoded bytes match
	// what encoding/json produces for a map with the same keys (map keys
	// are emitted sorted).
	type jobPayload struct {
		Format    string `json:"format"`
		InputURL  string `json:"input_url"`
		SourceURL string `json:"source_url"`
		UserID    string `json:"user_id"`
		VideoID   string `json:"video_id"`
	}
	payload := jobPayload{
		Format:    format,
		InputURL:  videoURL,
		SourceURL: videoURL,
		UserID:    userID,
		VideoID:   videoID,
	}
	return json.Marshal(payload)
}
// nullableTrimmedString returns a pointer to the whitespace-trimmed copy of
// *value, or nil when value is nil or contains only whitespace. The string
// that value points to is not modified.
func nullableTrimmedString(value *string) *string {
	if value != nil {
		if s := strings.TrimSpace(*value); s != "" {
			return &s
		}
	}
	return nil
}
// detectStorageType returns "S3" when shouldDeleteStoredObject reports true
// for rawURL and "WORKER" otherwise.
func detectStorageType(rawURL string) string {
	storageType := "WORKER"
	if shouldDeleteStoredObject(rawURL) {
		storageType = "S3"
	}
	return storageType
}
// shouldDeleteStoredObject reports whether rawURL, after trimming
// surrounding whitespace, is a non-empty value that does not start with "/"
// and either fails to parse as a URL or parses with neither a scheme nor a
// host. Values such as "videos/a.mp4" therefore yield true, while absolute
// URLs and rooted paths yield false.
func shouldDeleteStoredObject(rawURL string) bool {
	candidate := strings.TrimSpace(rawURL)
	// Empty values and rooted paths are rejected regardless of how they
	// parse, so decide those before parsing.
	if candidate == "" || strings.HasPrefix(candidate, "/") {
		return false
	}
	u, err := url.Parse(candidate)
	if err != nil {
		return true
	}
	return u.Scheme == "" && u.Host == ""
}