fix: broadcaster blocked after publishing 1024 messages (#188)

This commit is contained in:
Richard Ramos 2022-02-18 13:49:11 -04:00 committed by GitHub
parent 11d1f8fb0d
commit 0db40c7de5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 2 deletions

View File

@ -233,8 +233,9 @@ func Execute(options Options) {
if !options.Relay.Disable { if !options.Relay.Disable {
for _, nodeTopic := range options.Relay.Topics { for _, nodeTopic := range options.Relay.Topics {
_, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic") failOnErr(err, "Error subscring to topic")
wakuNode.Broadcaster().Unregister(sub.C)
} }
} }

View File

@ -414,10 +414,11 @@ func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option)
} }
if w.opts.enableRelay { if w.opts.enableRelay {
_, err = w.relay.Subscribe(w.ctx) sub, err := w.relay.Subscribe(w.ctx)
if err != nil { if err != nil {
return err return err
} }
w.Broadcaster().Unregister(sub.C)
} }
// TODO: rlnRelay // TODO: rlnRelay

View File

@ -2,8 +2,11 @@ package node
import ( import (
"context" "context"
"fmt"
"net" "net"
"sync"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
@ -32,3 +35,107 @@ func TestWakuNode2(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func Test1100(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key1, _ := tests.RandomHex(32)
prvKey1, _ := crypto.HexToECDSA(key1)
hostAddr2, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key2, _ := tests.RandomHex(32)
prvKey2, _ := crypto.HexToECDSA(key2)
wakuNode1, err := New(ctx,
WithPrivateKey(prvKey1),
WithHostAddress(hostAddr1),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode1.Start()
require.NoError(t, err)
defer wakuNode1.Stop()
wakuNode2, err := New(ctx,
WithPrivateKey(prvKey2),
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode2.Start()
require.NoError(t, err)
defer wakuNode2.Stop()
err = wakuNode2.DialPeer(ctx, wakuNode1.ListenAddresses()[0].String())
require.NoError(t, err)
time.Sleep(2 * time.Second)
sub1, err := wakuNode1.Relay().Subscribe(ctx)
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
ticker := time.NewTimer(20 * time.Second)
defer ticker.Stop()
msgCnt := 0
for {
select {
case <-ticker.C:
if msgCnt != 1100 {
require.Fail(t, "Timeout Sub1")
}
case <-sub1.C:
msgCnt++
if msgCnt == 1100 {
return
}
}
}
}()
go func() {
defer wg.Done()
ticker := time.NewTimer(20 * time.Second)
defer ticker.Stop()
msgCnt := 0
for {
select {
case <-ticker.C:
if msgCnt != 1100 {
require.Fail(t, "Timeout Sub2")
}
case <-sub2.C:
msgCnt++
if msgCnt == 1100 {
return
}
}
}
}()
go func() {
defer wg.Done()
for i := 1; i <= 1100; i++ {
msg := createTestMsg(0)
msg.Payload = []byte(fmt.Sprint(i))
msg.Timestamp = float64(i)
if err := wakuNode2.Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages")
}
}
}()
wg.Wait()
}