WIP: replace go-wakurelay-pubsub by go-libp2p-pubsub

This commit is contained in:
Richard Ramos 2021-07-29 18:08:53 -04:00
parent 761ae88bbd
commit dbd7a1c2d7
3 changed files with 95 additions and 21 deletions

View File

@ -16,6 +16,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
@ -29,7 +31,6 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
wakurelay "github.com/status-im/go-wakurelay-pubsub"
)
var log = logging.Logger("wakunode")
@ -202,6 +203,49 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty()
}
func (w *WakuNode) GetPeerStats() PeerStats {
return w.peers
}
func (w *WakuNode) IsOnline() bool {
hasRelay := false
hasLightPush := false
hasStore := false
hasFilter := false
for _, v := range w.peers {
for _, protocol := range v {
if !hasRelay && protocol == string(relay.WakuRelayID_v200) {
hasRelay = true
}
if !hasLightPush && protocol == string(lightpush.WakuLightPushProtocolId) {
hasLightPush = true
}
if !hasStore && protocol == string(store.WakuStoreProtocolId) {
hasStore = true
}
if !hasFilter && protocol == string(filter.WakuFilterProtocolId) {
hasFilter = true
}
if hasRelay || hasLightPush && (hasStore || hasFilter) {
return true
}
}
}
return false
}
func (w *WakuNode) HasHistory() bool {
for _, v := range w.peers {
for _, protocol := range v {
if protocol == string(store.WakuStoreProtocolId) {
return true
}
}
}
return false
}
func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty()))
var result []ma.Multiaddr
@ -219,7 +263,7 @@ func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter
}
func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...wakurelay.Option) error {
func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)

View File

@ -8,11 +8,11 @@ import (
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
rendezvous "github.com/status-im/go-waku-rendezvous"
"github.com/status-im/go-waku/waku/v2/protocol/store"
wakurelay "github.com/status-im/go-wakurelay-pubsub"
)
// Default clientId
@ -25,7 +25,7 @@ type WakuNodeParameters struct {
enableRelay bool
enableFilter bool
wOpts []wakurelay.Option
wOpts []pubsub.Option
enableStore bool
shouldResume bool
@ -94,7 +94,7 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption {
// WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption
// accepts a list of WakuRelay gossipsub option to setup the protocol
func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRelay = true
params.wOpts = opts
@ -102,7 +102,7 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
}
}
func WithRendezvous(discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption {
func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
params.rendezvousOpts = discoverOpts
@ -118,8 +118,9 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
}
}
// WithWakuFilter enables the Waku V2 Filter protocol.
func WithWakuFilter() WakuNodeOption {
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(opts ...pubsub.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilter = true
return nil

View File

@ -2,42 +2,71 @@ package relay
import (
"context"
"crypto/sha256"
"errors"
"sync"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/status-im/go-waku/waku/v2/protocol"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/status-im/go-wakurelay-pubsub"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
var log = logging.Logger("wakurelay")
type Topic string
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
type WakuRelay struct {
host host.Host
pubsub *wakurelay.PubSub
pubsub *pubsub.PubSub
topics map[Topic]bool
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*wakurelay.Topic
relaySubs map[Topic]*wakurelay.Subscription
wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription
}
func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (*WakuRelay, error) {
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
func msgIdFn(pmsg *pubsub_pb.Message) string {
hash := sha256.Sum256(pmsg.Data)
return string(hash[:])
}
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic)
w.relaySubs = make(map[Topic]*wakurelay.Subscription)
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription)
ps, err := wakurelay.NewWakuRelaySubWithMatcherFunc(ctx, h, protocol.PrefixTextMatch, opts...)
// default options required by WakuRelay
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
opts = append(opts, pubsub.WithNoAuthor())
opts = append(opts, pubsub.WithMessageIdFn(msgIdFn))
opts = append(opts, pubsub.WithGossipSubProtocols(
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
func (feat GossipSubFeature, proto protocol.ID) bool {
switch feat {
case GossipSubFeatureMesh:
return proto == GossipSubID_v11 || proto == GossipSubID_v10
case GossipSubFeaturePX:
return proto == GossipSubID_v11
default:
return false
}
}
))
ps, err := pubsub.NewGossipSub(ctx, h, opts...)
if err != nil {
return nil, err
}
@ -48,7 +77,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (*
return w, nil
}
func (w *WakuRelay) PubSub() *wakurelay.PubSub {
func (w *WakuRelay) PubSub() *pubsub.PubSub {
return w.pubsub
}
@ -63,11 +92,11 @@ func (w *WakuRelay) Topics() []Topic {
return result
}
func (w *WakuRelay) SetPubSub(pubSub *wakurelay.PubSub) {
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
w.pubsub = pubSub
}
func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) {
func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
@ -84,7 +113,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) {
return pubSubTopic, nil
}
func (w *WakuRelay) Subscribe(topic Topic) (subs *wakurelay.Subscription, isNew bool, err error) {
func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) {
sub, ok := w.relaySubs[topic]
if !ok {