chore_: upgrade go-waku (#5799)

This commit is contained in:
Iuri Matias 2024-09-09 15:02:22 -04:00 committed by GitHub
parent 00db9c8a9b
commit e3dd2b2377
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 73 additions and 11 deletions

2
go.mod
View File

@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 h1:LKKgMmvUYFOzrWVQYLbI4nmXza4hTY7XsFk9WAXL/r0= github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ=
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -144,6 +144,13 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
j = len(contentTopics) j = len(contentTopics)
} }
select {
case <-interest.ctx.Done():
return
default:
// continue...
}
now := m.timesource.Now() now := m.timesource.Now()
err := m.fetchMessagesBatch(c, interest, i, j, now) err := m.fetchMessagesBatch(c, interest, i, j, now)
if err != nil { if err != nil {
@ -260,6 +267,13 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
j = len(missingHashes) j = len(missingHashes)
} }
select {
case <-interest.ctx.Done():
return nil
default:
// continue...
}
wg.Add(1) wg.Add(1)
go func(messageHashes []pb.MessageHash) { go func(messageHashes []pb.MessageHash) {
defer wg.Wait() defer wg.Wait()

View File

@ -3,6 +3,7 @@ package publish
import ( import (
"container/heap" "container/heap"
"context" "context"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
) )
@ -59,6 +60,44 @@ func (pq *envelopePriorityQueue) Pop() any {
return item return item
} }
type safeEnvelopePriorityQueue struct {
pq envelopePriorityQueue
lock sync.Mutex
}
func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) {
spq.lock.Lock()
defer spq.lock.Unlock()
heap.Push(&spq.pq, task)
}
func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority {
spq.lock.Lock()
defer spq.lock.Unlock()
if len(spq.pq) == 0 {
return nil
}
task := heap.Pop(&spq.pq).(*envelopePriority)
return task
}
// Len returns the length of the priority queue in a thread-safe manner
func (spq *safeEnvelopePriorityQueue) Len() int {
spq.lock.Lock()
defer spq.lock.Unlock()
return spq.pq.Len()
}
func newSafePriorityQueue() *safeEnvelopePriorityQueue {
result := &safeEnvelopePriorityQueue{
pq: make(envelopePriorityQueue, 0),
}
heap.Init(&result.pq)
return result
}
// MessageQueue is a structure used to handle the ordering of the messages to publish // MessageQueue is a structure used to handle the ordering of the messages to publish
type MessageQueue struct { type MessageQueue struct {
usePriorityQueue bool usePriorityQueue bool
@ -66,7 +105,7 @@ type MessageQueue struct {
toSendChan chan *protocol.Envelope toSendChan chan *protocol.Envelope
throttledPrioritySendQueue chan *envelopePriority throttledPrioritySendQueue chan *envelopePriority
envelopeAvailableOnPriorityQueueSignal chan struct{} envelopeAvailableOnPriorityQueueSignal chan struct{}
envelopePriorityQueue envelopePriorityQueue envelopePriorityQueue *safeEnvelopePriorityQueue
} }
// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a // NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a
@ -77,10 +116,9 @@ func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
} }
if m.usePriorityQueue { if m.usePriorityQueue {
m.envelopePriorityQueue = make(envelopePriorityQueue, 0) m.envelopePriorityQueue = newSafePriorityQueue()
m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize) m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize)
m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize) m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize)
heap.Init(&m.envelopePriorityQueue)
} else { } else {
m.toSendChan = make(chan *protocol.Envelope, bufferSize) m.toSendChan = make(chan *protocol.Envelope, bufferSize)
} }
@ -98,8 +136,7 @@ func (m *MessageQueue) Start(ctx context.Context) {
continue continue
} }
heap.Push(&m.envelopePriorityQueue, envelopePriority) m.envelopePriorityQueue.Push(envelopePriority)
m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}
case <-ctx.Done(): case <-ctx.Done():
@ -150,7 +187,10 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
select { select {
case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
if ok { if ok {
ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope e := m.envelopePriorityQueue.Pop()
if e != nil {
ch <- e.envelope
}
} }
case envelope, ok := <-m.toSendChan: case envelope, ok := <-m.toSendChan:

View File

@ -59,7 +59,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
if allPeersPingDuration != 0 { if allPeersPingDuration != 0 {
allPeersTicker := time.NewTicker(allPeersPingDuration) allPeersTicker := time.NewTicker(allPeersPingDuration)
defer allPeersTicker.Stop() defer allPeersTicker.Stop()
randomPeersTickerC = allPeersTicker.C allPeersTickerC = allPeersTicker.C
} }
lastTimeExecuted := w.timesource.Now() lastTimeExecuted := w.timesource.Now()

View File

@ -280,12 +280,20 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
if err != nil { if err != nil {
return pb.MessageHash{}, err return pb.MessageHash{}, err
} }
_, err = w.subscribeToPubsubTopic(params.pubsubTopic)
if err != nil {
return pb.MessageHash{}, err
}
} }
if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("not enough peers to publish") return pb.MessageHash{}, errors.New("not enough peers to publish")
} }
if !w.IsSubscribed(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("cannot publish to unsubscribed topic")
}
w.topicsMutex.Lock() w.topicsMutex.Lock()
defer w.topicsMutex.Unlock() defer w.topicsMutex.Unlock()

2
vendor/modules.txt vendored
View File

@ -1010,7 +1010,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 # github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests