package runtime import ( "context" "encoding/json" "fmt" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "stream.api/internal/video/runtime/domain" "stream.api/internal/video/runtime/services" "stream.api/pkg/logger" ) const ( defaultMQTTBrokerURL = "tcp://broker.mqtt-dashboard.com:1883" defaultMQTTPrefix = "picpic" ) type mqttPublisher struct { client mqtt.Client jobService *services.JobService agentRT interface { ListAgentsWithStats() []*services.AgentWithStats } logger logger.Logger prefix string } func newMQTTPublisher(jobService *services.JobService, agentRT interface { ListAgentsWithStats() []*services.AgentWithStats }, appLogger logger.Logger) (*mqttPublisher, error) { opts := mqtt.NewClientOptions() opts.AddBroker(defaultMQTTBrokerURL) opts.SetClientID(fmt.Sprintf("stream-api-%d", time.Now().UnixNano())) opts.SetKeepAlive(60 * time.Second) opts.SetPingTimeout(10 * time.Second) opts.SetAutoReconnect(true) client := mqtt.NewClient(opts) token := client.Connect() if token.Wait() && token.Error() != nil { return nil, token.Error() } return &mqttPublisher{ client: client, jobService: jobService, agentRT: agentRT, logger: appLogger, prefix: defaultMQTTPrefix, }, nil } func (p *mqttPublisher) start(ctx context.Context) { if p == nil || p.jobService == nil { return } go p.consumeLogs(ctx) go p.consumeJobUpdates(ctx) go p.consumeResources(ctx) } func (p *mqttPublisher) consumeLogs(ctx context.Context) { ch, err := p.jobService.SubscribeJobLogs(ctx, "") if err != nil { p.logger.Error("Failed to subscribe job logs for MQTT", "error", err) return } for { select { case <-ctx.Done(): return case entry, ok := <-ch: if !ok { return } payload, _ := json.Marshal(entry) p.publish(fmt.Sprintf("%s/logs/%s", p.prefix, entry.JobID), payload) } } } func (p *mqttPublisher) consumeJobUpdates(ctx context.Context) { ch, err := p.jobService.SubscribeJobUpdates(ctx) if err != nil { p.logger.Error("Failed to subscribe job updates for MQTT", "error", err) return } for { select { case <-ctx.Done(): return case msg, ok := <-ch: if !ok { return } var inner map[string]any if err := json.Unmarshal([]byte(msg), &inner); err != nil { continue } jobID, _ := inner["job_id"].(string) eventPayload, _ := json.Marshal(map[string]any{ "type": "job_update", "payload": inner, }) if jobID != "" { p.publish(fmt.Sprintf("%s/job/%s", p.prefix, jobID), eventPayload) } p.publish(fmt.Sprintf("%s/events", p.prefix), eventPayload) } } } func (p *mqttPublisher) consumeResources(ctx context.Context) { ch, err := p.jobService.SubscribeSystemResources(ctx) if err != nil { p.logger.Error("Failed to subscribe resources for MQTT", "error", err) return } for { select { case <-ctx.Done(): return case entry, ok := <-ch: if !ok { return } resourcePayload, _ := json.Marshal(map[string]any{ "type": "resource_update", "payload": entry, }) p.publish(fmt.Sprintf("%s/events", p.prefix), resourcePayload) if p.agentRT != nil { for _, agent := range p.agentRT.ListAgentsWithStats() { if agent == nil || agent.Agent == nil || agent.ID != entry.AgentID { continue } agentPayload, _ := json.Marshal(map[string]any{ "type": "agent_update", "payload": mapAgentPayload(agent), }) p.publish(fmt.Sprintf("%s/events", p.prefix), agentPayload) break } } } } } func (p *mqttPublisher) publish(topic string, payload []byte) { if p == nil || p.client == nil { return } token := p.client.Publish(topic, 0, false, payload) token.WaitTimeout(5 * time.Second) } func mapAgentPayload(agent *services.AgentWithStats) map[string]any { if agent == nil || agent.Agent == nil { return map[string]any{} } return map[string]any{ "id": agent.ID, "name": agent.Name, "platform": agent.Platform, "backend": agent.Backend, "version": agent.Version, "capacity": agent.Capacity, "status": string(agent.Status), "cpu": agent.CPU, "ram": agent.RAM, "last_heartbeat": agent.LastHeartbeat, "created_at": agent.CreatedAt, "updated_at": agent.UpdatedAt, "active_job_count": agent.ActiveJobCount, } } func publishAgentEvent(client mqtt.Client, appLogger logger.Logger, eventType string, agent *services.AgentWithStats) { if client == nil || agent == nil || agent.Agent == nil { return } payload, err := json.Marshal(map[string]any{ "type": eventType, "payload": mapAgentPayload(agent), }) if err != nil { appLogger.Error("Failed to marshal agent MQTT event", "error", err) return } token := client.Publish(fmt.Sprintf("%s/events", defaultMQTTPrefix), 0, false, payload) token.WaitTimeout(5 * time.Second) } func publishResourceEvent(client mqtt.Client, appLogger logger.Logger, entry domain.SystemResource) { if client == nil { return } payload, err := json.Marshal(map[string]any{ "type": "resource_update", "payload": entry, }) if err != nil { appLogger.Error("Failed to marshal resource MQTT event", "error", err) return } token := client.Publish(fmt.Sprintf("%s/events", defaultMQTTPrefix), 0, false, payload) token.WaitTimeout(5 * time.Second) } type MQTTBootstrap struct{ *mqttPublisher } func NewMQTTBootstrap(jobService *services.JobService, agentRT interface { ListAgentsWithStats() []*services.AgentWithStats }, appLogger logger.Logger) (*MQTTBootstrap, error) { publisher, err := newMQTTPublisher(jobService, agentRT, appLogger) if err != nil { return nil, err } return &MQTTBootstrap{mqttPublisher: publisher}, nil } func (b *MQTTBootstrap) Start(ctx context.Context) { if b == nil || b.mqttPublisher == nil { return } b.mqttPublisher.start(ctx) } func (b *MQTTBootstrap) Client() mqtt.Client { if b == nil || b.mqttPublisher == nil { return nil } return b.client } func PublishAgentMQTTEvent(client mqtt.Client, appLogger logger.Logger, eventType string, agent *services.AgentWithStats) { publishAgentEvent(client, appLogger, eventType, agent) }