From 783ad01d92d9d5bbd8d8f1692a5858d28477d12d Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 15 Mar 2021 19:59:18 -0400 Subject: [PATCH] Use a ticker for getting next message --- main.go | 5 +++-- waku/v2/node/wakunode2.go | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index e1f8b9ec..5be96605 100644 --- a/main.go +++ b/main.go @@ -49,7 +49,7 @@ func main() { } fmt.Println("Received message:", string(payload)) - //sub.Unsubscribe() + // sub.Unsubscribe() } } }() @@ -57,7 +57,8 @@ func main() { // Write loop go func() { for { - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) + var contentTopic uint32 = 1 var version uint32 = 0 diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 23a02a76..f283dab4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -8,6 +8,7 @@ import ( "log" "net" "sync" + "time" proto "github.com/golang/protobuf/proto" "github.com/libp2p/go-libp2p" @@ -181,6 +182,9 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { subscription.quit = make(chan struct{}) go func(ctx context.Context, sub *pubsub.Subscription) { + nextMsgTicker := time.NewTicker(time.Millisecond * 10) + defer nextMsgTicker.Stop() + for { select { case <-subscription.quit: @@ -189,10 +193,11 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { close(subscription.C) subscription.closed = true return - default: + case <-nextMsgTicker.C: msg, err := sub.Next(ctx) if err != nil { + fmt.Println("Error receiving message", err) return // Should close channel? }