chore_: extract message hash query for outgoing messages to go-waku (#5652)

* chore_: extract message sent check to go-waku

* chore_: clear tests

* chore_: refactor confirm messages sent
This commit is contained in:
kaichao 2024-08-08 15:14:04 +08:00 committed by GitHub
parent d7fcbd3444
commit 45cea612d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 38 additions and 230 deletions

2
go.mod
View File

@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac=
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 h1:UN5y6imIQBXnuq/bPAYJgT6XMZRgQgUO5Mn9VFi3c5A=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -125,8 +125,8 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {
m.storePeerID = peerID
}
// CheckIfMessagesStored checks if the tracked outgoing messages are stored periodically
func (m *MessageSentCheck) CheckIfMessagesStored() {
// Start checks if the tracked outgoing messages are stored periodically
func (m *MessageSentCheck) Start() {
ticker := time.NewTicker(m.hashQueryInterval)
defer ticker.Stop()
for {

2
vendor/modules.txt vendored
View File

@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9
# github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests

View File

@ -19,7 +19,6 @@
package wakuv2
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
@ -87,10 +86,6 @@ const requestTimeout = 30 * time.Second
const bootnodesQueryBackoffMs = 200
const bootnodesMaxRetries = 7
const cacheTTL = 20 * time.Minute
const maxHashQueryLength = 100
const hashQueryInterval = 3 * time.Second
const messageSentPeriod = 3 // in seconds
const messageExpiredPerid = 10 // in seconds
const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute
@ -159,10 +154,7 @@ type Waku struct {
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
storeMsgIDsMu sync.RWMutex
sendMsgIDs map[string]map[gethcommon.Hash]uint32
sendMsgIDsMu sync.RWMutex
storePeerID peer.ID
messageSentCheck *publish.MessageSentCheck
topicHealthStatusChan chan peermanager.TopicHealthStatus
connectionNotifChan chan node.PeerConnection
@ -245,8 +237,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
storeMsgIDs: make(map[gethcommon.Hash]bool),
timesource: ts,
storeMsgIDsMu: sync.RWMutex{},
sendMsgIDs: make(map[string]map[gethcommon.Hash]uint32),
sendMsgIDsMu: sync.RWMutex{},
logger: logger,
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
@ -995,158 +985,17 @@ func (w *Waku) SkipPublishToTopic(value bool) {
w.cfg.SkipPublishToTopic = value
}
func (w *Waku) checkIfMessagesStored() {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
return
}
ticker := time.NewTicker(hashQueryInterval)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
w.logger.Debug("stop the look for message stored check")
return
case <-ticker.C:
w.sendMsgIDsMu.Lock()
w.logger.Debug("running loop for messages stored check", zap.Any("messageIds", w.sendMsgIDs))
pubsubTopics := make([]string, 0, len(w.sendMsgIDs))
pubsubMessageIds := make([][]gethcommon.Hash, 0, len(w.sendMsgIDs))
pubsubMessageTime := make([][]uint32, 0, len(w.sendMsgIDs))
for pubsubTopic, subMsgs := range w.sendMsgIDs {
var queryMsgIds []gethcommon.Hash
var queryMsgTime []uint32
for msgID, sendTime := range subMsgs {
if len(queryMsgIds) >= maxHashQueryLength {
break
}
// message is sent 5 seconds ago, check if it's stored
if uint32(w.timesource.Now().Unix()) > sendTime+messageSentPeriod {
queryMsgIds = append(queryMsgIds, msgID)
queryMsgTime = append(queryMsgTime, sendTime)
}
}
w.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic))
if len(queryMsgIds) > 0 {
pubsubTopics = append(pubsubTopics, pubsubTopic)
pubsubMessageIds = append(pubsubMessageIds, queryMsgIds)
pubsubMessageTime = append(pubsubMessageTime, queryMsgTime)
}
}
w.sendMsgIDsMu.Unlock()
pubsubProcessedMessages := make([][]gethcommon.Hash, len(pubsubTopics))
for i, pubsubTopic := range pubsubTopics {
processedMessages := w.messageHashBasedQuery(w.ctx, pubsubMessageIds[i], pubsubMessageTime[i], pubsubTopic)
pubsubProcessedMessages[i] = processedMessages
}
w.sendMsgIDsMu.Lock()
for i, pubsubTopic := range pubsubTopics {
subMsgs, ok := w.sendMsgIDs[pubsubTopic]
if !ok {
continue
}
for _, hash := range pubsubProcessedMessages[i] {
delete(subMsgs, hash)
if len(subMsgs) == 0 {
delete(w.sendMsgIDs, pubsubTopic)
} else {
w.sendMsgIDs[pubsubTopic] = subMsgs
}
}
}
w.logger.Debug("messages for next store hash query", zap.Any("messageIds", w.sendMsgIDs))
w.sendMsgIDsMu.Unlock()
}
}
}
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
return
}
w.sendMsgIDsMu.Lock()
defer w.sendMsgIDsMu.Unlock()
for pubsubTopic, subMsgs := range w.sendMsgIDs {
for _, hash := range hashes {
delete(subMsgs, hash)
if len(subMsgs) == 0 {
delete(w.sendMsgIDs, pubsubTopic)
} else {
w.sendMsgIDs[pubsubTopic] = subMsgs
}
}
}
w.messageSentCheck.DeleteByMessageIDs(hashes)
}
func (w *Waku) SetStorePeerID(peerID peer.ID) {
w.storePeerID = peerID
}
// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, relayTime []uint32, pubsubTopic string) []gethcommon.Hash {
selectedPeer := w.storePeerID
if selectedPeer == "" {
w.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
return []gethcommon.Hash{}
if w.messageSentCheck != nil {
w.messageSentCheck.SetStorePeerID(peerID)
}
var opts []store.RequestOption
requestID := protocol.GenerateRequestID()
opts = append(opts, store.WithRequestID(requestID))
opts = append(opts, store.WithPeer(selectedPeer))
opts = append(opts, store.WithPaging(false, maxHashQueryLength))
opts = append(opts, store.IncludeData(false))
messageHashes := make([]pb.MessageHash, len(hashes))
for i, hash := range hashes {
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
}
w.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))
result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...)
if err != nil {
w.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
return []gethcommon.Hash{}
}
w.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result.Messages())))
var ackHashes []gethcommon.Hash
var missedHashes []gethcommon.Hash
for i, hash := range hashes {
found := false
for _, msg := range result.Messages() {
if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) {
found = true
break
}
}
if found {
ackHashes = append(ackHashes, hash)
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeSent,
})
}
if !found && uint32(w.timesource.Now().Unix()) > relayTime[i]+messageExpiredPerid {
missedHashes = append(missedHashes, hash)
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeExpired,
})
}
}
w.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
w.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))
return append(ackHashes, missedHashes...)
}
func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) {
@ -1340,7 +1189,9 @@ func (w *Waku) Start() error {
go w.sendQueue.Start(w.ctx)
go w.checkIfMessagesStored()
if w.cfg.EnableStoreConfirmationForMessagesSent {
w.confirmMessagesSent()
}
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
w.wg.Add(1)
@ -1349,6 +1200,30 @@ func (w *Waku) Start() error {
return nil
}
func (w *Waku) confirmMessagesSent() {
w.messageSentCheck = publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), w.logger)
go w.messageSentCheck.Start()
go func() {
for {
select {
case <-w.ctx.Done():
return
case hash := <-w.messageSentCheck.MessageStoredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeSent,
})
case hash := <-w.messageSentCheck.MessageExpiredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeExpired,
})
}
}
}()
}
func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) {
w.poolMu.Lock()
defer w.poolMu.Unlock()
@ -1538,14 +1413,7 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
ephemeral := e.Envelope.Message().Ephemeral
if w.cfg.EnableStoreConfirmationForMessagesSent && e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) {
w.sendMsgIDsMu.Lock()
subMsgs, ok := w.sendMsgIDs[e.PubsubTopic]
if !ok {
subMsgs = make(map[gethcommon.Hash]uint32)
}
subMsgs[e.Hash()] = e.Sent
w.sendMsgIDs[e.PubsubTopic] = subMsgs
w.sendMsgIDsMu.Unlock()
w.messageSentCheck.Add(e.PubsubTopic, e.Hash(), e.Sent)
}
matched := w.filters.NotifyWatchers(e)

View File

@ -17,7 +17,6 @@ import (
"github.com/libp2p/go-libp2p/core/metrics"
libp2pprotocol "github.com/libp2p/go-libp2p/core/protocol"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
ethdnsdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
@ -40,7 +39,6 @@ import (
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/wakuv2/common"
@ -599,64 +597,6 @@ func waitForEnvelope(t *testing.T, contentTopic string, envCh chan common.Envelo
}
}
func TestConfirmMessageDelivered(t *testing.T) {
aliceConfig := &Config{}
aliceNode, err := New(nil, "", aliceConfig, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, aliceNode.Start())
bobConfig := &Config{}
bobNode, err := New(nil, "", bobConfig, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, bobNode.Start())
addrs := aliceNode.ListenAddresses()
require.Greater(t, len(addrs), 0)
_, err = bobNode.AddRelayPeer(addrs[0])
require.NoError(t, err)
err = bobNode.DialPeer(addrs[0])
require.NoError(t, err)
filter := &common.Filter{
Messages: common.NewMemoryMessageStore(),
ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}),
}
_, err = aliceNode.Subscribe(filter)
require.NoError(t, err)
msgTimestamp := aliceNode.timestamp()
contentTopic := maps.Keys(filter.ContentTopics)[0]
_, err = aliceNode.Send(shard.DefaultShardPubsubTopic(), &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
Ephemeral: proto.Bool(false),
}, nil)
require.NoError(t, err)
time.Sleep(1 * time.Second)
messages := filter.Retrieve()
require.Len(t, messages, 1)
require.Len(t, aliceNode.sendMsgIDs, 1)
for _, msgs := range aliceNode.sendMsgIDs {
require.Len(t, msgs, 1)
for hash := range msgs {
require.Equal(t, hash, messages[0].Hash())
}
}
aliceNode.ConfirmMessageDelivered([]ethcommon.Hash{messages[0].Hash()})
require.Len(t, aliceNode.sendMsgIDs, 0)
require.NoError(t, aliceNode.Stop())
require.NoError(t, bobNode.Stop())
}
func TestOnlineChecker(t *testing.T) {
w, err := New(nil, "shards.staging", nil, nil, nil, nil, nil, nil)
require.NoError(t, w.Start())