From dbd7a1c2d739a7fd475ebfabccf038eb51d52dc7 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 29 Jul 2021 18:08:53 -0400 Subject: [PATCH] WIP: replace go-wakurelay-pubsub by go-libp2p-pubsub --- waku/v2/node/wakunode2.go | 48 +++++++++++++++++++++++- waku/v2/node/wakuoptions.go | 13 ++++--- waku/v2/protocol/relay/waku_relay.go | 55 +++++++++++++++++++++------- 3 files changed, 95 insertions(+), 21 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c66926f1..3e7d650d 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -16,6 +16,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" p2pproto "github.com/libp2p/go-libp2p-core/protocol" + peerstore "github.com/libp2p/go-libp2p-peerstore" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -29,7 +31,6 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" - wakurelay "github.com/status-im/go-wakurelay-pubsub" ) var log = logging.Logger("wakunode") @@ -202,6 +203,49 @@ func (w *WakuNode) ID() string { return w.host.ID().Pretty() } +func (w *WakuNode) GetPeerStats() PeerStats { + return w.peers +} + +func (w *WakuNode) IsOnline() bool { + hasRelay := false + hasLightPush := false + hasStore := false + hasFilter := false + for _, v := range w.peers { + for _, protocol := range v { + if !hasRelay && protocol == string(relay.WakuRelayID_v200) { + hasRelay = true + } + if !hasLightPush && protocol == string(lightpush.WakuLightPushProtocolId) { + hasLightPush = true + } + if !hasStore && protocol == string(store.WakuStoreProtocolId) { + hasStore = true + } + if !hasFilter && protocol == string(filter.WakuFilterProtocolId) { + hasFilter = true + } + if hasRelay || hasLightPush && (hasStore || hasFilter) { + return true + } + } + } + + return false +} + +func (w *WakuNode) HasHistory() bool { + for _, v := range w.peers { + for _, protocol := range v { + if protocol == string(store.WakuStoreProtocolId) { + return true + } + } + } + return false +} + func (w *WakuNode) ListenAddresses() []ma.Multiaddr { hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) var result []ma.Multiaddr @@ -219,7 +263,7 @@ func (w *WakuNode) Filter() *filter.WakuFilter { return w.filter } -func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...wakurelay.Option) error { +func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index a70e54fb..70335278 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -8,11 +8,11 @@ import ( "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" + pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" rendezvous "github.com/status-im/go-waku-rendezvous" "github.com/status-im/go-waku/waku/v2/protocol/store" - wakurelay "github.com/status-im/go-wakurelay-pubsub" ) // Default clientId @@ -25,7 +25,7 @@ type WakuNodeParameters struct { enableRelay bool enableFilter bool - wOpts []wakurelay.Option + wOpts []pubsub.Option enableStore bool shouldResume bool @@ -94,7 +94,7 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption { // WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption // accepts a list of WakuRelay gossipsub option to setup the protocol -func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { +func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRelay = true params.wOpts = opts @@ -102,7 +102,7 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { } } -func WithRendezvous(discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption { +func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRendezvous = true params.rendezvousOpts = discoverOpts @@ -118,8 +118,9 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption { } } -// WithWakuFilter enables the Waku V2 Filter protocol. -func WithWakuFilter() WakuNodeOption { +// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption +// accepts a list of WakuFilter gossipsub options to setup the protocol +func WithWakuFilter(opts ...pubsub.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true return nil diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index fa2a684f..5be048c7 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -2,42 +2,71 @@ package relay import ( "context" + "crypto/sha256" "errors" "sync" proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/protocol" - "github.com/status-im/go-waku/waku/v2/protocol" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb" - wakurelay "github.com/status-im/go-wakurelay-pubsub" + + pubsub "github.com/libp2p/go-libp2p-pubsub" ) var log = logging.Logger("wakurelay") type Topic string +const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" type WakuRelay struct { host host.Host - pubsub *wakurelay.PubSub + pubsub *pubsub.PubSub topics map[Topic]bool topicsMutex sync.Mutex - wakuRelayTopics map[Topic]*wakurelay.Topic - relaySubs map[Topic]*wakurelay.Subscription + wakuRelayTopics map[Topic]*pubsub.Topic + relaySubs map[Topic]*pubsub.Subscription } -func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (*WakuRelay, error) { +// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn +func msgIdFn(pmsg *pubsub_pb.Message) string { + hash := sha256.Sum256(pmsg.Data) + return string(hash[:]) +} + +func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h w.topics = make(map[Topic]bool) - w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic) - w.relaySubs = make(map[Topic]*wakurelay.Subscription) + w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) + w.relaySubs = make(map[Topic]*pubsub.Subscription) - ps, err := wakurelay.NewWakuRelaySubWithMatcherFunc(ctx, h, protocol.PrefixTextMatch, opts...) + // default options required by WakuRelay + opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + opts = append(opts, pubsub.WithNoAuthor()) + opts = append(opts, pubsub.WithMessageIdFn(msgIdFn)) + + opts = append(opts, pubsub.WithGossipSubProtocols( + []protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200}, + func (feat GossipSubFeature, proto protocol.ID) bool { + switch feat { + case GossipSubFeatureMesh: + return proto == GossipSubID_v11 || proto == GossipSubID_v10 + case GossipSubFeaturePX: + return proto == GossipSubID_v11 + default: + return false + } + } + )) + + ps, err := pubsub.NewGossipSub(ctx, h, opts...) if err != nil { return nil, err } @@ -48,7 +77,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (* return w, nil } -func (w *WakuRelay) PubSub() *wakurelay.PubSub { +func (w *WakuRelay) PubSub() *pubsub.PubSub { return w.pubsub } @@ -63,11 +92,11 @@ func (w *WakuRelay) Topics() []Topic { return result } -func (w *WakuRelay) SetPubSub(pubSub *wakurelay.PubSub) { +func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { w.pubsub = pubSub } -func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) { +func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() @@ -84,7 +113,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) { return pubSubTopic, nil } -func (w *WakuRelay) Subscribe(topic Topic) (subs *wakurelay.Subscription, isNew bool, err error) { +func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) { sub, ok := w.relaySubs[topic] if !ok {