Push subscriptions in parallel

This commit is contained in:
Nicholas Molnar 2022-05-16 09:19:42 -07:00 committed by Richard Ramos
parent 4664737faa
commit 38fc9ee8bb
3 changed files with 15 additions and 4 deletions

1
go.mod
View File

@ -31,5 +31,6 @@ require (
github.com/urfave/cli/v2 v2.4.0 github.com/urfave/cli/v2 v2.4.0
go.opencensus.io v0.23.0 go.opencensus.io v0.23.0
go.uber.org/zap v1.21.0 go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect
google.golang.org/protobuf v1.28.0 // indirect google.golang.org/protobuf v1.28.0 // indirect
) )

2
go.sum
View File

@ -1224,6 +1224,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -19,6 +19,7 @@ import (
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -187,9 +188,11 @@ func (wf *WakuFilter) FilterListener() {
handle := func(envelope *protocol.Envelope) error { // async handle := func(envelope *protocol.Envelope) error { // async
msg := envelope.Message() msg := envelope.Message()
topic := envelope.PubsubTopic() topic := envelope.PubsubTopic()
g, _ := errgroup.WithContext(context.Background())
// Each subscriber is a light node that earlier on invoked // Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node // a FilterRequest on this node
for subscriber := range wf.subscribers.Items() { for subscriber := range wf.subscribers.Items() {
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic {
wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic)
continue continue
@ -200,15 +203,20 @@ func (wf *WakuFilter) FilterListener() {
wf.log.Info("found matching contentTopic ", filter, msg) wf.log.Info("found matching contentTopic ", filter, msg)
// Do a message push to light node // Do a message push to light node
wf.log.Info("pushing messages to light node: ", subscriber.peer) wf.log.Info("pushing messages to light node: ", subscriber.peer)
if err := wf.pushMessage(subscriber, msg); err != nil { g.Go(func() (err error) {
err = wf.pushMessage(subscriber, msg)
if err != nil {
wf.log.Error(err)
}
return err return err
} })
// Break if we have found a match
break
} }
} }
} }
return nil return g.Wait()
} }
for m := range wf.MsgC { for m := range wf.MsgC {