mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-04 06:53:06 +00:00
feat: option to emit event to bus upon message sent (#1261)
This commit is contained in:
parent
0c594b3140
commit
6550ff35bc
@ -6,6 +6,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/libp2p/go-libp2p/core/event"
|
||||||
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
@ -53,6 +55,12 @@ type MessageSender struct {
|
|||||||
messageSentCheck ISentCheck
|
messageSentCheck ISentCheck
|
||||||
rateLimiter *PublishRateLimiter
|
rateLimiter *PublishRateLimiter
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
evtMessageSent event.Emitter
|
||||||
|
}
|
||||||
|
|
||||||
|
type MessageSent struct {
|
||||||
|
Size uint32 // Size of payload in bytes
|
||||||
|
Timestamp int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type Request struct {
|
type Request struct {
|
||||||
@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess
|
|||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender {
|
||||||
|
evtMessageSent, err := host.EventBus().Emitter(new(MessageSent))
|
||||||
|
if err != nil {
|
||||||
|
ms.logger.Error("failed to create message sent emitter", zap.Error(err))
|
||||||
|
}
|
||||||
|
ms.evtMessageSent = evtMessageSent
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
|
||||||
func (ms *MessageSender) Send(req *Request) error {
|
func (ms *MessageSender) Send(req *Request) error {
|
||||||
logger := ms.logger.With(
|
logger := ms.logger.With(
|
||||||
zap.Stringer("envelopeHash", req.envelope.Hash()),
|
zap.Stringer("envelopeHash", req.envelope.Hash()),
|
||||||
@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ms.evtMessageSent != nil {
|
||||||
|
err := ms.evtMessageSent.Emit(MessageSent{
|
||||||
|
Size: uint32(len(req.envelope.Message().Payload)),
|
||||||
|
Timestamp: req.envelope.Message().GetTimestamp(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to emit message sent event", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package publish
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -129,3 +130,46 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {
|
|||||||
|
|
||||||
return host, relay
|
return host, relay
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMessageSentEmitter(t *testing.T) {
|
||||||
|
host, relayNode := createRelayNode(t)
|
||||||
|
err := relayNode.Start(context.Background())
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer relayNode.Stop()
|
||||||
|
|
||||||
|
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
|
||||||
|
require.Nil(t, err)
|
||||||
|
publisher := NewDefaultPublisher(nil, relayNode)
|
||||||
|
sender, err := NewMessageSender(Relay, publisher, utils.Logger())
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}
|
||||||
|
sender.WithMessageSentCheck(check)
|
||||||
|
sender.WithMessageSentEmitter(host)
|
||||||
|
|
||||||
|
msg := &pb.WakuMessage{
|
||||||
|
Payload: []byte{1, 2, 3},
|
||||||
|
Timestamp: utils.GetUnixEpoch(),
|
||||||
|
ContentTopic: "test-content-topic",
|
||||||
|
}
|
||||||
|
envelope := protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "test-pubsub-topic")
|
||||||
|
req := NewRequest(context.TODO(), envelope)
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
sub, err := host.EventBus().Subscribe(new(MessageSent))
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer sub.Close()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for msgSentEvt := range sub.Out() {
|
||||||
|
msgSent := msgSentEvt.(MessageSent)
|
||||||
|
require.Equal(t, uint32(len(msg.Payload)), msgSent.Size)
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
err = sender.Send(req)
|
||||||
|
require.Nil(t, err)
|
||||||
|
go wg.Wait()
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user