mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-07 16:33:08 +00:00
test: Add test/refactor filter option
This commit is contained in:
parent
c9a9b02e48
commit
2ae370ca41
@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/status-im/go-waku/waku/v2/utils"
|
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
)
|
)
|
||||||
@ -27,13 +26,6 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
FilterSubscribeParameters struct {
|
|
||||||
host host.Host
|
|
||||||
selectedPeer peer.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
FilterSubscribeOption func(*FilterSubscribeParameters)
|
|
||||||
|
|
||||||
Filter struct {
|
Filter struct {
|
||||||
PeerID peer.ID
|
PeerID peer.ID
|
||||||
Topic string
|
Topic string
|
||||||
@ -65,41 +57,32 @@ type (
|
|||||||
// NOTE This is just a start, the design of this protocol isn't done yet. It
|
// NOTE This is just a start, the design of this protocol isn't done yet. It
|
||||||
// should be direct payload exchange (a la req-resp), not be coupled with the
|
// should be direct payload exchange (a la req-resp), not be coupled with the
|
||||||
// relay protocol.
|
// relay protocol.
|
||||||
|
|
||||||
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
||||||
|
|
||||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
|
||||||
return func(params *FilterSubscribeParameters) {
|
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
||||||
params.selectedPeer = p
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func WithAutomaticPeerSelection() FilterSubscribeOption {
|
wf := new(WakuFilter)
|
||||||
return func(params *FilterSubscribeParameters) {
|
wf.ctx = ctx
|
||||||
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1))
|
wf.MsgC = make(chan *protocol.Envelope)
|
||||||
if err == nil {
|
wf.h = host
|
||||||
params.selectedPeer = *p
|
wf.isFullNode = isFullNode
|
||||||
} else {
|
wf.filters = NewFilterMap()
|
||||||
log.Info("Error selecting peer: ", err)
|
wf.subscribers = NewSubscribers()
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
||||||
return func(params *FilterSubscribeParameters) {
|
go wf.FilterListener()
|
||||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
|
|
||||||
if err == nil {
|
|
||||||
params.selectedPeer = *p
|
|
||||||
} else {
|
|
||||||
log.Info("Error selecting peer: ", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultOptions() []FilterSubscribeOption {
|
if wf.isFullNode {
|
||||||
return []FilterSubscribeOption{
|
log.Info("Filter protocol started")
|
||||||
WithAutomaticPeerSelection(),
|
} else {
|
||||||
|
log.Info("Filter protocol started (only client mode)")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return wf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilter) onRequest(s network.Stream) {
|
func (wf *WakuFilter) onRequest(s network.Stream) {
|
||||||
@ -148,32 +131,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
|
|
||||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
wf := new(WakuFilter)
|
|
||||||
wf.ctx = ctx
|
|
||||||
wf.MsgC = make(chan *protocol.Envelope)
|
|
||||||
wf.h = host
|
|
||||||
wf.isFullNode = isFullNode
|
|
||||||
wf.filters = NewFilterMap()
|
|
||||||
wf.subscribers = NewSubscribers()
|
|
||||||
|
|
||||||
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
|
||||||
go wf.FilterListener()
|
|
||||||
|
|
||||||
if wf.isFullNode {
|
|
||||||
log.Info("Filter protocol started")
|
|
||||||
} else {
|
|
||||||
log.Info("Filter protocol started (only client mode)")
|
|
||||||
}
|
|
||||||
|
|
||||||
return wf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
|
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
|
||||||
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
|
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
|
||||||
|
|
||||||
|
|||||||
52
waku/v2/protocol/filter/waku_filter_option.go
Normal file
52
waku/v2/protocol/filter/waku_filter_option.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package filter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
FilterSubscribeParameters struct {
|
||||||
|
host host.Host
|
||||||
|
selectedPeer peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
FilterSubscribeOption func(*FilterSubscribeParameters)
|
||||||
|
)
|
||||||
|
|
||||||
|
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||||
|
return func(params *FilterSubscribeParameters) {
|
||||||
|
params.selectedPeer = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithAutomaticPeerSelection() FilterSubscribeOption {
|
||||||
|
return func(params *FilterSubscribeParameters) {
|
||||||
|
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1))
|
||||||
|
if err == nil {
|
||||||
|
params.selectedPeer = *p
|
||||||
|
} else {
|
||||||
|
log.Info("Error selecting peer: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
||||||
|
return func(params *FilterSubscribeParameters) {
|
||||||
|
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
|
||||||
|
if err == nil {
|
||||||
|
params.selectedPeer = *p
|
||||||
|
} else {
|
||||||
|
log.Info("Error selecting peer: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultOptions() []FilterSubscribeOption {
|
||||||
|
return []FilterSubscribeOption{
|
||||||
|
WithAutomaticPeerSelection(),
|
||||||
|
}
|
||||||
|
}
|
||||||
34
waku/v2/protocol/filter/waku_filter_option_test.go
Normal file
34
waku/v2/protocol/filter/waku_filter_option_test.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package filter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/status-im/go-waku/tests"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFilterOption(t *testing.T) {
|
||||||
|
port, err := tests.FindFreePort(t, "", 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
options := []FilterSubscribeOption{
|
||||||
|
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||||
|
WithAutomaticPeerSelection(),
|
||||||
|
WithFastestPeerSelection(context.Background()),
|
||||||
|
}
|
||||||
|
|
||||||
|
params := new(FilterSubscribeParameters)
|
||||||
|
params.host = host
|
||||||
|
|
||||||
|
for _, opt := range options {
|
||||||
|
opt(params)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, host, params.host)
|
||||||
|
require.NotNil(t, params.selectedPeer)
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user