feat : autoshard relay api (#807)

* fix: using relay without bcaster should consume and drop messages

* update relay api usage

* move subscription to broadcaster

* move filter logic under subscription

* Support more than 1 relay subscription for a pubSubTopic

* modify relay Publish API to derive pubSubTopic based on autosharding

* implement relay RPC methods for autosharding

* remove relay msgChannel and relay on pubsub buffersize for subscription

Co-authored-by: richΛrd <info@richardramos.me>

* handle relay subscribe with noConsumer and address issue reported in code review

* chore: reorg relay code

---------

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2023-10-21 01:26:18 +05:30 committed by GitHub
parent 4af7e7a500
commit b5be83a02e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 996 additions and 501 deletions

View File

@ -382,15 +382,13 @@ func Execute(options NodeOptions) error {
var wg sync.WaitGroup
if options.Relay.Enable {
for nodeTopic := range pubSubTopicMap {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
if err != nil {
return err
}
sub.Unsubscribe()
if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
@ -549,17 +547,18 @@ func Execute(options NodeOptions) error {
return nil
}
func processTopics(options NodeOptions) (map[string]struct{}, error) {
func processTopics(options NodeOptions) (map[string][]string, error) {
//Using a map to avoid duplicate pub-sub topics that can result from autosharding
// or same-topic being passed twice.
pubSubTopicMap := make(map[string]struct{})
pubSubTopicMap := make(map[string][]string)
for _, topic := range options.Relay.Topics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}
for _, topic := range options.Relay.PubSubTopics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}
//Get pubSub topics from contentTopics if they are as per autosharding
@ -569,11 +568,14 @@ func processTopics(options NodeOptions) (map[string]struct{}, error) {
return nil, err
}
pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
pubSubTopicMap[pTopic.String()] = struct{}{}
if _, ok := pubSubTopicMap[pTopic.String()]; !ok {
pubSubTopicMap[pTopic.String()] = []string{}
}
pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopic)
}
//If no topics are passed, then use default waku topic.
if len(pubSubTopicMap) == 0 {
pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{}
pubSubTopicMap[relay.DefaultWakuTopic] = []string{}
}
return pubSubTopicMap, nil

View File

@ -107,7 +107,7 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), topic)
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
@ -129,18 +129,20 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
sub, err = r.node.Relay().Subscribe(req.Context())
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic)
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}

View File

@ -11,7 +11,7 @@ type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
cancel context.CancelFunc
adder Adder
}
@ -26,7 +26,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for {
select {
case <-ctx.Done():

View File

@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -37,9 +38,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
require.NoError(t, err)
if isFullNode {
sub, err := n.Relay().SubscribeToTopic(context.Background(), testTopic)
sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
go func() {
for range sub.Ch {
for range sub[0].Ch {
}
}()
require.NoError(t, err)
@ -62,14 +63,15 @@ func TestFilterSubscription(t *testing.T) {
err = node.Start(context.Background())
require.NoError(t, err)
_, err = node.SubscribeToTopic(context.Background(), testTopic)
_, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
b2 := relay.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
f.SetHost(host)
err = f.Start(context.Background(), relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = f.Start(context.Background(), sub)
require.NoError(t, err)
d := makeFilterService(t, true)

View File

@ -33,6 +33,11 @@ type RelayMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}
// RelayAutoMessageArgs represents the requests used for posting messages
type RelayAutoMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}
// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing
type TopicsArgs struct {
Topics []string `json:"topics,omitempty"`
@ -120,28 +125,97 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil
}
// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.
*reply = true
return nil
}
// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.
*reply = true
return nil
}
// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if msg == nil {
err := fmt.Errorf("invalid message format received")
r.log.Error("publishing message", zap.Error(err))
return err
}
if msg.ContentTopic == "" {
err := fmt.Errorf("content-topic cannot be empty")
r.log.Error("publishing message", zap.Error(err))
return err
}
if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
}
_, err = r.node.Relay().Publish(req.Context(), msg)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
}
*reply = true
return nil
}
// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method
// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding.
func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
sub, err := r.node.Relay().GetSubscription(args.Topic)
if err != nil {
return err
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}
return nil
}
// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
var err error
if topic == "" {
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
sub.Unsubscribe()
} else {
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()
@ -155,7 +229,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
err := r.node.Relay().Unsubscribe(ctx, topic)
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err

View File

@ -9,7 +9,7 @@ type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
adder Adder
}
@ -21,7 +21,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
}
func (r *runnerService) Start() {
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for envelope := range r.sub.Ch {
r.adder(envelope)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -116,13 +117,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string
}
func readLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}
for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic {
continue
}

View File

@ -3,7 +3,6 @@ package main
import (
"chat2/pb"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
@ -24,7 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
"golang.org/x/crypto/pbkdf2"
"google.golang.org/protobuf/proto"
)
@ -84,13 +82,13 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
} else {
for _, topic := range topics {
sub, err := node.Relay().SubscribeToTopic(ctx, topic)
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = make(chan *protocol.Envelope)
go func() {
for e := range sub.Ch {
for e := range sub[0].Ch {
chat.C <- e
}
}()
@ -356,12 +354,6 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Mess
return msg, nil
}
func generateSymKey(password string) []byte {
// AesKeyLength represents the length (in bytes) of an private key
AESKeyLength := 256 / 8
return pbkdf2.Key([]byte(password), nil, 65356, AESKeyLength, sha256.New)
}
func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()

View File

@ -172,13 +172,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
pubsubTopic := pubSubTopic.String()
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
if err != nil {
log.Error("Could not subscribe: ", err)
return
}
for value := range sub.Ch {
for value := range sub[0].Ch {
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
if err != nil {
fmt.Println(err)

View File

@ -142,13 +142,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
}
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic.String())
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic.String()))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}
for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic.String() {
continue
}

View File

@ -69,18 +69,18 @@ func relaySubscribe(topic string) error {
return nil
}
subscription, err := wakuState.node.Relay().SubscribeToTopic(context.Background(), topicToSubscribe)
subscription, err := wakuState.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(topicToSubscribe))
if err != nil {
return err
}
relaySubscriptions[topicToSubscribe] = subscription
relaySubscriptions[topicToSubscribe] = subscription[0]
go func(subscription *relay.Subscription) {
for envelope := range subscription.Ch {
send("message", toSubscriptionMessage(envelope))
}
}(subscription)
}(subscription[0])
return nil
}
@ -123,5 +123,5 @@ func RelayUnsubscribe(topic string) error {
delete(relaySubscriptions, topicToUnsubscribe)
return wakuState.node.Relay().Unsubscribe(context.Background(), topicToUnsubscribe)
return wakuState.node.Relay().Unsubscribe(context.Background(), protocol.NewContentFilter(topicToUnsubscribe))
}

View File

@ -10,7 +10,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -39,10 +41,10 @@ func TestBasicSendingReceiving(t *testing.T) {
require.NoError(t, write(ctx, wakuNode, "test"))
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
value := <-sub.Ch
value := <-sub[0].Ch
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
require.NoError(t, err)
@ -70,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro
Timestamp: timestamp,
}
_, err = wakuNode.Relay().Publish(ctx, msg)
_, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic)
return err
}

View File

@ -16,5 +16,5 @@ type Service interface {
type ReceptorService interface {
SetHost(h host.Host)
Stop()
Start(context.Context, relay.Subscription) error
Start(context.Context, *relay.Subscription) error
}

View File

@ -679,7 +679,7 @@ func (w *WakuNode) mountDiscV5() error {
return err
}
func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error {
func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) error {
err := w.store.Start(ctx, sub)
if err != nil {
w.log.Error("starting store", zap.Error(err))

View File

@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -54,11 +55,11 @@ func TestWakuNode2(t *testing.T) {
err = wakuNode.Start(ctx)
require.NoError(t, err)
_, err = wakuNode.Relay().SubscribeToTopic(ctx, "waku/rs/1/1")
_, err = wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)
time.Sleep(time.Second * 1)
err = wakuNode.Relay().Unsubscribe(ctx, "waku/rs/1/1")
err = wakuNode.Relay().Unsubscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)
defer wakuNode.Stop()
@ -151,9 +152,9 @@ func Test500(t *testing.T) {
time.Sleep(2 * time.Second)
sub1, err := wakuNode1.Relay().Subscribe(ctx)
sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx)
sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
wg := sync.WaitGroup{}
@ -168,7 +169,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub1")
case msg := <-sub1.Ch:
case msg := <-sub1[0].Ch:
if msg == nil {
return
}
@ -189,7 +190,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub2")
case msg := <-sub2.Ch:
case msg := <-sub2[0].Ch:
if msg == nil {
return
}
@ -206,7 +207,7 @@ func Test500(t *testing.T) {
msg := createTestMsg(0)
msg.Payload = int2Bytes(i)
msg.Timestamp = int64(i)
if _, err := wakuNode2.Relay().Publish(ctx, msg); err != nil {
if _, err := wakuNode2.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil {
require.Fail(t, "Could not publish all messages")
}
time.Sleep(5 * time.Millisecond)
@ -234,9 +235,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode1.Stop()
subs, err := wakuNode1.Relay().Subscribe(ctx)
subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
subs.Unsubscribe()
defer subs[0].Unsubscribe()
// NODE2: Filter Client/Store
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
@ -286,7 +287,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
time.Sleep(500 * time.Millisecond)
if _, err := wakuNode1.Relay().Publish(ctx, msg); err != nil {
if _, err := wakuNode1.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil {
require.Fail(t, "Could not publish all messages")
}

View File

@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
@ -232,7 +231,7 @@ func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption {
}
// WithAdvertiseAddresses is a WakuNodeOption that allows overriding the address used in the waku node with custom value
func WithAdvertiseAddresses(advertiseAddrs ...ma.Multiaddr) WakuNodeOption {
func WithAdvertiseAddresses(advertiseAddrs ...multiaddr.Multiaddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.advertiseAddrs = advertiseAddrs
return WithMultiaddress(advertiseAddrs...)(params)

View File

@ -2,6 +2,9 @@ package protocol
import "golang.org/x/exp/maps"
type PubsubTopicStr = string
type ContentTopicStr = string
type ContentTopicSet map[string]struct{}
func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
@ -28,3 +31,40 @@ func (cf ContentFilter) ContentTopicsList() []string {
func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}
func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
if cf.PubsubTopic != cf1.PubsubTopic ||
len(cf.ContentTopics) != len(cf1.ContentTopics) {
return false
}
for topic := range cf.ContentTopics {
_, ok := cf1.ContentTopics[topic]
if !ok {
return false
}
}
return true
}
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
pubSubTopicMap := make(map[string][]string)
if contentFilter.PubsubTopic != "" {
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopicStr]
if !ok {
pubSubTopicMap[pTopicStr] = []string{}
}
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
}
}
return pubSubTopicMap, nil
}

View File

@ -10,7 +10,7 @@ import (
// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified.
const DefaultContentTopic = "/waku/2/default-content/proto"
var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidFormat = errors.New("invalid content topic format")
var ErrMissingGeneration = errors.New("missing part: generation")
var ErrInvalidGeneration = errors.New("generation should be a number")

View File

@ -214,29 +214,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return nil
}
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)
if contentFilter.PubsubTopic != "" {
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := protocol.GetPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopicStr]
if !ok {
pubSubTopicMap[pTopicStr] = []string{}
}
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
}
}
return pubSubTopicMap, nil
}
// Subscribe setups a subscription to receive messages that match a specific content filter
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
@ -273,7 +250,8 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
}
}
pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter)
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err
}
@ -454,7 +432,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
return nil, err
}
pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter)
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err
}

View File

@ -63,10 +63,10 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.
err = relay.Start(context.Background())
s.Require().NoError(err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
s.Require().NoError(err)
return relay, sub, host, broadcaster
return relay, sub[0], host, broadcaster
}
func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bool) *WakuFilterLightNode {
@ -97,7 +97,7 @@ func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
node2Filter.SetHost(host)
sub := broadcaster.Register(topic)
sub := broadcaster.Register(protocol.NewContentFilter(topic))
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)

View File

@ -31,7 +31,7 @@ const peerHasNoSubscription = "peer has no subscriptions"
type (
WakuFilterFullNode struct {
h host.Host
msgSub relay.Subscription
msgSub *relay.Subscription
metrics Metrics
log *zap.Logger
*protocol.CommonService
@ -66,13 +66,13 @@ func (wf *WakuFilterFullNode) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilterFullNode) Start(ctx context.Context, sub relay.Subscription) error {
func (wf *WakuFilterFullNode) Start(ctx context.Context, sub *relay.Subscription) error {
return wf.CommonService.Start(ctx, func() error {
return wf.start(sub)
})
}
func (wf *WakuFilterFullNode) start(sub relay.Subscription) error {
func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {
wf.h.SetStreamHandlerMatch(FilterSubscribeID_v20beta1, protocol.PrefixTextMatch(string(FilterSubscribeID_v20beta1)), wf.onRequest(wf.Context()))
wf.msgSub = sub

View File

@ -51,7 +51,7 @@ type (
h host.Host
pm *peermanager.PeerManager
isFullNode bool
msgSub relay.Subscription
msgSub *relay.Subscription
metrics Metrics
log *zap.Logger
@ -89,13 +89,13 @@ func (wf *WakuFilter) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilter) Start(ctx context.Context, sub relay.Subscription) error {
func (wf *WakuFilter) Start(ctx context.Context, sub *relay.Subscription) error {
return wf.CommonService.Start(ctx, func() error {
return wf.start(sub)
})
}
func (wf *WakuFilter) start(sub relay.Subscription) error {
func (wf *WakuFilter) start(sub *relay.Subscription) error {
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(wf.Context()))
wf.msgSub = sub
wf.WaitGroup().Add(1)

View File

@ -12,11 +12,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
var testTopic = "/waku/2/go/filter/test"
var testContentTopic = "TopicA"
func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
@ -29,10 +33,10 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
require.NoError(t, err)
return relay, sub, host
return relay, sub[0], host
}
func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
@ -46,7 +50,8 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
require.NoError(t, b.Start(context.Background()))
filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
filter.SetHost(host)
err = filter.Start(context.Background(), relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(testTopic, testContentTopic))
err = filter.Start(context.Background(), sub)
require.NoError(t, err)
return filter, host
@ -66,9 +71,6 @@ func TestWakuFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node1, host1 := makeWakuFilter(t)
defer node1.Stop()
@ -80,7 +82,7 @@ func TestWakuFilter(t *testing.T) {
node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
node2Filter.SetHost(host2)
sub := broadcaster.Register(testTopic)
sub := broadcaster.Register(protocol.NewContentFilter(testTopic))
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
@ -156,9 +158,6 @@ func TestWakuFilterPeerFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node1, host1 := makeWakuFilter(t)
broadcaster := relay.NewBroadcaster(10)
@ -171,7 +170,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger(), WithTimeout(3*time.Second))
node2Filter.SetHost(host2)
sub := broadcaster.Register(testTopic)
sub := broadcaster.Register(protocol.NewContentFilter(testTopic))
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)

View File

@ -33,10 +33,10 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), pusubTopic)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(pusubTopic))
require.NoError(t, err)
return relay, sub, host
return relay, sub[0], host
}
// Node1: Relay

View File

@ -4,6 +4,7 @@ import (
"context"
n "github.com/waku-org/go-noise"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
@ -16,7 +17,7 @@ type NoiseMessenger interface {
}
type contentTopicSubscription struct {
broadcastSub relay.Subscription
broadcastSub *relay.Subscription
msgChan chan *pb.WakuMessage
}
@ -39,16 +40,19 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
topic = relay.DefaultWakuTopic
}
subs, err := r.SubscribeToTopic(ctx, topic)
subs, err := r.Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
return nil, err
}
//Note: Safely assuming 0th index as subscription is based on pubSubTopic.
// Once this API is changed to support subscription based on contentTopics, this logic should also be changed.
sub := subs[0]
ctx, cancel := context.WithCancel(ctx)
wr := &NoiseWakuRelay{
relay: r,
relaySub: subs,
relaySub: sub,
cancel: cancel,
timesource: timesource,
broadcaster: relay.NewBroadcaster(1024),
@ -65,10 +69,10 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
for {
select {
case <-ctx.Done():
subs.Unsubscribe()
sub.Unsubscribe()
wr.broadcaster.Stop()
return
case envelope := <-subs.Ch:
case envelope := <-sub.Ch:
if envelope != nil {
wr.broadcaster.Submit(envelope)
}
@ -84,7 +88,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c
msgChan: make(chan *pb.WakuMessage, 1024),
}
broadcastSub := r.broadcaster.RegisterForAll(1024)
broadcastSub := r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
sub.broadcastSub = broadcastSub
subscriptionCh := r.subscriptionChPerContentTopic[contentTopic]

View File

@ -9,29 +9,58 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
)
type chStore struct {
type BroadcasterParameters struct {
dontConsume bool //Indicates whether to consume messages from subscription or drop
chLen int
}
type BroadcasterOption func(*BroadcasterParameters)
// WithoutConsumer option let's a user subscribe to a broadcaster without consuming messages received.
// This is useful for a relayNode where only a subscribe is required in order to relay messages in gossipsub network.
func DontConsume() BroadcasterOption {
return func(params *BroadcasterParameters) {
params.dontConsume = true
}
}
// WithBufferSize option let's a user set channel buffer to be set.
func WithBufferSize(size int) BroadcasterOption {
return func(params *BroadcasterParameters) {
params.chLen = size
}
}
// DefaultBroadcasterOptions specifies default options for broadcaster
func DefaultBroadcasterOptions() []BroadcasterOption {
return []BroadcasterOption{
WithBufferSize(0),
}
}
type Subscriptions struct {
mu sync.RWMutex
topicToChans map[string]map[int]chan *protocol.Envelope
topicsToSubs map[string]map[int]*Subscription //map of pubSubTopic to subscriptions
id int
}
func newChStore() chStore {
return chStore{
topicToChans: make(map[string]map[int]chan *protocol.Envelope),
func newSubStore() Subscriptions {
return Subscriptions{
topicsToSubs: make(map[string]map[int]*Subscription),
}
}
func (s *chStore) getNewCh(topic string, chLen int) Subscription {
func (s *Subscriptions) createNewSubscription(contentFilter protocol.ContentFilter, dontConsume bool, chLen int) *Subscription {
ch := make(chan *protocol.Envelope, chLen)
s.mu.Lock()
defer s.mu.Unlock()
s.id++
//
if s.topicToChans[topic] == nil {
s.topicToChans[topic] = make(map[int]chan *protocol.Envelope)
pubsubTopic := contentFilter.PubsubTopic
if s.topicsToSubs[pubsubTopic] == nil {
s.topicsToSubs[pubsubTopic] = make(map[int]*Subscription)
}
id := s.id
s.topicToChans[topic][id] = ch
return Subscription{
sub := Subscription{
ID: id,
// read only channel,will not block forever, returns once closed.
Ch: ch,
// Unsubscribe function is safe, can be called multiple times
@ -39,21 +68,25 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription {
Unsubscribe: func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.topicToChans[topic] == nil {
if s.topicsToSubs[pubsubTopic] == nil {
return
}
if ch := s.topicToChans[topic][id]; ch != nil {
close(ch)
delete(s.topicToChans[topic], id)
if sub := s.topicsToSubs[pubsubTopic][id]; sub != nil {
close(sub.Ch)
delete(s.topicsToSubs[pubsubTopic], id)
}
},
contentFilter: contentFilter,
noConsume: dontConsume,
}
s.topicsToSubs[pubsubTopic][id] = &sub
return &sub
}
func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) {
func (s *Subscriptions) broadcast(ctx context.Context, m *protocol.Envelope) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, ch := range s.topicToChans[m.PubsubTopic()] {
for _, sub := range s.topicsToSubs[m.PubsubTopic()] {
select {
// using ctx.Done for returning on cancellation is needed
// reason:
@ -62,36 +95,40 @@ func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) {
// this will also block the chStore close function as it uses same mutex
case <-ctx.Done():
return
case ch <- m:
default:
sub.Submit(ctx, m)
}
}
// send to all registered subscribers
for _, ch := range s.topicToChans[""] {
// send to all wildcard subscribers
for _, sub := range s.topicsToSubs[""] {
select {
case <-ctx.Done():
return
case ch <- m:
default:
sub.Submit(ctx, m)
}
}
}
func (s *chStore) close() {
func (s *Subscriptions) close() {
s.mu.Lock()
defer s.mu.Unlock()
for _, chans := range s.topicToChans {
for _, ch := range chans {
close(ch)
for _, subs := range s.topicsToSubs {
for _, sub := range subs {
close(sub.Ch)
}
}
s.topicToChans = nil
s.topicsToSubs = nil
}
// Broadcaster is used to create a fanout for an envelope that will be received by any subscriber interested in the topic of the message
type Broadcaster interface {
Start(ctx context.Context) error
Stop()
Register(topic string, chLen ...int) Subscription
RegisterForAll(chLen ...int) Subscription
Register(contentFilter protocol.ContentFilter, opts ...BroadcasterOption) *Subscription
RegisterForAll(opts ...BroadcasterOption) *Subscription
UnRegister(pubsubTopic string)
Submit(*protocol.Envelope)
}
@ -106,7 +143,7 @@ type broadcaster struct {
cancel context.CancelFunc
input chan *protocol.Envelope
//
chStore chStore
subscriptions Subscriptions
running atomic.Bool
}
@ -124,7 +161,7 @@ func (b *broadcaster) Start(ctx context.Context) error {
}
ctx, cancel := context.WithCancel(ctx)
b.cancel = cancel
b.chStore = newChStore()
b.subscriptions = newSubStore()
b.input = make(chan *protocol.Envelope, b.bufLen)
go b.run(ctx)
return nil
@ -137,7 +174,7 @@ func (b *broadcaster) run(ctx context.Context) {
return
case msg, ok := <-b.input:
if ok {
b.chStore.broadcast(ctx, msg)
b.subscriptions.broadcast(ctx, msg)
}
}
}
@ -150,27 +187,40 @@ func (b *broadcaster) Stop() {
}
// cancel must be before chStore.close(), so that broadcast releases lock before chStore.close() acquires it.
b.cancel() // exit the run loop,
b.chStore.close() // close all channels that we send to
b.subscriptions.close() // close all channels that we send to
close(b.input) // close input channel
}
// Register returns a subscription for an specific topic
func (b *broadcaster) Register(topic string, chLen ...int) Subscription {
return b.chStore.getNewCh(topic, getChLen(chLen))
// Register returns a subscription for an specific pubsub topic and/or list of contentTopics
func (b *broadcaster) Register(contentFilter protocol.ContentFilter, opts ...BroadcasterOption) *Subscription {
params := b.ProcessOpts(opts...)
return b.subscriptions.createNewSubscription(contentFilter, params.dontConsume, params.chLen)
}
func (b *broadcaster) ProcessOpts(opts ...BroadcasterOption) *BroadcasterParameters {
params := new(BroadcasterParameters)
optList := DefaultBroadcasterOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
return params
}
// UnRegister removes all subscriptions for an specific pubsub topic
func (b *broadcaster) UnRegister(pubsubTopic string) {
subs := b.subscriptions.topicsToSubs[pubsubTopic]
if len(subs) > 0 {
for _, sub := range subs {
sub.Unsubscribe()
}
}
}
// RegisterForAll returns a subscription for all topics
func (b *broadcaster) RegisterForAll(chLen ...int) Subscription {
return b.chStore.getNewCh("", getChLen(chLen))
}
func getChLen(chLen []int) int {
l := 0
if len(chLen) > 0 {
l = chLen[0]
}
return l
func (b *broadcaster) RegisterForAll(opts ...BroadcasterOption) *Subscription {
params := b.ProcessOpts(opts...)
return b.subscriptions.createNewSubscription(protocol.NewContentFilter(""), params.dontConsume, params.chLen)
}
// Submit is used to broadcast messages to subscribers. It only accepts value when running.

View File

@ -46,7 +46,7 @@ func TestBroadcastSpecificTopic(t *testing.T) {
for i := 0; i < 5; i++ {
wg.Add(1)
sub := b.Register("abc")
sub := b.Register(protocol.NewContentFilter("abc"))
go func() {
defer wg.Done()
@ -66,7 +66,7 @@ func TestBroadcastSpecificTopic(t *testing.T) {
func TestBroadcastCleanup(t *testing.T) {
b := NewBroadcaster(100)
require.NoError(t, b.Start(context.Background()))
sub := b.Register("test")
sub := b.Register(protocol.NewContentFilter("test"))
b.Stop()
<-sub.Ch
sub.Unsubscribe()
@ -78,7 +78,7 @@ func TestBroadcastUnregisterSub(t *testing.T) {
require.NoError(t, b.Start(context.Background()))
subForAll := b.RegisterForAll()
// unregister before submit
specificSub := b.Register("abc")
specificSub := b.Register(protocol.NewContentFilter("abc"))
specificSub.Unsubscribe()
//
env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc")

View File

@ -0,0 +1,130 @@
package relay
import (
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/hash"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
)
var DefaultRelaySubscriptionBufferSize int = 1024
type RelaySubscribeParameters struct {
dontConsume bool
}
type RelaySubscribeOption func(*RelaySubscribeParameters) error
// WithoutConsumer option let's a user subscribe to relay without consuming messages received.
// This is useful for a relayNode where only a subscribe is required in order to relay messages in gossipsub network.
func WithoutConsumer() RelaySubscribeOption {
return func(params *RelaySubscribeParameters) error {
params.dontConsume = true
return nil
}
}
func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
func (w *WakuRelay) setDefaultPeerScoreParams() {
w.peerScoreParams = &pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
DecayInterval: 12 * time.Second, // how often peer scoring is updated
DecayToZero: 0.01, // below this we consider the parameter to be zero
RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects
// p5: application specific, unset
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 0.0,
// p6: penalizes peers sharing more than threshold ips
IPColocationFactorWeight: -50,
IPColocationFactorThreshold: 5.0,
// p7: penalizes bad behaviour (weight and decay)
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: 0.986,
}
w.peerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: -100, // no gossip is sent to peers below this score
PublishThreshold: -1000, // no self-published msgs are sent to peers below this score
GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score
OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset.
}
}
func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option {
cfg := pubsub.DefaultGossipSubParams()
cfg.PruneBackoff = time.Minute
cfg.UnsubscribeBackoff = 5 * time.Second
cfg.GossipFactor = 0.25
cfg.D = waku_proto.GossipSubOptimalFullMeshSize
cfg.Dlo = 4
cfg.Dhi = 12
cfg.Dout = 3
cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize
cfg.HeartbeatInterval = time.Second
cfg.HistoryLength = 6
cfg.HistoryGossip = 3
cfg.FanoutTTL = time.Minute
w.setDefaultPeerScoreParams()
w.setDefaultTopicParams()
return []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(msgIDFn),
pubsub.WithGossipSubProtocols(
[]protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
switch feat {
case pubsub.GossipSubFeatureMesh:
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 || proto == WakuRelayID_v200
case pubsub.GossipSubFeaturePX:
return proto == pubsub.GossipSubID_v11 || proto == WakuRelayID_v200
default:
return false
}
},
),
pubsub.WithGossipSubParams(cfg),
pubsub.WithFloodPublish(true),
pubsub.WithSeenMessagesTTL(2 * time.Minute),
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
}
}
func (w *WakuRelay) setDefaultTopicParams() {
w.topicParams = &pubsub.TopicScoreParams{
TopicWeight: 1,
// p1: favours peers already in the mesh
TimeInMeshWeight: 0.01,
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 10.0,
// p2: rewards fast peers
FirstMessageDeliveriesWeight: 1.0,
FirstMessageDeliveriesDecay: 0.5,
FirstMessageDeliveriesCap: 10.0,
// p3: penalizes lazy peers. safe low value
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
MeshMessageDeliveriesCap: 0,
MeshMessageDeliveriesThreshold: 0,
MeshMessageDeliveriesWindow: 0,
MeshMessageDeliveriesActivation: 0,
// p3b: tracks history of prunes
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
// p4: penalizes invalid messages. highly penalize peers sending wrong messages
InvalidMessageDeliveriesWeight: -100.0,
InvalidMessageDeliveriesDecay: 0.5,
}
}

View File

@ -1,32 +1,57 @@
package relay
import "github.com/waku-org/go-waku/waku/v2/protocol"
import (
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/slices"
)
// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
type Subscription struct {
Unsubscribe func()
Ch <-chan *protocol.Envelope
ID int
Unsubscribe func() //for internal use only. For relay Subscription use relay protocol's unsubscribe
Ch chan *protocol.Envelope
contentFilter protocol.ContentFilter
subType SubscriptionType
noConsume bool
}
// NoopSubscription creates a noop subscription that will not receive any envelope
func NoopSubscription() Subscription {
type SubscriptionType int
const (
SpecificContentTopics SubscriptionType = iota
AllContentTopics
)
// Submit allows a message to be submitted for a subscription
func (s *Subscription) Submit(ctx context.Context, msg *protocol.Envelope) {
//Filter and notify
// - if contentFilter doesn't have a contentTopic
// - if contentFilter has contentTopics and it matches with message
if !s.noConsume && (len(s.contentFilter.ContentTopicsList()) == 0 ||
(len(s.contentFilter.ContentTopicsList()) > 0 && slices.Contains[string](s.contentFilter.ContentTopicsList(), msg.Message().ContentTopic))) {
select {
case <-ctx.Done():
return
case s.Ch <- msg:
}
}
}
// NewSubscription creates a subscription that will only receive messages based on the contentFilter
func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
ch := make(chan *protocol.Envelope)
var subType SubscriptionType
if len(contentFilter.ContentTopicsList()) == 0 {
subType = AllContentTopics
}
return &Subscription{
Unsubscribe: func() {
close(ch)
return Subscription{
Unsubscribe: func() {},
Ch: ch,
}
}
// ArraySubscription creates a subscription for a list of envelopes
func ArraySubscription(msgs []*protocol.Envelope) Subscription {
ch := make(chan *protocol.Envelope, len(msgs))
for _, msg := range msgs {
ch <- msg
}
close(ch)
return Subscription{
Unsubscribe: func() {},
},
Ch: ch,
contentFilter: contentFilter,
subType: subType,
}
}

View File

@ -0,0 +1,102 @@
package relay
import (
"context"
"errors"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)
// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
type EvtRelaySubscribed struct {
Topic string
TopicInst *pubsub.Topic
}
// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed
type EvtRelayUnsubscribed struct {
Topic string
}
type PeerTopicState int
const (
PEER_JOINED = iota
PEER_LEFT
)
type EvtPeerTopic struct {
PubsubTopic string
PeerID peer.ID
State PeerTopicState
}
// Events returns the event bus on which WakuRelay events will be emitted
func (w *WakuRelay) Events() event.Bus {
return w.events
}
func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) {
handler, err := topic.EventHandler()
if err != nil {
return nil, err
}
w.WaitGroup().Add(1)
go w.topicEventPoll(topic.String(), handler)
return handler, nil
}
func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) {
defer w.WaitGroup().Done()
for {
evt, err := handler.NextPeerEvent(w.Context())
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
break
}
w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err))
continue
}
if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done.
break
}
if evt.Type == pubsub.PeerJoin {
w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED})
if err != nil {
w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err))
}
} else if evt.Type == pubsub.PeerLeave {
w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT})
if err != nil {
w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err))
}
} else {
w.log.Error("unknown event type received", zap.String("topic", topic),
zap.Int("eventType", int(evt.Type)))
}
}
}
func (w *WakuRelay) CreateEventEmitters() error {
var err error
w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed))
if err != nil {
return err
}
w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed))
if err != nil {
return err
}
w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic))
if err != nil {
return err
}
return nil
}

View File

@ -3,9 +3,7 @@ package relay
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
@ -17,9 +15,7 @@ import (
proto "google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/hash"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
@ -65,40 +61,13 @@ type WakuRelay struct {
EvtRelayUnsubscribed event.Emitter
EvtPeerTopic event.Emitter
}
contentSubs map[string]map[int]*Subscription
*waku_proto.CommonService
}
// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
type EvtRelaySubscribed struct {
Topic string
TopicInst *pubsub.Topic
}
// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed
type EvtRelayUnsubscribed struct {
Topic string
}
type PeerTopicState int
const (
PEER_JOINED = iota
PEER_LEFT
)
type EvtPeerTopic struct {
PubsubTopic string
PeerID peer.ID
State PeerTopicState
}
func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource,
reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
w := new(WakuRelay)
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
@ -112,95 +81,9 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.log)
cfg := pubsub.DefaultGossipSubParams()
cfg.PruneBackoff = time.Minute
cfg.UnsubscribeBackoff = 5 * time.Second
cfg.GossipFactor = 0.25
cfg.D = waku_proto.GossipSubOptimalFullMeshSize
cfg.Dlo = 4
cfg.Dhi = 12
cfg.Dout = 3
cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize
cfg.HeartbeatInterval = time.Second
cfg.HistoryLength = 6
cfg.HistoryGossip = 3
cfg.FanoutTTL = time.Minute
w.peerScoreParams = &pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
DecayInterval: 12 * time.Second, // how often peer scoring is updated
DecayToZero: 0.01, // below this we consider the parameter to be zero
RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects
// p5: application specific, unset
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 0.0,
// p6: penalizes peers sharing more than threshold ips
IPColocationFactorWeight: -50,
IPColocationFactorThreshold: 5.0,
// p7: penalizes bad behaviour (weight and decay)
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: 0.986,
}
w.peerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: -100, // no gossip is sent to peers below this score
PublishThreshold: -1000, // no self-published msgs are sent to peers below this score
GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score
OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset.
}
w.topicParams = &pubsub.TopicScoreParams{
TopicWeight: 1,
// p1: favours peers already in the mesh
TimeInMeshWeight: 0.01,
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 10.0,
// p2: rewards fast peers
FirstMessageDeliveriesWeight: 1.0,
FirstMessageDeliveriesDecay: 0.5,
FirstMessageDeliveriesCap: 10.0,
// p3: penalizes lazy peers. safe low value
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
MeshMessageDeliveriesCap: 0,
MeshMessageDeliveriesThreshold: 0,
MeshMessageDeliveriesWindow: 0,
MeshMessageDeliveriesActivation: 0,
// p3b: tracks history of prunes
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
// p4: penalizes invalid messages. highly penalize peers sending wrong messages
InvalidMessageDeliveriesWeight: -100.0,
InvalidMessageDeliveriesDecay: 0.5,
}
// default options required by WakuRelay
w.opts = append([]pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(msgIDFn),
pubsub.WithGossipSubProtocols(
[]protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
switch feat {
case pubsub.GossipSubFeatureMesh:
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 || proto == WakuRelayID_v200
case pubsub.GossipSubFeaturePX:
return proto == pubsub.GossipSubID_v11 || proto == WakuRelayID_v200
default:
return false
}
},
),
pubsub.WithGossipSubParams(cfg),
pubsub.WithFloodPublish(true),
pubsub.WithSeenMessagesTTL(2 * time.Minute),
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
}, opts...)
w.opts = append(w.defaultPubsubOptions(), opts...)
w.contentSubs = make(map[string]map[int]*Subscription)
return w
}
@ -231,22 +114,16 @@ func (w *WakuRelay) Start(ctx context.Context) error {
}
func (w *WakuRelay) start() error {
if w.bcaster == nil {
return errors.New("broadcaster not specified for relay")
}
ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...)
if err != nil {
return err
}
w.pubsub = ps
w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed))
if err != nil {
return err
}
w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed))
if err != nil {
return err
}
w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic))
err = w.CreateEventEmitters()
if err != nil {
return err
}
@ -312,7 +189,7 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
return pubSubTopic, nil
}
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscription, err error) {
sub, ok := w.relaySubs[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
@ -320,11 +197,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
return nil, err
}
sub, err = pubSubTopic.Subscribe()
sub, err = pubSubTopic.Subscribe(pubsub.WithBufferSize(1024))
if err != nil {
return nil, err
}
w.WaitGroup().Add(1)
go w.pubsubTopicMsgHandler(topic, sub)
evtHandler, err := w.addPeerTopicEventListener(pubSubTopic)
if err != nil {
return nil, err
@ -337,10 +217,6 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
return nil, err
}
if w.bcaster != nil {
w.WaitGroup().Add(1)
go w.subscribeToTopic(topic, sub)
}
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
}
@ -385,9 +261,29 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
return hash, nil
}
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic
// Publish is used to broadcast a WakuMessage, the pubsubTopic is derived from contentTopic specified in the message via autosharding.
// To publish to a specific pubsubTopic, please use PublishToTopic
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
return w.PublishToTopic(ctx, message, DefaultWakuTopic)
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
return w.PublishToTopic(ctx, message, pubSubTopic)
}
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic)
cSubs := w.contentSubs[pubSubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
}
}
return nil, errors.New("no subscription found for content topic")
}
// Stop unmounts the relay protocol and stops all subscriptions
@ -409,72 +305,154 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish
}
// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
_, err := w.subscribe(topic)
// subscribe returns list of Subscription to receive messages based on content filter
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
var subscriptions []*Subscription
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err
}
params := new(RelaySubscribeParameters)
// Create client subscription
subscription := NoopSubscription()
if w.bcaster != nil {
subscription = w.bcaster.Register(topic, 1024)
var optList []RelaySubscribeOption
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}
for pubSubTopic, cTopics := range pubSubTopicMap {
w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics))
var cFilter waku_proto.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = waku_proto.NewContentTopicSet(cTopics...)
//Check if gossipsub subscription already exists for pubSubTopic
if !w.IsSubscribed(pubSubTopic) {
_, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic)
if err != nil {
//TODO: Handle partial errors.
return nil, err
}
}
subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize))
// Create Content subscription
w.topicsMutex.RLock()
if _, ok := w.contentSubs[pubSubTopic]; !ok {
w.contentSubs[pubSubTopic] = map[int]*Subscription{}
}
w.contentSubs[pubSubTopic][subscription.ID] = subscription
w.topicsMutex.RUnlock()
subscriptions = append(subscriptions, subscription)
go func() {
<-ctx.Done()
subscription.Unsubscribe()
}()
return &subscription, nil
}
// Subscribe returns a Subscription to receive messages from the default waku pubsub topic
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
return subscriptions, nil
}
// Subscribe returns a Subscription to receive messages as per contentFilter
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
return w.subscribe(ctx, contentFilter, opts...)
}
// Unsubscribe closes a subscription to a pubsub topic
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return err
}
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()
sub, ok := w.relaySubs[topic]
for pubSubTopic, cTopics := range pubSubTopicMap {
cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...)
pubsubUnsubscribe := false
sub, ok := w.relaySubs[pubSubTopic]
if !ok {
return fmt.Errorf("not subscribed to topic")
return errors.New("not subscribed to topic")
}
w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic()))
cSubs := w.contentSubs[pubSubTopic]
if cSubs != nil {
//Remove relevant subscription
for subID, sub := range cSubs {
if sub.contentFilter.Equals(cfTemp) {
sub.Unsubscribe()
delete(cSubs, subID)
}
}
if len(cSubs) == 0 {
pubsubUnsubscribe = true
}
} else {
//Should not land here ideally
w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter",
zap.String("pubsubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics))
w.relaySubs[topic].Cancel()
delete(w.relaySubs, topic)
evtHandler, ok := w.topicEvtHanders[topic]
if ok {
evtHandler.Cancel()
delete(w.topicEvtHanders, topic)
return errors.New("unexpected error in unsubscribe")
}
err := w.wakuRelayTopics[topic].Close()
if pubsubUnsubscribe {
err = w.unsubscribeFromPubsubTopic(sub)
if err != nil {
return err
}
delete(w.wakuRelayTopics, topic)
w.RemoveTopicValidator(topic)
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic})
if err != nil {
return err
}
}
return nil
}
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func() {
defer close(msgChannel)
// unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub.
// Note: caller has to acquire topicsMutex in order to avoid race conditions
func (w *WakuRelay) unsubscribeFromPubsubTopic(sub *pubsub.Subscription) error {
pubSubTopic := sub.Topic()
w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic))
sub.Cancel()
delete(w.relaySubs, pubSubTopic)
w.bcaster.UnRegister(pubSubTopic)
delete(w.contentSubs, pubSubTopic)
evtHandler, ok := w.topicEvtHanders[pubSubTopic]
if ok {
evtHandler.Cancel()
delete(w.topicEvtHanders, pubSubTopic)
}
err := w.wakuRelayTopics[pubSubTopic].Close()
if err != nil {
return err
}
delete(w.wakuRelayTopics, pubSubTopic)
w.RemoveTopicValidator(pubSubTopic)
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})
if err != nil {
return err
}
return nil
}
func (w *WakuRelay) pubsubTopicMsgHandler(pubsubTopic string, sub *pubsub.Subscription) {
defer w.WaitGroup().Done()
for {
msg, err := sub.Next(ctx)
msg, err := sub.Next(w.Context())
if err != nil {
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
@ -482,40 +460,16 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
sub.Cancel()
return
}
msgChannel <- msg
}
}()
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) {
defer w.WaitGroup().Done()
subChannel := w.nextMessage(w.Context(), sub)
for {
select {
case <-w.Context().Done():
return
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg, ok := <-subChannel:
if !ok {
return
}
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
w.log.Error("decoding message", zap.Error(err))
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)
w.metrics.RecordMessage(envelope)
if w.bcaster != nil {
w.bcaster.Submit(envelope)
}
}
}
}
@ -523,51 +477,3 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio
func (w *WakuRelay) Params() pubsub.GossipSubParams {
return w.params
}
// Events returns the event bus on which WakuRelay events will be emitted
func (w *WakuRelay) Events() event.Bus {
return w.events
}
func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) {
handler, err := topic.EventHandler()
if err != nil {
return nil, err
}
w.WaitGroup().Add(1)
go w.topicEventPoll(topic.String(), handler)
return handler, nil
}
func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) {
defer w.WaitGroup().Done()
for {
evt, err := handler.NextPeerEvent(w.Context())
if err != nil {
if err == context.Canceled {
break
}
w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err))
continue
}
if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done.
break
}
if evt.Type == pubsub.PeerJoin {
w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED})
if err != nil {
w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err))
}
} else if evt.Type == pubsub.PeerLeave {
w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT})
if err != nil {
w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err))
}
} else {
w.log.Error("unknown event type received", zap.String("topic", topic),
zap.Int("eventType", int(evt.Type)))
}
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"sync"
"testing"
"time"
@ -13,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -26,14 +28,18 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
sub, err := relay.subscribe(testTopic)
defer sub.Cancel()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(testTopic), true)
@ -47,9 +53,8 @@ func TestWakuRelay(t *testing.T) {
go func() {
defer cancel()
_, err := sub.Next(ctx)
require.NoError(t, err)
msg := <-subs[0].Ch
fmt.Println("msg received ", msg)
}()
msg := &pb.WakuMessage{
@ -63,9 +68,8 @@ func TestWakuRelay(t *testing.T) {
time.Sleep(2 * time.Second)
err = relay.Unsubscribe(ctx, testTopic)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic))
require.NoError(t, err)
<-ctx.Done()
}
@ -74,9 +78,12 @@ func createRelayNode(t *testing.T) (host.Host, *WakuRelay) {
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = bcaster.Start(context.Background())
require.NoError(t, err)
return host, relay
}
@ -102,7 +109,7 @@ func TestGossipsubScore(t *testing.T) {
require.NoError(t, err)
}
sub, err := relay[i].subscribe(testTopic)
sub, err := relay[i].subscribeToPubsubTopic(testTopic)
require.NoError(t, err)
go func() {
for {
@ -160,3 +167,147 @@ func TestMsgID(t *testing.T) {
require.Equal(t, expectedMsgIDBytes, []byte(msgID))
}
func waitForTimeout(t *testing.T, ch chan *protocol.Envelope) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case env, ok := <-ch:
if ok {
t.Error("should not receive another message with payload", string(env.Message().Payload))
}
case <-time.After(2 * time.Second):
// Timeout elapsed, all good
}
}()
wg.Wait()
}
func waitForMsg(t *testing.T, ch chan *protocol.Envelope, cTopicExpected string) *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case env := <-ch:
fmt.Println("msg received", env)
require.Equal(t, cTopicExpected, env.Message().GetContentTopic())
case <-time.After(5 * time.Second):
t.Error("Message timeout")
}
}()
return &wg
}
func TestWakuRelayAutoShard(t *testing.T) {
testcTopic := "/toychat/2/huilong/proto"
testcTopic1 := "/toychat/1/huilong/proto"
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
defer bcaster.Stop()
//Create a contentTopic level subscription
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
sub, err := relay.GetSubscription(testcTopic)
require.NoError(t, err)
_, ok := sub.contentFilter.ContentTopics[testcTopic]
require.Equal(t, true, ok)
_, err = relay.GetSubscription(testcTopic1)
require.Error(t, err)
topics := relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0])
ctx, cancel := context.WithCancel(context.Background())
bytesToSend := []byte{1}
defer cancel()
//Create a pubSub level subscription
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
msg := &pb.WakuMessage{
Payload: bytesToSend,
Version: 0,
ContentTopic: testcTopic,
Timestamp: 0,
}
_, err = relay.Publish(context.Background(), msg)
require.NoError(t, err)
wg := waitForMsg(t, subs[0].Ch, testcTopic)
wg.Wait()
wg = waitForMsg(t, subs1[0].Ch, testcTopic)
wg.Wait()
//Test publishing to different content-topic
msg1 := &pb.WakuMessage{
Payload: bytesToSend,
Version: 0,
ContentTopic: testcTopic1,
Timestamp: 0,
}
_, err = relay.PublishToTopic(context.Background(), msg1, subs[0].contentFilter.PubsubTopic)
require.NoError(t, err)
wg = waitForMsg(t, subs1[0].Ch, testcTopic1)
wg.Wait()
//Should not receive message as subscription is for a different cTopic.
waitForTimeout(t, subs[0].Ch)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
_, err = relay.GetSubscription(testcTopic)
require.Error(t, err)
_, err = relay.GetSubscription(testcTopic1)
require.Error(t, err)
topics = relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0])
wg2 := waitForMsg(t, subs1[0].Ch, testcTopic1)
msg2 := &pb.WakuMessage{
Payload: bytesToSend,
Version: 0,
ContentTopic: testcTopic1,
Timestamp: 1,
}
_, err = relay.PublishToTopic(context.Background(), msg2, subs[0].contentFilter.PubsubTopic)
require.NoError(t, err)
wg2.Wait()
err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
}

View File

@ -32,9 +32,11 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcaster := relay.NewBroadcaster(1024)
relay := relay.NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = bcaster.Start(context.Background())
s.Require().NoError(err)
err = relay.Start(context.Background())
s.Require().NoError(err)
defer relay.Stop()

View File

@ -47,7 +47,8 @@ func TestResume(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
@ -69,7 +70,9 @@ func TestResume(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()
@ -107,7 +110,9 @@ func TestResumeWithListOfPeers(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
@ -121,7 +126,9 @@ func TestResumeWithListOfPeers(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()
@ -148,7 +155,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
@ -162,7 +171,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()

View File

@ -52,7 +52,7 @@ type WakuStore struct {
cancel context.CancelFunc
timesource timesource.Timesource
metrics Metrics
MsgC relay.Subscription
MsgC *relay.Subscription
wg *sync.WaitGroup
log *zap.Logger

View File

@ -85,7 +85,7 @@ type MessageProvider interface {
type Store interface {
SetHost(h host.Host)
Start(context.Context, relay.Subscription) error
Start(context.Context, *relay.Subscription) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
@ -104,7 +104,7 @@ func (store *WakuStore) SetHost(h host.Host) {
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context, sub relay.Subscription) error {
func (store *WakuStore) Start(ctx context.Context, sub *relay.Subscription) error {
if store.started {
return nil
}

View File

@ -17,6 +17,19 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)
// SimulateSubscription creates a subscription for a list of envelopes
func SimulateSubscription(msgs []*protocol.Envelope) *relay.Subscription {
ch := make(chan *protocol.Envelope, len(msgs))
for _, msg := range msgs {
ch <- msg
}
close(ch)
return &relay.Subscription{
Unsubscribe: func() {},
Ch: ch,
}
}
func TestWakuStoreProtocolQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -38,7 +51,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
}
// Simulate a message has been received via relay protocol
sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
@ -47,7 +60,9 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()
@ -88,7 +103,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
}
// Simulate a message has been received via relay protocol
sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
@ -127,7 +142,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)
sub := relay.ArraySubscription([]*protocol.Envelope{
sub := SimulateSubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
@ -146,7 +161,9 @@ func TestWakuStoreProtocolNext(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()
@ -202,7 +219,7 @@ func TestWakuStoreResult(t *testing.T) {
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)
sub := relay.ArraySubscription([]*protocol.Envelope{
sub := SimulateSubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
@ -221,7 +238,9 @@ func TestWakuStoreResult(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()
@ -296,7 +315,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
msg8 := tests.CreateWakuMessage(topic1, now+8)
msg9 := tests.CreateWakuMessage(topic1, now+9)
sub := relay.ArraySubscription([]*protocol.Envelope{
sub := SimulateSubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
@ -320,7 +339,10 @@ func TestWakuStoreProtocolFind(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = s2.Start(ctx, sub1)
require.NoError(t, err)
defer s2.Stop()