Files
stream.api/internal/transport/mqtt/publish_helpers.go
claude 863a0ea2f6 feat: add notification events handling and MQTT integration
- Implemented notification event publishing with a new NotificationEventPublisher interface.
- Created a noopNotificationEventPublisher for testing purposes.
- Added functionality to publish notification created events via MQTT.
- Introduced a new stream event publisher for handling job logs and updates.
- Added database migration for popup_ads table.
- Created tests for notification events and popup ads functionality.
- Established MQTT connection and publishing helpers for event messages.
2026-03-29 15:47:09 +00:00

65 lines
1.5 KiB
Go

package mqtt
import (
"encoding/json"
"fmt"
"time"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"stream.api/pkg/logger"
)
func connectPahoClient(broker, clientID string) (pahomqtt.Client, error) {
opts := pahomqtt.NewClientOptions().
AddBroker(broker).
SetClientID(clientID).
SetAutoReconnect(true).
SetConnectRetry(true).
SetKeepAlive(60 * time.Second).
SetPingTimeout(10 * time.Second).
SetConnectRetryInterval(3 * time.Second)
client := pahomqtt.NewClient(opts)
token := client.Connect()
if ok := token.WaitTimeout(defaultPublishWait); !ok {
return nil, fmt.Errorf("mqtt connect timeout")
}
if err := token.Error(); err != nil {
return nil, err
}
return client, nil
}
func publishMQTTEvent(client pahomqtt.Client, appLogger logger.Logger, prefix string, event mqttEvent) {
if client == nil {
return
}
if err := publishMQTTJSON(client, fmt.Sprintf("%s/events", prefix), event); err != nil {
appLogger.Error("Failed to publish MQTT event", "error", err, "type", event.Type)
}
}
func publishMQTTJSON(client pahomqtt.Client, topic string, payload any) error {
if client == nil {
return nil
}
encoded, err := json.Marshal(payload)
if err != nil {
return err
}
return publishPahoMessage(client, topic, encoded)
}
func publishPahoMessage(client pahomqtt.Client, topic string, payload []byte) error {
if client == nil {
return nil
}
token := client.Publish(topic, 0, false, payload)
if ok := token.WaitTimeout(defaultPublishWait); !ok {
return fmt.Errorf("mqtt publish timeout")
}
return token.Error()
}