From 0db40c7de58bcbb4398b92f6b98f58ebb02aff6d Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 18 Feb 2022 13:49:11 -0400 Subject: [PATCH] fix: broadcaster blocked after publishing 1024 messages (#188) --- waku/node.go | 3 +- waku/v2/node/wakunode2.go | 3 +- waku/v2/node/wakunode2_test.go | 107 +++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/waku/node.go b/waku/node.go index 727c8804..b923e0f7 100644 --- a/waku/node.go +++ b/waku/node.go @@ -233,8 +233,9 @@ func Execute(options Options) { if !options.Relay.Disable { for _, nodeTopic := range options.Relay.Topics { - _, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) + sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") + wakuNode.Broadcaster().Unregister(sub.C) } } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 6de72f9a..b8f10cdd 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -414,10 +414,11 @@ func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) } if w.opts.enableRelay { - _, err = w.relay.Subscribe(w.ctx) + sub, err := w.relay.Subscribe(w.ctx) if err != nil { return err } + w.Broadcaster().Unregister(sub.C) } // TODO: rlnRelay diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 1ff18af0..e3d44da9 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -2,8 +2,11 @@ package node import ( "context" + "fmt" "net" + "sync" "testing" + "time" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/tests" @@ -32,3 +35,107 @@ func TestWakuNode2(t *testing.T) { require.NoError(t, err) } + +func Test1100(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + key1, _ := tests.RandomHex(32) + prvKey1, _ := crypto.HexToECDSA(key1) + + hostAddr2, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + key2, _ := tests.RandomHex(32) + prvKey2, _ := crypto.HexToECDSA(key2) + + wakuNode1, err := New(ctx, + WithPrivateKey(prvKey1), + WithHostAddress(hostAddr1), + WithWakuRelay(), + ) + require.NoError(t, err) + err = wakuNode1.Start() + require.NoError(t, err) + defer wakuNode1.Stop() + + wakuNode2, err := New(ctx, + WithPrivateKey(prvKey2), + WithHostAddress(hostAddr2), + WithWakuRelay(), + ) + require.NoError(t, err) + err = wakuNode2.Start() + require.NoError(t, err) + defer wakuNode2.Stop() + + err = wakuNode2.DialPeer(ctx, wakuNode1.ListenAddresses()[0].String()) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + sub1, err := wakuNode1.Relay().Subscribe(ctx) + require.NoError(t, err) + sub2, err := wakuNode1.Relay().Subscribe(ctx) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(3) + go func() { + defer wg.Done() + + ticker := time.NewTimer(20 * time.Second) + defer ticker.Stop() + + msgCnt := 0 + for { + select { + case <-ticker.C: + if msgCnt != 1100 { + require.Fail(t, "Timeout Sub1") + } + case <-sub1.C: + msgCnt++ + if msgCnt == 1100 { + return + } + } + } + }() + + go func() { + defer wg.Done() + + ticker := time.NewTimer(20 * time.Second) + defer ticker.Stop() + + msgCnt := 0 + for { + select { + case <-ticker.C: + if msgCnt != 1100 { + require.Fail(t, "Timeout Sub2") + } + case <-sub2.C: + msgCnt++ + if msgCnt == 1100 { + return + } + } + } + }() + + go func() { + defer wg.Done() + for i := 1; i <= 1100; i++ { + msg := createTestMsg(0) + msg.Payload = []byte(fmt.Sprint(i)) + msg.Timestamp = float64(i) + if err := wakuNode2.Publish(ctx, msg); err != nil { + require.Fail(t, "Could not publish all messages") + } + } + }() + + wg.Wait() + +}