From bb7f7b0bb397ef94450313047e6030044c83ecbe Mon Sep 17 00:00:00 2001 From: lethdat Date: Thu, 26 Mar 2026 00:33:45 +0700 Subject: [PATCH] feat: enhance job and user models with additional fields - Added VideoID, UserID, and TimeLimit fields to the job model. - Removed unused referral fields from the user model. - Updated job creation and update logic to handle new fields. - Refactored job service to work with updated job model. - Replaced cache interface with Redis adapter in service layer. - Introduced a Dead Letter Queue (DLQ) for failed jobs in Redis. - Updated gRPC server to accommodate changes in job handling. - Removed obsolete cache package and related files. --- internal/config/config.go | 12 -- internal/database/model/jobs.gen.go | 5 +- internal/database/query/jobs.gen.go | 14 +- internal/database/query/user.gen.go | 38 +---- internal/middleware/authenticator.go | 6 +- internal/middleware/authenticator_test.go | 1 - internal/service/service_core.go | 7 +- internal/service/service_helpers.go | 43 +++--- internal/service/testdb_setup_test.go | 5 +- internal/transport/grpc/server.go | 14 +- .../runtime/adapters/queue/redis/adapter.go | 48 ++++-- .../video/runtime/adapters/queue/redis/dlq.go | 143 ++++++++++++++++++ internal/video/runtime/grpc/server.go | 6 +- internal/video/runtime/module.go | 75 --------- .../video/runtime/services/job_service.go | 107 ++++--------- internal/video/service.go | 17 ++- pkg/cache/cache.go | 14 -- pkg/cache/interface.go | 1 - pkg/cache/redis.go | 43 ------ 19 files changed, 270 insertions(+), 329 deletions(-) create mode 100644 internal/video/runtime/adapters/queue/redis/dlq.go delete mode 100644 internal/video/runtime/module.go delete mode 100644 pkg/cache/cache.go delete mode 100644 pkg/cache/interface.go delete mode 100644 pkg/cache/redis.go diff --git a/internal/config/config.go b/internal/config/config.go index 2e3304a..7f784bf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,10 +10,8 @@ type Config struct { Server ServerConfig Database DatabaseConfig Redis RedisConfig - JWT JWTConfig Google GoogleConfig Frontend FrontendConfig - CORS CORSConfig Email EmailConfig AWS AWSConfig Render RenderConfig @@ -48,10 +46,6 @@ type RedisConfig struct { DB int } -type JWTConfig struct { - Secret string -} - type GoogleConfig struct { ClientID string `mapstructure:"client_id"` ClientSecret string `mapstructure:"client_secret"` @@ -64,10 +58,6 @@ type FrontendConfig struct { GoogleAuthFinalizePath string `mapstructure:"google_auth_finalize_path"` } -type CORSConfig struct { - AllowOrigins []string `mapstructure:"allow_origins"` -} - type EmailConfig struct { From string // Add SMTP settings here later @@ -86,7 +76,6 @@ func LoadConfig() (*Config, error) { v := viper.New() // Set defaults - v.SetDefault("server.port", "8080") v.SetDefault("server.grpc_port", "9000") v.SetDefault("server.mode", "debug") v.SetDefault("redis.db", 0) @@ -96,7 +85,6 @@ func LoadConfig() (*Config, error) { v.SetDefault("google.state_ttl_minutes", 10) v.SetDefault("frontend.google_auth_finalize_path", "/auth/google/finalize") v.SetDefault("internal.marker", "") - v.SetDefault("cors.allow_origins", []string{"http://localhost:5173", "http://localhost:8080", "http://localhost:8081"}) // Environment variable settings v.SetEnvPrefix("APP") diff --git a/internal/database/model/jobs.gen.go b/internal/database/model/jobs.gen.go index 6c1b29f..b4a2891 100644 --- a/internal/database/model/jobs.gen.go +++ b/internal/database/model/jobs.gen.go @@ -12,7 +12,7 @@ const TableNameJob = "jobs" // Job mapped from table type Job struct { - ID string `gorm:"column:id;type:text;primaryKey" json:"id"` + ID string `gorm:"column:id;type:uuid;primaryKey;default:gen_random_uuid()" json:"id"` Status *string `gorm:"column:status;type:text" json:"status"` Priority *int64 `gorm:"column:priority;type:bigint;index:idx_jobs_priority,priority:1" json:"priority"` InputURL *string `gorm:"column:input_url;type:text" json:"input_url"` @@ -29,6 +29,9 @@ type Job struct { CreatedAt *time.Time `gorm:"column:created_at;type:timestamp with time zone" json:"created_at"` UpdatedAt *time.Time `gorm:"column:updated_at;type:timestamp with time zone" json:"updated_at"` Version *int64 `gorm:"column:version;type:bigint;version" json:"-"` + VideoID *string `gorm:"column:video_id;type:uuid" json:"video_id"` + UserID *string `gorm:"column:user_id;type:uuid" json:"user_id"` + TimeLimit *int64 `gorm:"column:time_limit;type:bigint;default:3600000" json:"time_limit"` } // TableName Job's table name diff --git a/internal/database/query/jobs.gen.go b/internal/database/query/jobs.gen.go index a720580..d8c95a2 100644 --- a/internal/database/query/jobs.gen.go +++ b/internal/database/query/jobs.gen.go @@ -45,6 +45,9 @@ func newJob(db *gorm.DB, opts ...gen.DOOption) job { _job.CreatedAt = field.NewTime(tableName, "created_at") _job.UpdatedAt = field.NewTime(tableName, "updated_at") _job.Version = field.NewInt64(tableName, "version") + _job.VideoID = field.NewString(tableName, "video_id") + _job.UserID = field.NewString(tableName, "user_id") + _job.TimeLimit = field.NewInt64(tableName, "time_limit") _job.fillFieldMap() @@ -72,6 +75,9 @@ type job struct { CreatedAt field.Time UpdatedAt field.Time Version field.Int64 + VideoID field.String + UserID field.String + TimeLimit field.Int64 fieldMap map[string]field.Expr } @@ -105,6 +111,9 @@ func (j *job) updateTableName(table string) *job { j.CreatedAt = field.NewTime(table, "created_at") j.UpdatedAt = field.NewTime(table, "updated_at") j.Version = field.NewInt64(table, "version") + j.VideoID = field.NewString(table, "video_id") + j.UserID = field.NewString(table, "user_id") + j.TimeLimit = field.NewInt64(table, "time_limit") j.fillFieldMap() @@ -129,7 +138,7 @@ func (j *job) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (j *job) fillFieldMap() { - j.fieldMap = make(map[string]field.Expr, 17) + j.fieldMap = make(map[string]field.Expr, 20) j.fieldMap["id"] = j.ID j.fieldMap["status"] = j.Status j.fieldMap["priority"] = j.Priority @@ -147,6 +156,9 @@ func (j *job) fillFieldMap() { j.fieldMap["created_at"] = j.CreatedAt j.fieldMap["updated_at"] = j.UpdatedAt j.fieldMap["version"] = j.Version + j.fieldMap["video_id"] = j.VideoID + j.fieldMap["user_id"] = j.UserID + j.fieldMap["time_limit"] = j.TimeLimit } func (j job) clone(db *gorm.DB) job { diff --git a/internal/database/query/user.gen.go b/internal/database/query/user.gen.go index f524eef..ecb1ea8 100644 --- a/internal/database/query/user.gen.go +++ b/internal/database/query/user.gen.go @@ -37,12 +37,6 @@ func newUser(db *gorm.DB, opts ...gen.DOOption) user { _user.GoogleID = field.NewString(tableName, "google_id") _user.StorageUsed = field.NewInt64(tableName, "storage_used") _user.PlanID = field.NewString(tableName, "plan_id") - _user.ReferredByUserID = field.NewString(tableName, "referred_by_user_id") - _user.ReferralEligible = field.NewBool(tableName, "referral_eligible") - _user.ReferralRewardBps = field.NewInt32(tableName, "referral_reward_bps") - _user.ReferralRewardGrantedAt = field.NewTime(tableName, "referral_reward_granted_at") - _user.ReferralRewardPaymentID = field.NewString(tableName, "referral_reward_payment_id") - _user.ReferralRewardAmount = field.NewFloat64(tableName, "referral_reward_amount") _user.CreatedAt = field.NewTime(tableName, "created_at") _user.UpdatedAt = field.NewTime(tableName, "updated_at") _user.Version = field.NewInt64(tableName, "version") @@ -64,18 +58,12 @@ type user struct { Avatar field.String Role field.String GoogleID field.String - StorageUsed field.Int64 - PlanID field.String - ReferredByUserID field.String - ReferralEligible field.Bool - ReferralRewardBps field.Int32 - ReferralRewardGrantedAt field.Time - ReferralRewardPaymentID field.String - ReferralRewardAmount field.Float64 - CreatedAt field.Time - UpdatedAt field.Time - Version field.Int64 - TelegramID field.String + StorageUsed field.Int64 + PlanID field.String + CreatedAt field.Time + UpdatedAt field.Time + Version field.Int64 + TelegramID field.String fieldMap map[string]field.Expr } @@ -101,12 +89,6 @@ func (u *user) updateTableName(table string) *user { u.GoogleID = field.NewString(table, "google_id") u.StorageUsed = field.NewInt64(table, "storage_used") u.PlanID = field.NewString(table, "plan_id") - u.ReferredByUserID = field.NewString(table, "referred_by_user_id") - u.ReferralEligible = field.NewBool(table, "referral_eligible") - u.ReferralRewardBps = field.NewInt32(table, "referral_reward_bps") - u.ReferralRewardGrantedAt = field.NewTime(table, "referral_reward_granted_at") - u.ReferralRewardPaymentID = field.NewString(table, "referral_reward_payment_id") - u.ReferralRewardAmount = field.NewFloat64(table, "referral_reward_amount") u.CreatedAt = field.NewTime(table, "created_at") u.UpdatedAt = field.NewTime(table, "updated_at") u.Version = field.NewInt64(table, "version") @@ -135,7 +117,7 @@ func (u *user) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (u *user) fillFieldMap() { - u.fieldMap = make(map[string]field.Expr, 19) + u.fieldMap = make(map[string]field.Expr, 13) u.fieldMap["id"] = u.ID u.fieldMap["email"] = u.Email u.fieldMap["password"] = u.Password @@ -145,12 +127,6 @@ func (u *user) fillFieldMap() { u.fieldMap["google_id"] = u.GoogleID u.fieldMap["storage_used"] = u.StorageUsed u.fieldMap["plan_id"] = u.PlanID - u.fieldMap["referred_by_user_id"] = u.ReferredByUserID - u.fieldMap["referral_eligible"] = u.ReferralEligible - u.fieldMap["referral_reward_bps"] = u.ReferralRewardBps - u.fieldMap["referral_reward_granted_at"] = u.ReferralRewardGrantedAt - u.fieldMap["referral_reward_payment_id"] = u.ReferralRewardPaymentID - u.fieldMap["referral_reward_amount"] = u.ReferralRewardAmount u.fieldMap["created_at"] = u.CreatedAt u.fieldMap["updated_at"] = u.UpdatedAt u.fieldMap["version"] = u.Version diff --git a/internal/middleware/authenticator.go b/internal/middleware/authenticator.go index e992136..d4de0da 100644 --- a/internal/middleware/authenticator.go +++ b/internal/middleware/authenticator.go @@ -194,7 +194,7 @@ func (a *Authenticator) maybeCreateSubscriptionReminderPostAuth(ctx context.Cont sentAt := now notification := &model.Notification{ - ID: uuidString(), + ID: uuid.New().String(), UserID: user.ID, Type: "billing.subscription_expiring", Title: "Plan expiring soon", @@ -272,7 +272,3 @@ func mustMarshalAuthJSON(value any) string { } return string(encoded) } - -func uuidString() string { - return uuid.New().String() -} diff --git a/internal/middleware/authenticator_test.go b/internal/middleware/authenticator_test.go index c8a7d09..0eadbaf 100644 --- a/internal/middleware/authenticator_test.go +++ b/internal/middleware/authenticator_test.go @@ -13,7 +13,6 @@ import ( "google.golang.org/grpc/status" "gorm.io/driver/sqlite" "gorm.io/gorm" - _ "modernc.org/sqlite" "stream.api/internal/database/model" "stream.api/internal/database/query" ) diff --git a/internal/service/service_core.go b/internal/service/service_core.go index ad0ca7b..9206c37 100644 --- a/internal/service/service_core.go +++ b/internal/service/service_core.go @@ -11,7 +11,7 @@ import ( "stream.api/internal/database/model" "stream.api/internal/middleware" "stream.api/internal/video" - "stream.api/pkg/cache" + "stream.api/internal/video/runtime/adapters/queue/redis" "stream.api/pkg/logger" "stream.api/pkg/storage" "stream.api/pkg/token" @@ -74,7 +74,7 @@ type appServices struct { logger logger.Logger authenticator *middleware.Authenticator tokenProvider token.Provider - cache cache.Cache + cache *redis.RedisAdapter storageProvider storage.Provider videoService *video.Service agentRuntime video.AgentRuntime @@ -119,7 +119,7 @@ type apiErrorBody struct { Data any `json:"data,omitempty"` } -func NewServices(c cache.Cache, t token.Provider, db *gorm.DB, l logger.Logger, cfg *config.Config, videoService *video.Service, agentRuntime video.AgentRuntime) *Services { +func NewServices(c *redis.RedisAdapter, db *gorm.DB, l logger.Logger, cfg *config.Config, videoService *video.Service, agentRuntime video.AgentRuntime) *Services { var storageProvider storage.Provider if cfg != nil { provider, err := storage.NewS3Provider(cfg) @@ -157,7 +157,6 @@ func NewServices(c cache.Cache, t token.Provider, db *gorm.DB, l logger.Logger, db: db, logger: l, authenticator: middleware.NewAuthenticator(db, l, cfg.Internal.Marker), - tokenProvider: t, cache: c, storageProvider: storageProvider, videoService: videoService, diff --git a/internal/service/service_helpers.go b/internal/service/service_helpers.go index 48f805f..d7af34e 100644 --- a/internal/service/service_helpers.go +++ b/internal/service/service_helpers.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "net/url" + "strconv" "strings" "time" @@ -23,7 +24,6 @@ import ( appv1 "stream.api/internal/api/proto/app/v1" "stream.api/internal/database/model" "stream.api/internal/middleware" - "stream.api/internal/video/runtime/domain" "stream.api/internal/video/runtime/services" ) @@ -52,31 +52,32 @@ func adminPageLimitOffset(pageValue int32, limitValue int32) (int32, int32, int) offset := int((page - 1) * limit) return page, limit, offset } -func buildAdminJob(job *domain.Job) *appv1.AdminJob { +func buildAdminJob(job *model.Job) *appv1.AdminJob { if job == nil { return nil } + agentID := strconv.FormatInt(*job.AgentID, 10) return &appv1.AdminJob{ Id: job.ID, - Status: string(job.Status), - Priority: int32(job.Priority), - UserId: job.UserID, - Name: job.Name, - TimeLimit: job.TimeLimit, - InputUrl: job.InputURL, - OutputUrl: job.OutputURL, - TotalDuration: job.TotalDuration, - CurrentTime: job.CurrentTime, - Progress: job.Progress, - AgentId: job.AgentID, - Logs: job.Logs, - Config: job.Config, - Cancelled: job.Cancelled, - RetryCount: int32(job.RetryCount), - MaxRetries: int32(job.MaxRetries), - CreatedAt: timestamppb.New(job.CreatedAt), - UpdatedAt: timestamppb.New(job.UpdatedAt), - VideoId: stringPointerOrNil(job.VideoID), + Status: string(*job.Status), + Priority: int32(*job.Priority), + UserId: *job.UserID, + Name: job.ID, + TimeLimit: *job.TimeLimit, + InputUrl: *job.InputURL, + OutputUrl: *job.OutputURL, + TotalDuration: *job.TotalDuration, + CurrentTime: *job.CurrentTime, + Progress: *job.Progress, + AgentId: &agentID, + Logs: *job.Logs, + Config: *job.Config, + Cancelled: *job.Cancelled, + RetryCount: int32(*job.RetryCount), + MaxRetries: int32(*job.MaxRetries), + CreatedAt: timestamppb.New(*job.CreatedAt), + UpdatedAt: timestamppb.New(*job.UpdatedAt), + VideoId: stringPointerOrNil(*job.VideoID), } } func buildAdminAgent(agent *services.AgentWithStats) *appv1.AdminAgent { diff --git a/internal/service/testdb_setup_test.go b/internal/service/testdb_setup_test.go index 881f099..7cb6c96 100644 --- a/internal/service/testdb_setup_test.go +++ b/internal/service/testdb_setup_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + goredis "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" @@ -19,7 +20,6 @@ import ( "stream.api/internal/database/model" "stream.api/internal/database/query" "stream.api/internal/middleware" - "stream.api/pkg/cache" "stream.api/pkg/logger" "stream.api/pkg/token" ) @@ -88,8 +88,7 @@ func (fakeTokenProvider) ParseMapToken(tokenString string) (map[string]interface return map[string]interface{}{"token": tokenString}, nil } -var _ cache.Cache = (*fakeCache)(nil) -var _ token.Provider = fakeTokenProvider{} +var _ goredis.Client = (*fakeCache)(nil) func newTestDB(t *testing.T) *gorm.DB { t.Helper() diff --git a/internal/transport/grpc/server.go b/internal/transport/grpc/server.go index 7c76dff..293e9c3 100644 --- a/internal/transport/grpc/server.go +++ b/internal/transport/grpc/server.go @@ -13,9 +13,7 @@ import ( redisadapter "stream.api/internal/video/runtime/adapters/queue/redis" runtimegrpc "stream.api/internal/video/runtime/grpc" "stream.api/internal/video/runtime/services" - "stream.api/pkg/cache" "stream.api/pkg/logger" - "stream.api/pkg/token" ) type GRPCModule struct { @@ -27,13 +25,9 @@ type GRPCModule struct { cfg *config.Config } -func NewGRPCModule(ctx context.Context, cfg *config.Config, db *gorm.DB, cacheClient cache.Cache, tokenProvider token.Provider, appLogger logger.Logger) (*GRPCModule, error) { - adapter, err := redisadapter.NewAdapter(cfg.Redis.Addr, cfg.Redis.Password, cfg.Redis.DB) - if err != nil { - return nil, err - } - jobService := services.NewJobService(adapter, adapter) - healthService := services.NewHealthService(db, adapter.Client(), cfg.Render.ServiceName) +func NewGRPCModule(ctx context.Context, cfg *config.Config, db *gorm.DB, rds *redisadapter.RedisAdapter, appLogger logger.Logger) (*GRPCModule, error) { + jobService := services.NewJobService(rds, rds) + healthService := services.NewHealthService(db, rds.Client(), cfg.Render.ServiceName) agentRuntime := runtimegrpc.NewServer(jobService, cfg.Render.AgentSecret) videoService := video.NewService(db, jobService) grpcServer := grpcpkg.NewServer() @@ -56,7 +50,7 @@ func NewGRPCModule(ctx context.Context, cfg *config.Config, db *gorm.DB, cacheCl } agentRuntime.Register(grpcServer) - service.Register(grpcServer, service.NewServices(cacheClient, tokenProvider, db, appLogger, cfg, videoService, agentRuntime)) + service.Register(grpcServer, service.NewServices(rds, db, appLogger, cfg, videoService, agentRuntime)) if module.mqttPublisher != nil { module.mqttPublisher.Start(ctx) } diff --git a/internal/video/runtime/adapters/queue/redis/adapter.go b/internal/video/runtime/adapters/queue/redis/adapter.go index 26105a9..b634978 100644 --- a/internal/video/runtime/adapters/queue/redis/adapter.go +++ b/internal/video/runtime/adapters/queue/redis/adapter.go @@ -7,6 +7,7 @@ import ( "time" goredis "github.com/redis/go-redis/v9" + "stream.api/internal/database/model" "stream.api/internal/video/runtime/domain" ) @@ -17,29 +18,29 @@ const ( JobUpdateChannel = "render:jobs:updates" ) -type Adapter struct{ client *goredis.Client } +type RedisAdapter struct{ client *goredis.Client } -func NewAdapter(addr, password string, db int) (*Adapter, error) { +func NewAdapter(addr, password string, db int) (*RedisAdapter, error) { client := goredis.NewClient(&goredis.Options{Addr: addr, Password: password, DB: db}) if err := client.Ping(context.Background()).Err(); err != nil { return nil, err } - return &Adapter{client: client}, nil + return &RedisAdapter{client: client}, nil } -func (r *Adapter) Client() *goredis.Client { return r.client } +func (r *RedisAdapter) Client() *goredis.Client { return r.client } -func (r *Adapter) Enqueue(ctx context.Context, job *domain.Job) error { +func (r *RedisAdapter) Enqueue(ctx context.Context, job *model.Job) error { data, err := json.Marshal(job) if err != nil { return err } timestamp := time.Now().UnixNano() - score := float64(-(int64(job.Priority) * 1000000000) - timestamp) + score := float64(-(int64(*job.Priority) * 1000000000) - timestamp) return r.client.ZAdd(ctx, JobQueueKey, goredis.Z{Score: score, Member: data}).Err() } -func (r *Adapter) Dequeue(ctx context.Context) (*domain.Job, error) { +func (r *RedisAdapter) Dequeue(ctx context.Context) (*model.Job, error) { for { if ctx.Err() != nil { return nil, ctx.Err() @@ -68,7 +69,7 @@ func (r *Adapter) Dequeue(ctx context.Context) (*domain.Job, error) { default: return nil, fmt.Errorf("unexpected redis queue payload type %T", member) } - var job domain.Job + var job model.Job if err := json.Unmarshal(raw, &job); err != nil { return nil, err } @@ -76,7 +77,7 @@ func (r *Adapter) Dequeue(ctx context.Context) (*domain.Job, error) { } } -func (r *Adapter) Publish(ctx context.Context, jobID string, logLine string, progress float64) error { +func (r *RedisAdapter) Publish(ctx context.Context, jobID string, logLine string, progress float64) error { payload, err := json.Marshal(domain.LogEntry{JobID: jobID, Line: logLine, Progress: progress}) if err != nil { return err @@ -84,7 +85,7 @@ func (r *Adapter) Publish(ctx context.Context, jobID string, logLine string, pro return r.client.Publish(ctx, LogChannel, payload).Err() } -func (r *Adapter) Subscribe(ctx context.Context, jobID string) (<-chan domain.LogEntry, error) { +func (r *RedisAdapter) Subscribe(ctx context.Context, jobID string) (<-chan domain.LogEntry, error) { pubsub := r.client.Subscribe(ctx, LogChannel) ch := make(chan domain.LogEntry) go func() { @@ -103,7 +104,7 @@ func (r *Adapter) Subscribe(ctx context.Context, jobID string) (<-chan domain.Lo return ch, nil } -func (r *Adapter) PublishResource(ctx context.Context, agentID string, data []byte) error { +func (r *RedisAdapter) PublishResource(ctx context.Context, agentID string, data []byte) error { var decoded struct { CPU float64 `json:"cpu"` RAM float64 `json:"ram"` @@ -118,7 +119,7 @@ func (r *Adapter) PublishResource(ctx context.Context, agentID string, data []by return r.client.Publish(ctx, ResourceChannel, payload).Err() } -func (r *Adapter) SubscribeResources(ctx context.Context) (<-chan domain.SystemResource, error) { +func (r *RedisAdapter) SubscribeResources(ctx context.Context) (<-chan domain.SystemResource, error) { pubsub := r.client.Subscribe(ctx, ResourceChannel) ch := make(chan domain.SystemResource) go func() { @@ -135,11 +136,11 @@ func (r *Adapter) SubscribeResources(ctx context.Context) (<-chan domain.SystemR return ch, nil } -func (r *Adapter) PublishCancel(ctx context.Context, agentID string, jobID string) error { +func (r *RedisAdapter) PublishCancel(ctx context.Context, agentID string, jobID string) error { return r.client.Publish(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID), jobID).Err() } -func (r *Adapter) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) { +func (r *RedisAdapter) SubscribeCancel(ctx context.Context, agentID string) (<-chan string, error) { pubsub := r.client.Subscribe(ctx, fmt.Sprintf("render:agents:%s:cancel", agentID)) ch := make(chan string) go func() { @@ -152,7 +153,7 @@ func (r *Adapter) SubscribeCancel(ctx context.Context, agentID string) (<-chan s return ch, nil } -func (r *Adapter) PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error { +func (r *RedisAdapter) PublishJobUpdate(ctx context.Context, jobID string, status string, videoID string) error { payload, err := json.Marshal(map[string]string{"job_id": jobID, "status": status, "video_id": videoID}) if err != nil { return err @@ -160,7 +161,7 @@ func (r *Adapter) PublishJobUpdate(ctx context.Context, jobID string, status str return r.client.Publish(ctx, JobUpdateChannel, payload).Err() } -func (r *Adapter) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) { +func (r *RedisAdapter) SubscribeJobUpdates(ctx context.Context) (<-chan string, error) { pubsub := r.client.Subscribe(ctx, JobUpdateChannel) ch := make(chan string) go func() { @@ -172,3 +173,18 @@ func (r *Adapter) SubscribeJobUpdates(ctx context.Context) (<-chan string, error }() return ch, nil } +func (c *RedisAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return c.client.Set(ctx, key, value, expiration).Err() +} + +func (c *RedisAdapter) Get(ctx context.Context, key string) (string, error) { + return c.client.Get(ctx, key).Result() +} + +func (c *RedisAdapter) Del(ctx context.Context, key string) error { + return c.client.Del(ctx, key).Err() +} + +func (c *RedisAdapter) Close() error { + return c.client.Close() +} diff --git a/internal/video/runtime/adapters/queue/redis/dlq.go b/internal/video/runtime/adapters/queue/redis/dlq.go new file mode 100644 index 0000000..fb2c8a6 --- /dev/null +++ b/internal/video/runtime/adapters/queue/redis/dlq.go @@ -0,0 +1,143 @@ +package redis + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/redis/go-redis/v9" + "stream.api/internal/database/model" +) + +const ( + dlqKey = "picpic:dlq" + dlqMetaPrefix = "picpic:dlq:meta:" +) + +type DeadLetterQueue struct { + client *redis.Client +} + +type DLQEntry struct { + Job *model.Job `json:"job"` + FailureTime time.Time `json:"failure_time"` + Reason string `json:"reason"` + RetryCount int64 `json:"retry_count"` +} + +func NewDeadLetterQueue(client *redis.Client) *DeadLetterQueue { + return &DeadLetterQueue{client: client} +} + +// Add adds a failed job to the DLQ +func (dlq *DeadLetterQueue) Add(ctx context.Context, job *model.Job, reason string) error { + entry := DLQEntry{ + Job: job, + FailureTime: time.Now(), + Reason: reason, + RetryCount: *job.RetryCount, + } + + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("failed to marshal DLQ entry: %w", err) + } + + // Add to sorted set with timestamp as score + score := float64(time.Now().Unix()) + if err := dlq.client.ZAdd(ctx, dlqKey, redis.Z{ + Score: score, + Member: job.ID, + }).Err(); err != nil { + return fmt.Errorf("failed to add to DLQ: %w", err) + } + + // Store metadata + metaKey := dlqMetaPrefix + job.ID + if err := dlq.client.Set(ctx, metaKey, data, 0).Err(); err != nil { + return fmt.Errorf("failed to store DLQ metadata: %w", err) + } + + return nil +} + +// Get retrieves a job from the DLQ +func (dlq *DeadLetterQueue) Get(ctx context.Context, jobID string) (*DLQEntry, error) { + metaKey := dlqMetaPrefix + jobID + data, err := dlq.client.Get(ctx, metaKey).Bytes() + if err != nil { + if err == redis.Nil { + return nil, fmt.Errorf("job not found in DLQ") + } + return nil, fmt.Errorf("failed to get DLQ entry: %w", err) + } + + var entry DLQEntry + if err := json.Unmarshal(data, &entry); err != nil { + return nil, fmt.Errorf("failed to unmarshal DLQ entry: %w", err) + } + + return &entry, nil +} + +// List returns all jobs in the DLQ +func (dlq *DeadLetterQueue) List(ctx context.Context, offset, limit int64) ([]*DLQEntry, error) { + // Get job IDs from sorted set (newest first) + jobIDs, err := dlq.client.ZRevRange(ctx, dlqKey, offset, offset+limit-1).Result() + if err != nil { + return nil, fmt.Errorf("failed to list DLQ: %w", err) + } + + entries := make([]*DLQEntry, 0, len(jobIDs)) + for _, jobID := range jobIDs { + entry, err := dlq.Get(ctx, jobID) + if err != nil { + // Skip if metadata not found + continue + } + entries = append(entries, entry) + } + + return entries, nil +} + +// Remove removes a job from the DLQ +func (dlq *DeadLetterQueue) Remove(ctx context.Context, jobID string) error { + // Remove from sorted set + if err := dlq.client.ZRem(ctx, dlqKey, jobID).Err(); err != nil { + return fmt.Errorf("failed to remove from DLQ: %w", err) + } + + // Remove metadata + metaKey := dlqMetaPrefix + jobID + if err := dlq.client.Del(ctx, metaKey).Err(); err != nil { + return fmt.Errorf("failed to remove DLQ metadata: %w", err) + } + + return nil +} + +// Count returns the total number of jobs in the DLQ +func (dlq *DeadLetterQueue) Count(ctx context.Context) (int64, error) { + count, err := dlq.client.ZCard(ctx, dlqKey).Result() + if err != nil { + return 0, fmt.Errorf("failed to count DLQ: %w", err) + } + return count, nil +} + +// Retry removes a job from DLQ and returns it for retry +func (dlq *DeadLetterQueue) Retry(ctx context.Context, jobID string) (*model.Job, error) { + entry, err := dlq.Get(ctx, jobID) + if err != nil { + return nil, err + } + + // Remove from DLQ + if err := dlq.Remove(ctx, jobID); err != nil { + return nil, err + } + + return entry.Job, nil +} diff --git a/internal/video/runtime/grpc/server.go b/internal/video/runtime/grpc/server.go index 00ea39b..4c1917f 100644 --- a/internal/video/runtime/grpc/server.go +++ b/internal/video/runtime/grpc/server.go @@ -141,7 +141,7 @@ func (s *Server) StreamJobs(_ *proto.StreamOptions, stream grpcpkg.ServerStreami continue } var config map[string]any - if err := json.Unmarshal([]byte(job.Config), &config); err != nil { + if err := json.Unmarshal([]byte(*job.Config), &config); err != nil { _ = s.jobService.UpdateJobStatus(ctx, job.ID, domain.JobStatusFailure) s.untrackJobAssignment(agentID, job.ID) continue @@ -163,7 +163,9 @@ func (s *Server) StreamJobs(_ *proto.StreamOptions, stream grpcpkg.ServerStreami } } payload, _ := json.Marshal(map[string]any{"image": image, "commands": commands, "environment": map[string]string{}}) - if err := stream.Send(&proto.Workflow{Id: job.ID, Timeout: job.TimeLimit, Payload: payload}); err != nil { + // Sau này xem xét có cần cho job.TimeLimit vào db không? + // Hiện tại để đơn giản thì cứ để mặc định timeout 1h, nếu job nào cần timeout ngắn hơn thì tự lo trong commands của nó + if err := stream.Send(&proto.Workflow{Id: job.ID, Timeout: 60 * 60 * 1000, Payload: payload}); err != nil { _ = s.jobService.UpdateJobStatus(ctx, job.ID, domain.JobStatusPending) s.untrackJobAssignment(agentID, job.ID) return err diff --git a/internal/video/runtime/module.go b/internal/video/runtime/module.go deleted file mode 100644 index bcdb7a6..0000000 --- a/internal/video/runtime/module.go +++ /dev/null @@ -1,75 +0,0 @@ -package runtime - -import ( - "context" - "fmt" - "net" - - grpcpkg "google.golang.org/grpc" - "gorm.io/gorm" - "stream.api/internal/config" - apprpc "stream.api/internal/service" - "stream.api/internal/video" - redisadapter "stream.api/internal/video/runtime/adapters/queue/redis" - runtimegrpc "stream.api/internal/video/runtime/grpc" - "stream.api/internal/video/runtime/services" - "stream.api/pkg/cache" - "stream.api/pkg/logger" - "stream.api/pkg/token" -) - -type Module struct { - ctx context.Context - cfg *config.Config - jobService *services.JobService - healthService *services.HealthService - grpcServer *runtimegrpc.Server - mqttPublisher *mqttPublisher - grpcRaw *grpcpkg.Server -} - -func NewModule(ctx context.Context, cfg *config.Config, db *gorm.DB, cacheClient cache.Cache, tokenProvider token.Provider, appLogger logger.Logger) (*Module, error) { - adapter, err := redisadapter.NewAdapter(cfg.Redis.Addr, cfg.Redis.Password, cfg.Redis.DB) - if err != nil { - return nil, err - } - jobService := services.NewJobService(adapter, adapter) - healthService := services.NewHealthService(db, adapter.Client(), cfg.Render.ServiceName) - grpcServer := runtimegrpc.NewServer(jobService, cfg.Render.AgentSecret) - module := &Module{ctx: ctx, cfg: cfg, jobService: jobService, healthService: healthService, grpcServer: grpcServer, grpcRaw: grpcpkg.NewServer()} - if publisher, err := NewMQTTBootstrap(jobService, grpcServer, appLogger); err != nil { - appLogger.Error("Failed to initialize MQTT publisher", "error", err) - } else { - module.mqttPublisher = publisher.mqttPublisher - grpcServer.SetAgentEventHandler(func(eventType string, agent *services.AgentWithStats) { - PublishAgentMQTTEvent(publisher.Client(), appLogger, eventType, agent) - }) - } - videoService := video.NewService(db, jobService) - grpcServer.Register(module.grpcRaw) - apprpc.Register(module.grpcRaw, apprpc.NewServices(cacheClient, tokenProvider, db, appLogger, cfg, videoService, grpcServer)) - if module.mqttPublisher != nil { - module.mqttPublisher.start(ctx) - } - return module, nil -} - -func (m *Module) JobService() *services.JobService { return m.jobService } - -func (m *Module) AgentRuntime() *runtimegrpc.Server { return m.grpcServer } - -func (m *Module) GRPCServer() *grpcpkg.Server { return m.grpcRaw } - -func (m *Module) GRPCAddress() string { - return fmt.Sprintf(":%s", m.cfg.Server.GRPCPort) -} - -func (m *Module) ServeGRPC(listener net.Listener) error { - return m.grpcRaw.Serve(listener) -} - -func (m *Module) Shutdown() { - if m.grpcRaw != nil { - m.grpcRaw.GracefulStop() - } -} diff --git a/internal/video/runtime/services/job_service.go b/internal/video/runtime/services/job_service.go index 8aa3f8c..562ac02 100644 --- a/internal/video/runtime/services/job_service.go +++ b/internal/video/runtime/services/job_service.go @@ -17,8 +17,8 @@ import ( ) type JobQueue interface { - Enqueue(ctx context.Context, job *domain.Job) error - Dequeue(ctx context.Context) (*domain.Job, error) + Enqueue(ctx context.Context, job *model.Job) error + Dequeue(ctx context.Context) (*model.Job, error) } type LogPubSub interface { @@ -50,13 +50,13 @@ const ( ) type PaginatedJobs struct { - Jobs []*domain.Job `json:"jobs"` - Total int64 `json:"total"` - Offset int `json:"offset"` - Limit int `json:"limit"` - HasMore bool `json:"has_more"` - NextCursor string `json:"next_cursor,omitempty"` - PageSize int `json:"page_size"` + Jobs []*model.Job `json:"jobs"` + Total int64 `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` + HasMore bool `json:"has_more"` + NextCursor string `json:"next_cursor,omitempty"` + PageSize int `json:"page_size"` } type jobListCursor struct { @@ -169,7 +169,7 @@ func listJobsByOffset(ctx context.Context, agentID string, offset, limit int) (* if agentID != "" { agentNumeric, err := strconv.ParseInt(agentID, 10, 64) if err != nil { - return &PaginatedJobs{Jobs: []*domain.Job{}, Total: 0, Offset: offset, Limit: limit, PageSize: limit, HasMore: false}, nil + return &PaginatedJobs{Jobs: []*model.Job{}, Total: 0, Offset: offset, Limit: limit, PageSize: limit, HasMore: false}, nil } q = q.Where(query.Job.AgentID.Eq(agentNumeric)) } @@ -177,69 +177,14 @@ func listJobsByOffset(ctx context.Context, agentID string, offset, limit int) (* if err != nil { return nil, err } - items := make([]*domain.Job, 0, len(jobs)) + items := make([]*model.Job, 0, len(jobs)) for _, job := range jobs { - items = append(items, toDomainJob(job)) + items = append(items, job) } return &PaginatedJobs{Jobs: items, Total: total, Offset: offset, Limit: limit, PageSize: limit, HasMore: offset+len(items) < int(total)}, nil } -func toDomainJob(job *model.Job) *domain.Job { - if job == nil { - return nil - } - cfg := parseJobConfig(job.Config) - result := &domain.Job{ID: job.ID, Name: cfg.Name, UserID: cfg.UserID, VideoID: cfg.VideoID, TimeLimit: cfg.TimeLimit} - if job.Status != nil { - result.Status = domain.JobStatus(*job.Status) - } - if job.Priority != nil { - result.Priority = int(*job.Priority) - } - if job.InputURL != nil { - result.InputURL = *job.InputURL - } - if job.OutputURL != nil { - result.OutputURL = *job.OutputURL - } - if job.TotalDuration != nil { - result.TotalDuration = *job.TotalDuration - } - if job.CurrentTime != nil { - result.CurrentTime = *job.CurrentTime - } - if job.Progress != nil { - result.Progress = *job.Progress - } - if job.AgentID != nil { - agentID := strconv.FormatInt(*job.AgentID, 10) - result.AgentID = &agentID - } - if job.Logs != nil { - result.Logs = *job.Logs - } - if job.Config != nil { - result.Config = *job.Config - } - if job.Cancelled != nil { - result.Cancelled = *job.Cancelled - } - if job.RetryCount != nil { - result.RetryCount = int(*job.RetryCount) - } - if job.MaxRetries != nil { - result.MaxRetries = int(*job.MaxRetries) - } - if job.CreatedAt != nil { - result.CreatedAt = *job.CreatedAt - } - if job.UpdatedAt != nil { - result.UpdatedAt = *job.UpdatedAt - } - return result -} - -func (s *JobService) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*domain.Job, error) { +func (s *JobService) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error) { status := string(domain.JobStatusPending) now := time.Now() job := &model.Job{ @@ -259,12 +204,12 @@ func (s *JobService) CreateJob(ctx context.Context, userID string, videoID strin if err := syncVideoStatus(ctx, videoID, domain.JobStatusPending); err != nil { return nil, err } - domainJob := toDomainJob(job) - if err := s.queue.Enqueue(ctx, domainJob); err != nil { + // domainJob := toDomainJob(job) + if err := s.queue.Enqueue(ctx, job); err != nil { return nil, err } _ = s.pubsub.PublishJobUpdate(ctx, job.ID, status, videoID) - return domainJob, nil + return job, nil } func (s *JobService) ListJobs(ctx context.Context, offset, limit int) (*PaginatedJobs, error) { @@ -291,7 +236,7 @@ func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, curso if agentID != "" { agentNumeric, err := strconv.ParseInt(agentID, 10, 64) if err != nil { - return &PaginatedJobs{Jobs: []*domain.Job{}, Total: 0, Limit: pageSize, PageSize: pageSize, HasMore: false}, nil + return &PaginatedJobs{Jobs: []*model.Job{}, Total: 0, Limit: pageSize, PageSize: pageSize, HasMore: false}, nil } q = q.Where(query.Job.AgentID.Eq(agentNumeric)) } @@ -315,9 +260,9 @@ func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, curso jobs = jobs[:pageSize] } - items := make([]*domain.Job, 0, len(jobs)) + items := make([]*model.Job, 0, len(jobs)) for _, job := range jobs { - items = append(items, toDomainJob(job)) + items = append(items, job) } nextCursor := "" @@ -338,15 +283,15 @@ func (s *JobService) ListJobsByCursor(ctx context.Context, agentID string, curso }, nil } -func (s *JobService) GetJob(ctx context.Context, id string) (*domain.Job, error) { +func (s *JobService) GetJob(ctx context.Context, id string) (*model.Job, error) { job, err := query.Job.WithContext(ctx).Where(query.Job.ID.Eq(id)).First() if err != nil { return nil, err } - return toDomainJob(job), nil + return job, nil } -func (s *JobService) GetNextJob(ctx context.Context) (*domain.Job, error) { +func (s *JobService) GetNextJob(ctx context.Context) (*model.Job, error) { return s.queue.Dequeue(ctx) } func (s *JobService) SubscribeSystemResources(ctx context.Context) (<-chan domain.SystemResource, error) { @@ -436,7 +381,7 @@ func (s *JobService) CancelJob(ctx context.Context, jobID string) error { return s.pubsub.Publish(ctx, jobID, "[SYSTEM] Job cancelled by admin", -1) } -func (s *JobService) RetryJob(ctx context.Context, jobID string) (*domain.Job, error) { +func (s *JobService) RetryJob(ctx context.Context, jobID string) (*model.Job, error) { job, err := query.Job.WithContext(ctx).Where(query.Job.ID.Eq(jobID)).First() if err != nil { return nil, fmt.Errorf("job not found: %w", err) @@ -476,12 +421,12 @@ func (s *JobService) RetryJob(ctx context.Context, jobID string) (*domain.Job, e if err := syncVideoStatus(ctx, cfg.VideoID, domain.JobStatusPending); err != nil { return nil, err } - domainJob := toDomainJob(job) - if err := s.queue.Enqueue(ctx, domainJob); err != nil { + // domainJob := toDomainJob(job) + if err := s.queue.Enqueue(ctx, job); err != nil { return nil, err } _ = s.pubsub.PublishJobUpdate(ctx, jobID, pending, cfg.VideoID) - return domainJob, nil + return job, nil } func (s *JobService) UpdateJobProgress(ctx context.Context, jobID string, progress float64) error { diff --git a/internal/video/service.go b/internal/video/service.go index b56c463..0055ea7 100644 --- a/internal/video/service.go +++ b/internal/video/service.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "gorm.io/gorm" "stream.api/internal/database/model" + "stream.api/internal/video/runtime/services" ) var ( @@ -20,7 +21,7 @@ var ( type Service struct { db *gorm.DB - jobService JobService + jobService *services.JobService } type CreateVideoInput struct { @@ -36,14 +37,14 @@ type CreateVideoInput struct { type CreateVideoResult struct { Video *model.Video - Job *Job + Job model.Job } -func NewService(db *gorm.DB, jobService JobService) *Service { +func NewService(db *gorm.DB, jobService *services.JobService) *Service { return &Service{db: db, jobService: jobService} } -func (s *Service) JobService() JobService { +func (s *Service) JobService() *services.JobService { if s == nil { return nil } @@ -118,7 +119,7 @@ func (s *Service) CreateVideo(ctx context.Context, input CreateVideoInput) (*Cre return nil, err } - return &CreateVideoResult{Video: video, Job: job}, nil + return &CreateVideoResult{Video: video, Job: *job}, nil } func (s *Service) ListJobs(ctx context.Context, offset, limit int) (*PaginatedJobs, error) { @@ -142,14 +143,14 @@ func (s *Service) ListJobsByCursor(ctx context.Context, agentID string, cursor s return s.jobService.ListJobsByCursor(ctx, agentID, cursor, pageSize) } -func (s *Service) GetJob(ctx context.Context, id string) (*Job, error) { +func (s *Service) GetJob(ctx context.Context, id string) (*model.Job, error) { if s == nil || s.jobService == nil { return nil, ErrJobServiceUnavailable } return s.jobService.GetJob(ctx, id) } -func (s *Service) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*Job, error) { +func (s *Service) CreateJob(ctx context.Context, userID string, videoID string, name string, config []byte, priority int, timeLimit int64) (*model.Job, error) { if s == nil || s.jobService == nil { return nil, ErrJobServiceUnavailable } @@ -163,7 +164,7 @@ func (s *Service) CancelJob(ctx context.Context, id string) error { return s.jobService.CancelJob(ctx, id) } -func (s *Service) RetryJob(ctx context.Context, id string) (*Job, error) { +func (s *Service) RetryJob(ctx context.Context, id string) (*model.Job, error) { if s == nil || s.jobService == nil { return nil, ErrJobServiceUnavailable } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go deleted file mode 100644 index c028bc0..0000000 --- a/pkg/cache/cache.go +++ /dev/null @@ -1,14 +0,0 @@ -package cache - -import ( - "context" - "time" -) - -// Cache defines the interface for caching operations -type Cache interface { - Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error - Get(ctx context.Context, key string) (string, error) - Del(ctx context.Context, key string) error - Close() error -} diff --git a/pkg/cache/interface.go b/pkg/cache/interface.go deleted file mode 100644 index 08bf029..0000000 --- a/pkg/cache/interface.go +++ /dev/null @@ -1 +0,0 @@ -package cache diff --git a/pkg/cache/redis.go b/pkg/cache/redis.go deleted file mode 100644 index e595e6b..0000000 --- a/pkg/cache/redis.go +++ /dev/null @@ -1,43 +0,0 @@ -package cache - -import ( - "context" - "time" - - "github.com/redis/go-redis/v9" -) - -type redisCache struct { - client *redis.Client -} - -// NewRedisCache creates a new instance of Redis cache implementing Cache interface -func NewRedisCache(addr, password string, db int) (Cache, error) { - rdb := redis.NewClient(&redis.Options{ - Addr: addr, - Password: password, - DB: db, - }) - - if err := rdb.Ping(context.Background()).Err(); err != nil { - return nil, err - } - - return &redisCache{client: rdb}, nil -} - -func (c *redisCache) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { - return c.client.Set(ctx, key, value, expiration).Err() -} - -func (c *redisCache) Get(ctx context.Context, key string) (string, error) { - return c.client.Get(ctx, key).Result() -} - -func (c *redisCache) Del(ctx context.Context, key string) error { - return c.client.Del(ctx, key).Err() -} - -func (c *redisCache) Close() error { - return c.client.Close() -}