diff --git a/go.mod b/go.mod index 3e6a9415e..f3cdbb5fa 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 + github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 6f43fbe2d..31adf513a 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 h1:LKKgMmvUYFOzrWVQYLbI4nmXza4hTY7XsFk9WAXL/r0= -github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ= +github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index bafde783b..058af9a48 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -144,6 +144,13 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter j = len(contentTopics) } + select { + case <-interest.ctx.Done(): + return + default: + // continue... + } + now := m.timesource.Now() err := m.fetchMessagesBatch(c, interest, i, j, now) if err != nil { @@ -260,6 +267,13 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, j = len(missingHashes) } + select { + case <-interest.ctx.Done(): + return nil + default: + // continue... + } + wg.Add(1) go func(messageHashes []pb.MessageHash) { defer wg.Wait() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go index ad153ff9c..03b7a16a6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go @@ -3,6 +3,7 @@ package publish import ( "container/heap" "context" + "sync" "github.com/waku-org/go-waku/waku/v2/protocol" ) @@ -59,6 +60,44 @@ func (pq *envelopePriorityQueue) Pop() any { return item } +type safeEnvelopePriorityQueue struct { + pq envelopePriorityQueue + lock sync.Mutex +} + +func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) { + spq.lock.Lock() + defer spq.lock.Unlock() + heap.Push(&spq.pq, task) +} + +func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority { + spq.lock.Lock() + defer spq.lock.Unlock() + + if len(spq.pq) == 0 { + return nil + } + task := heap.Pop(&spq.pq).(*envelopePriority) + return task +} + +// Len returns the length of the priority queue in a thread-safe manner +func (spq *safeEnvelopePriorityQueue) Len() int { + spq.lock.Lock() + defer spq.lock.Unlock() + + return spq.pq.Len() +} + +func newSafePriorityQueue() *safeEnvelopePriorityQueue { + result := &safeEnvelopePriorityQueue{ + pq: make(envelopePriorityQueue, 0), + } + heap.Init(&result.pq) + return result +} + // MessageQueue is a structure used to handle the ordering of the messages to publish type MessageQueue struct { usePriorityQueue bool @@ -66,7 +105,7 @@ type MessageQueue struct { toSendChan chan *protocol.Envelope throttledPrioritySendQueue chan *envelopePriority envelopeAvailableOnPriorityQueueSignal chan struct{} - envelopePriorityQueue envelopePriorityQueue + envelopePriorityQueue *safeEnvelopePriorityQueue } // NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a @@ -77,10 +116,9 @@ func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue { } if m.usePriorityQueue { - m.envelopePriorityQueue = make(envelopePriorityQueue, 0) + m.envelopePriorityQueue = newSafePriorityQueue() m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize) m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize) - heap.Init(&m.envelopePriorityQueue) } else { m.toSendChan = make(chan *protocol.Envelope, bufferSize) } @@ -98,8 +136,7 @@ func (m *MessageQueue) Start(ctx context.Context) { continue } - heap.Push(&m.envelopePriorityQueue, envelopePriority) - + m.envelopePriorityQueue.Push(envelopePriority) m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} case <-ctx.Done(): @@ -150,7 +187,10 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { select { case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: if ok { - ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope + e := m.envelopePriorityQueue.Pop() + if e != nil { + ch <- e.envelope + } } case envelope, ok := <-m.toSendChan: diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go index eb28d5171..94ebbb74a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go @@ -59,7 +59,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t if allPeersPingDuration != 0 { allPeersTicker := time.NewTicker(allPeersPingDuration) defer allPeersTicker.Stop() - randomPeersTickerC = allPeersTicker.C + allPeersTickerC = allPeersTicker.C } lastTimeExecuted := w.timesource.Now() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index 936b37c58..2ff8329af 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -280,12 +280,20 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . if err != nil { return pb.MessageHash{}, err } + _, err = w.subscribeToPubsubTopic(params.pubsubTopic) + if err != nil { + return pb.MessageHash{}, err + } } if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { return pb.MessageHash{}, errors.New("not enough peers to publish") } + if !w.IsSubscribed(params.pubsubTopic) { + return pb.MessageHash{}, errors.New("cannot publish to unsubscribed topic") + } + w.topicsMutex.Lock() defer w.topicsMutex.Unlock() diff --git a/vendor/modules.txt b/vendor/modules.txt index 7f5b06bff..67645f7a7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1010,7 +1010,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 +# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests