fix: filter can submit broadcaster envelopes

This commit is contained in:
Richard Ramos 2022-12-08 23:09:15 -04:00 committed by RichΛrd
parent 84c7022e2d
commit a66bf8a893
5 changed files with 65 additions and 35 deletions

View File

@ -3,14 +3,17 @@ package filter
import (
"sync"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
type FilterMap struct {
sync.RWMutex
items map[string]Filter
timesource timesource.Timesource
items map[string]Filter
broadcaster v2.Broadcaster
}
type FilterMapItem struct {
@ -18,9 +21,11 @@ type FilterMapItem struct {
Value Filter
}
func NewFilterMap() *FilterMap {
func NewFilterMap(broadcaster v2.Broadcaster, timesource timesource.Timesource) *FilterMap {
return &FilterMap{
items: make(map[string]Filter),
timesource: timesource,
items: make(map[string]Filter),
broadcaster: broadcaster,
}
}
@ -79,24 +84,29 @@ func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) {
fm.RLock()
defer fm.RUnlock()
for key, filter := range fm.items {
envelope := protocol.NewEnvelope(msg, utils.GetUnixEpoch(), filter.Topic)
filter, ok := fm.items[requestId]
if !ok {
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly as all MessagePushs already contain
// the requestId of the coresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
return
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic)
// Broadcasting message so it's stored
fm.broadcaster.Submit(envelope)
if msg.ContentTopic == "" {
filter.Chan <- envelope
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}

View File

@ -4,11 +4,13 @@ import (
"testing"
"github.com/stretchr/testify/require"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
func TestFilterMap(t *testing.T) {
fmap := NewFilterMap()
fmap := NewFilterMap(v2.NewBroadcaster(100), timesource.NewDefaultClock())
filter := Filter{
PeerID: "id",

View File

@ -13,9 +13,12 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
@ -62,7 +65,7 @@ type (
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
func NewWakuFilter(ctx context.Context, host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
@ -84,7 +87,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za
wf.MsgC = make(chan *protocol.Envelope, 1024)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.timeout)
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
@ -126,6 +129,10 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible
subscriber.filter.Topic = relay.DefaultWakuTopic
}
len := wf.subscribers.Append(subscriber)
logger.Info("adding subscriber")
@ -192,7 +199,7 @@ func (wf *WakuFilter) filterListener() {
for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) {
logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != pubsubTopic {
if subscriber.filter.Topic != pubsubTopic {
logger.Info("pubsub topic mismatch",
zap.String("subscriberTopic", subscriber.filter.Topic),
zap.String("messageTopic", pubsubTopic))

View File

@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -23,7 +24,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger())
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
@ -39,7 +40,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger())
filter, _ := NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
return filter, host
}
@ -69,7 +70,7 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger())
node2Filter, _ := NewWakuFilter(ctx, host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
@ -154,7 +155,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second))
node2Filter, _ := NewWakuFilter(ctx, host2, v2.NewBroadcaster(10), true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)

View File

@ -15,19 +15,29 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
var testTopic = "test"
func makeFilterService(t *testing.T) *FilterService {
n, err := node.New(context.Background(), node.WithWakuFilter(true), node.WithWakuRelay())
func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
var nodeOpts []node.WakuNodeOption
nodeOpts = append(nodeOpts, node.WithWakuFilter(isFullNode))
if isFullNode {
nodeOpts = append(nodeOpts, node.WithWakuRelay())
}
n, err := node.New(context.Background(), nodeOpts...)
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
if isFullNode {
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
}
return NewFilterService(n, 30, utils.Logger())
}
@ -39,15 +49,15 @@ func TestFilterSubscription(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
require.NoError(t, err)
_, err = node.SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
_, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger())
_, _ = filter.NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
d := makeFilterService(t)
d := makeFilterService(t, true)
defer d.node.Stop()
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
@ -83,10 +93,10 @@ func TestFilterSubscription(t *testing.T) {
}
func TestFilterGetV1Messages(t *testing.T) {
serviceA := makeFilterService(t)
serviceA := makeFilterService(t, true)
var reply SuccessReply
serviceB := makeFilterService(t)
serviceB := makeFilterService(t, false)
go serviceB.Start()
defer serviceB.Stop()