feat: add notification events handling and MQTT integration

- Implemented notification event publishing with a new NotificationEventPublisher interface.
- Created a noopNotificationEventPublisher for testing purposes.
- Added functionality to publish notification created events via MQTT.
- Introduced a new stream event publisher for handling job logs and updates.
- Added database migration for popup_ads table.
- Created tests for notification events and popup ads functionality.
- Established MQTT connection and publishing helpers for event messages.
This commit is contained in:
2026-03-29 15:47:09 +00:00
parent a910e6c624
commit 863a0ea2f6
42 changed files with 4606 additions and 576 deletions

View File

@@ -0,0 +1,187 @@
package mqtt
import (
"context"
"encoding/json"
"fmt"
"time"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"stream.api/internal/dto"
"stream.api/internal/service"
"stream.api/pkg/logger"
)
type agentRuntime interface {
ListAgentsWithStats() []*dto.AgentWithStats
}
type streamEventPublisher struct {
client pahomqtt.Client
jobService *service.JobService
agentRT agentRuntime
logger logger.Logger
prefix string
}
func newStreamEventPublisher(jobService *service.JobService, agentRT agentRuntime, appLogger logger.Logger) (*streamEventPublisher, error) {
client, err := connectPahoClient(defaultMQTTBrokerURL, fmt.Sprintf("stream-api-%d", time.Now().UnixNano()))
if err != nil {
return nil, err
}
return &streamEventPublisher{
client: client,
jobService: jobService,
agentRT: agentRT,
logger: appLogger,
prefix: defaultMQTTPrefix,
}, nil
}
func (p *streamEventPublisher) start(ctx context.Context) {
if p == nil || p.jobService == nil {
return
}
go p.consumeLogs(ctx)
go p.consumeJobUpdates(ctx)
go p.consumeResources(ctx)
}
func (p *streamEventPublisher) consumeLogs(ctx context.Context) {
ch, err := p.jobService.SubscribeJobLogs(ctx, "")
if err != nil {
p.logger.Error("Failed to subscribe job logs for MQTT", "error", err)
return
}
for {
select {
case <-ctx.Done():
return
case entry, ok := <-ch:
if !ok {
return
}
if err := publishMQTTJSON(p.client, p.logTopic(entry.JobID), entry); err != nil {
p.logger.Error("Failed to publish MQTT job log", "error", err, "job_id", entry.JobID)
}
}
}
}
func (p *streamEventPublisher) consumeJobUpdates(ctx context.Context) {
ch, err := p.jobService.SubscribeJobUpdates(ctx)
if err != nil {
p.logger.Error("Failed to subscribe job updates for MQTT", "error", err)
return
}
for {
select {
case <-ctx.Done():
return
case msg, ok := <-ch:
if !ok {
return
}
var payload map[string]any
if err := json.Unmarshal([]byte(msg), &payload); err != nil {
p.logger.Error("Failed to decode MQTT job update payload", "error", err)
continue
}
jobID, _ := payload["job_id"].(string)
if err := p.publishEvent("job_update", payload, jobID); err != nil {
p.logger.Error("Failed to publish MQTT job update", "error", err, "job_id", jobID)
}
}
}
}
func (p *streamEventPublisher) consumeResources(ctx context.Context) {
ch, err := p.jobService.SubscribeSystemResources(ctx)
if err != nil {
p.logger.Error("Failed to subscribe resources for MQTT", "error", err)
return
}
for {
select {
case <-ctx.Done():
return
case entry, ok := <-ch:
if !ok {
return
}
if err := p.publishEvent("resource_update", entry, ""); err != nil {
p.logger.Error("Failed to publish MQTT resource update", "error", err, "agent_id", entry.AgentID)
}
agent := p.findAgent(entry.AgentID)
if agent == nil {
continue
}
if err := p.publishEvent("agent_update", mapAgentPayload(agent), ""); err != nil {
p.logger.Error("Failed to publish MQTT agent update", "error", err, "agent_id", entry.AgentID)
}
}
}
}
func (p *streamEventPublisher) findAgent(agentID string) *dto.AgentWithStats {
if p == nil || p.agentRT == nil {
return nil
}
for _, agent := range p.agentRT.ListAgentsWithStats() {
if agent != nil && agent.Agent != nil && agent.Agent.ID == agentID {
return agent
}
}
return nil
}
func (p *streamEventPublisher) publishEvent(eventType string, payload any, jobID string) error {
message := mqttEvent{Type: eventType, Payload: payload}
if jobID != "" {
if err := publishMQTTJSON(p.client, p.jobTopic(jobID), message); err != nil {
return err
}
}
return publishMQTTJSON(p.client, p.eventsTopic(), message)
}
func (p *streamEventPublisher) logTopic(jobID string) string {
return fmt.Sprintf("%s/logs/%s", p.prefix, jobID)
}
func (p *streamEventPublisher) jobTopic(jobID string) string {
return fmt.Sprintf("%s/job/%s", p.prefix, jobID)
}
func (p *streamEventPublisher) eventsTopic() string {
return fmt.Sprintf("%s/events", p.prefix)
}
func mapAgentPayload(agent *dto.AgentWithStats) map[string]any {
if agent == nil || agent.Agent == nil {
return map[string]any{}
}
return map[string]any{
"id": agent.Agent.ID,
"name": agent.Name,
"platform": agent.Platform,
"backend": agent.Backend,
"version": agent.Version,
"capacity": agent.Capacity,
"status": string(agent.Status),
"cpu": agent.CPU,
"ram": agent.RAM,
"last_heartbeat": agent.LastHeartbeat,
"created_at": agent.CreatedAt,
"updated_at": agent.UpdatedAt,
"active_job_count": agent.ActiveJobCount,
}
}