feat: Sharded peer management - Relay (#764)

* feat: connect/disconnect to peers based on node topic sub/unsub

* feat: maintain healthy relay connections per pubSubTopic

Co-authored-by: richΛrd <info@richardramos.me>

* chore: add config to limit peerstore capacity (#770)
Commit 388f56b43f (parent d324234c81) by Prem Chaitanya Prathi, 2023-09-27 12:16:37 +05:30, committed via GitHub
12 changed files with 338 additions and 117 deletions
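Both limits end up as node options. A minimal embedding sketch (hypothetical code: only node.New, WithMaxPeerConnections, and WithPeerStoreCapacity appear in this diff; the numbers and the rest of the wiring are illustrative):

package main

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	// Cap live connections at 50 and the peerstore at 300 entries.
	// A capacity of 0 falls back to maxConnsToPeerRatio (5) * maxConnections,
	// as NewPeerManager below shows.
	wakuNode, err := node.New(
		node.WithMaxPeerConnections(50),
		node.WithPeerStoreCapacity(300),
	)
	if err != nil {
		panic(err)
	}
	if err := wakuNode.Start(context.Background()); err != nil {
		panic(err)
	}
	defer wakuNode.Stop()
}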


@@ -32,6 +32,12 @@ var (
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "peer-store-capacity",
Usage: "Maximum stored peers in the peerstore.",
Destination: &options.PeerStoreCapacity,
EnvVars: []string{"WAKUNODE2_PEERSTORE_CAPACITY"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},

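The new limit can be set on the command line (--peer-store-capacity) or through the WAKUNODE2_PEERSTORE_CAPACITY environment variable. Left unset, the int flag defaults to 0, which NewPeerManager later expands to maxConnsToPeerRatio (5) times the connection limit.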

@@ -22,6 +22,7 @@ func main() {
TcpPort,
Address,
MaxPeerConnections,
PeerStoreCapacity,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,


@@ -129,6 +129,7 @@ func Execute(options NodeOptions) {
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
node.WithPeerStoreCapacity(options.PeerStoreCapacity),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
@@ -322,7 +323,7 @@ func Execute(options NodeOptions) {
}
for _, d := range discoveredNodes {
-wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil)
+wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil, true)
}
//For now, assume that static peers added via the command line support/listen on all specified topics.


@@ -169,6 +169,7 @@ type NodeOptions struct {
UserAgent string
PProf bool
MaxPeerConnections int
PeerStoreCapacity int
PeerExchange PeerExchangeOptions
Websocket WSOptions


@@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
//Initialize peer manager.
-w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
+w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
if err != nil {
@@ -701,7 +701,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics
}
// AddDiscoveredPeer adds a discovered peer to the node peerStore
-func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string) {
+func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) {
p := peermanager.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
@@ -710,7 +710,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
},
PubSubTopics: pubsubTopics,
}
-w.peermanager.AddDiscoveredPeer(p)
+w.peermanager.AddDiscoveredPeer(p, connectNow)
}
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress

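Callers now decide whether a discovered peer should be dialed right away: the DNS-discovery loop in Execute passes connectNow=true, while peers arriving through the peer connector's discovery subscription (see the connector change below) are dialed immediately only while the node still lacks a minimal gossipsub mesh.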

@@ -88,6 +88,7 @@ type WakuNodeParameters struct {
rendezvousDB *rendezvous.DB
maxPeerConnections int
peerStoreCapacity int
enableDiscV5 bool
udpPort uint
@@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
}
}
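// WithPeerStoreCapacity sets the maximum number of peers kept in the node's peerstore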
func WithPeerStoreCapacity(capacity int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.peerStoreCapacity = capacity
return nil
}
}
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {


@@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
@@ -113,8 +114,14 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
if !ok {
return
}
-c.pm.AddDiscoveredPeer(p)
-c.PushToChan(p)
+triggerImmediateConnection := false
+//Don't connect to a peer as soon as it is discovered;
+//rather expect the PeerManager to push it based on need.
+if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize {
+triggerImmediateConnection = true
+}
+c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)
case <-time.After(1 * time.Second):
// This timeout is to avoid blocking the goroutine
break
@@ -137,8 +144,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
}
func (c *PeerConnectionStrategy) start() error {
-c.WaitGroup().Add(2)
-go c.shouldDialPeers()
+c.WaitGroup().Add(1)
go c.dialPeers()
c.consumeSubscriptions()
@@ -155,22 +162,6 @@ func (c *PeerConnectionStrategy) isPaused() bool {
return c.paused.Load()
}
-func (c *PeerConnectionStrategy) shouldDialPeers() {
-defer c.WaitGroup().Done()
-ticker := time.NewTicker(1 * time.Second)
-defer ticker.Stop()
-for {
-select {
-case <-c.Context().Done():
-return
-case <-ticker.C:
-_, outRelayPeers := c.pm.getRelayPeers()
-c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause if no of OutPeers more than or eq to target
-}
-}
-}
// It might happen that Subscribe is called before the peerConnector has started, so store these subscriptions in the subscriptions array and consume them after c.cancel is set.
func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {

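With this change the connector no longer pauses and resumes dialing on a one-second ticker. A freshly discovered peer is dialed immediately only while the host has fewer than GossipSubOptimalFullMeshSize (6) connected peers; beyond that, the PeerManager pushes peers to the connector as individual topics need them.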

@@ -2,8 +2,11 @@ package peermanager
import (
"context"
"errors"
"sync"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@@ -13,6 +16,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
@@ -20,9 +24,15 @@ import (
"go.uber.org/zap"
)
// NodeTopicDetails stores pubSubTopic-related data for the node, such as the topic handle.
type NodeTopicDetails struct {
topic *pubsub.Topic
}
// PeerManager applies various controls and manages connections to peers.
type PeerManager struct {
peerConnector *PeerConnectionStrategy
maxPeers int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget int
@@ -31,9 +41,12 @@ type PeerManager struct {
serviceSlots *ServiceSlots
ctx context.Context
sub event.Subscription
topicMutex sync.RWMutex
subRelayTopics map[string]*NodeTopicDetails
}
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5
// 80% relay peers 20% service peers
func relayAndServicePeers(maxConnections int) (int, int) {
@@ -52,22 +65,29 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
}
// NewPeerManager creates a new peerManager instance.
-func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
+func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager {
maxRelayPeers, _ := relayAndServicePeers(maxConnections)
inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
}
pm := &PeerManager{
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeers,
InRelayPeersTarget: inRelayPeersTarget,
OutRelayPeersTarget: outRelayPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
}
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeers", maxRelayPeers),
zap.Int("outRelayPeersTarget", outRelayPeersTarget),
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget),
zap.Int("maxPeers", maxPeers))
return pm
}
@@ -82,44 +102,6 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
pm.peerConnector = pc
}
-func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {
-var err error
-pm.sub, err = bus.Subscribe(new(relay.EvtPeerTopic))
-if err != nil {
-return err
-}
-return nil
-}
-func (pm *PeerManager) peerEventLoop(ctx context.Context) {
-defer pm.sub.Close()
-for {
-select {
-case e := <-pm.sub.Out():
-peerEvt := e.(relay.EvtPeerTopic)
-wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl)
-peerID := peerEvt.PeerID
-if peerEvt.State == relay.PEER_JOINED {
-err := wps.AddPubSubTopic(peerID, peerEvt.Topic)
-if err != nil {
-pm.logger.Error("failed to add pubSubTopic for peer",
-logging.HostID("peerID", peerID), zap.String("topic", peerEvt.Topic), zap.Error(err))
-}
-} else if peerEvt.State == relay.PEER_LEFT {
-err := wps.RemovePubSubTopic(peerID, peerEvt.Topic)
-if err != nil {
-pm.logger.Error("failed to remove pubSubTopic for peer",
-logging.HostID("peerID", peerID), zap.Error(err))
-}
-} else {
-pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State)))
-}
-case <-ctx.Done():
-return
-}
-}
-}
// Start starts the processing to be done by the peer manager.
func (pm *PeerManager) Start(ctx context.Context) {
pm.ctx = ctx
@@ -131,6 +113,7 @@ func (pm *PeerManager) Start(ctx context.Context) {
// This is a connectivity loop, which currently checks and prunes inbound connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
pm.connectToRelayPeers()
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
defer t.Stop()
for {
@@ -144,10 +127,12 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
}
// GroupPeersByDirection returns the specified peers (or all connected peers in the peerstore if none are specified) grouped by inbound or outbound direction
-func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
-peers := pm.host.Network().Peers()
+func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
+if len(specificPeers) == 0 {
+specificPeers = pm.host.Network().Peers()
+}
-for _, p := range peers {
+for _, p := range specificPeers {
direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p)
if err == nil {
if direction == network.DirInbound {
@@ -163,9 +148,11 @@ func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers p
return inPeers, outPeers, nil
}
-func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
+// getRelayPeers returns the lists of in and out peers supporting the WakuRelay protocol within specificPeers.
+// If specificPeers is empty, it checks within all peers in the peerStore.
+func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
-inPeers, outPeers, err := pm.GroupPeersByDirection()
+inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
if err != nil {
return
}
@@ -182,57 +169,91 @@ func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers
return
}
-func (pm *PeerManager) connectToRelayPeers() {
+// ensureMinRelayConnsPerTopic makes sure there is a minimum of D connections per pubsubTopic.
+// If not, it looks into the peerStore to initiate more connections.
+// If the peerStore doesn't have enough peers, it waits for discv5 to find more and tries in the next cycle.
+func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
+pm.topicMutex.RLock()
+defer pm.topicMutex.RUnlock()
+for topicStr, topicInst := range pm.subRelayTopics {
+curPeers := topicInst.topic.ListPeers()
+curPeerLen := len(curPeers)
+if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize {
+pm.logger.Info("Subscribed topic is unhealthy, initiating more connections to maintain health",
+zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
+zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize))
+//Find not connected peers.
+notConnectedPeers := pm.getNotConnectedPers(topicStr)
+if notConnectedPeers.Len() == 0 {
+//TODO: Trigger on-demand discovery for this topic.
+continue
+}
+//Connect to eligible peers.
+numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen
+if numPeersToConnect > notConnectedPeers.Len() {
+numPeersToConnect = notConnectedPeers.Len()
+}
+pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
+}
+}
+}
+// connectToRelayPeers ensures there is a minimum of D connections for each pubSubTopic.
+// If not, it initiates connections to additional peers.
+// It also checks for incoming relay connections and prunes them once they cross the inRelayPeersTarget.
+func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
-inRelayPeers, outRelayPeers := pm.getRelayPeers()
-pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()),
-zap.Int("outRelayPeers", outRelayPeers.Len()))
+pm.ensureMinRelayConnsPerTopic()
+inRelayPeers, outRelayPeers := pm.getRelayPeers(nil)
+pm.logger.Info("number of relay peers connected",
+zap.Int("in", inRelayPeers.Len()),
+zap.Int("out", outRelayPeers.Len()))
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InRelayPeersTarget {
pm.pruneInRelayConns(inRelayPeers)
}
-if outRelayPeers.Len() > pm.OutRelayPeersTarget {
-return
-}
-totalRelayPeers := inRelayPeers.Len() + outRelayPeers.Len()
-// Establish additional connections connected peers are lesser than target.
-//What if the not connected peers in peerstore are not relay peers???
-if totalRelayPeers < pm.maxRelayPeers {
-//Find not connected peers.
-notConnectedPeers := pm.getNotConnectedPers()
-if notConnectedPeers.Len() == 0 {
-return
-}
-//Connect to eligible peers.
-numPeersToConnect := pm.maxRelayPeers - totalRelayPeers
-if numPeersToConnect > notConnectedPeers.Len() {
-numPeersToConnect = notConnectedPeers.Len()
-}
-pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
-} //Else: Should we raise some sort of unhealthy event??
}
-func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData {
-return PeerData{
+// addrInfoToPeerData returns address info for a peer.
+// If the addresses have expired, it removes the peer from the host peerStore and returns nil.
+func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData {
+addrs := host.Peerstore().Addrs(peerID)
+if len(addrs) == 0 {
+//Addresses expired, remove peer from peerStore
+host.Peerstore().RemovePeer(peerID)
+return nil
+}
+return &PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
-Addrs: host.Peerstore().Addrs(peerID),
+Addrs: addrs,
},
}
}
+// connectToPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
-pm.peerConnector.PushToChan(peerData)
+if peerData == nil {
+continue
+}
+pm.peerConnector.PushToChan(*peerData)
}
}
-func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
-for _, peerID := range pm.host.Peerstore().Peers() {
+// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
+func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
+var peerList peer.IDSlice
+if pubsubTopic == "" {
+peerList = pm.host.Peerstore().Peers()
+} else {
+peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
+}
+for _, peerID := range peerList {
if pm.host.Network().Connectedness(peerID) != network.Connected {
notConnectedPeers = append(notConnectedPeers, peerID)
}
@@ -240,13 +261,15 @@ func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
return
}
// pruneInRelayConns prunes any incoming relay connections crossing the derived inRelayPeersTarget
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
//Start disconnecting peers, based on what?
-//For now, just disconnect most recently connected peers
+//For now no preference is used
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
-pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning",
-zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
+//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
+pm.logger.Info("peer connections exceed target relay peers, hence pruning",
+zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget))
for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
@@ -262,7 +285,17 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
// AddDiscoveredPeer adds dynamically discovered peers.
// Note that these peers will not be set in service-slots.
// TODO: It may be good to set in service-slots based on services supported in the ENR
-func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
+func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
+//This check is done again inside addPeer to avoid the additional complexity of rolling back other changes.
+if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
+return
+}
+//Check if the peer is already present; if so, skip adding.
+_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
+if err == nil {
+pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
+return
+}
// Try to fetch shard info from ENR to arrive at pubSub topics.
if len(p.PubSubTopics) == 0 && p.ENR != nil {
shards, err := wenr.RelaySharding(p.ENR.Record())
@@ -278,7 +311,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
p.PubSubTopics = append(p.PubSubTopics, topicStr)
}
} else {
pm.logger.Info("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
}
}
}
@@ -292,14 +325,25 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
+if connectNow {
+pm.peerConnector.PushToChan(p)
+}
}
// addPeer adds a peer to only the peerStore.
// It also sets additional metadata such as origin, ENR and supported protocols.
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
+if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
+return errors.New("peer store capacity reached")
+}
pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
-pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
+if origin == wps.Static {
+pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
+} else {
+//Need to re-evaluate the address expiry.
+//For now, expire them with the default AddressTTL, which is an hour.
+pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
+}
err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
if err != nil {
pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))

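To make the capacity fallback concrete, here is the derivation from NewPeerManager as a standalone sketch. The 5x ratio and the zero/inconsistent-value checks are taken from the diff; the 80/20 split is only stated in a comment (relayAndServicePeers' body is not shown), so the percentage below is an assumption:

package main

import "fmt"

// derivePeerLimits mirrors the checks in NewPeerManager: an unset (0) or
// inconsistent maxPeers is replaced with maxConnsToPeerRatio (5) times
// maxConnections, and roughly 80% of connections are reserved for relay.
func derivePeerLimits(maxConnections, maxPeers int) (maxRelayPeers, effectiveMaxPeers int) {
	maxRelayPeers = maxConnections * 80 / 100 // assumed 80% relay share
	if maxPeers == 0 || maxConnections > maxPeers {
		maxPeers = 5 * maxConnections
	}
	return maxRelayPeers, maxPeers
}

func main() {
	relay, peers := derivePeerLimits(50, 0)
	fmt.Println(relay, peers) // prints: 40 250
}

So a node with 50 allowed connections and the default capacity of 0 may track up to 250 peers, of which at most 40 concurrent connections go to relay peers.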

@@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
defer h1.Close()
// host 1 is used by peer manager
-pm := NewPeerManager(10, utils.Logger())
+pm := NewPeerManager(10, 20, utils.Logger())
pm.SetHost(h1)
return ctx, pm, func() {


@@ -0,0 +1,166 @@
package peermanager
import (
"context"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {
var err error
pm.sub, err = bus.Subscribe([]interface{}{new(relay.EvtPeerTopic), new(relay.EvtRelaySubscribed), new(relay.EvtRelayUnsubscribed)})
return err
}
func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topicInst *pubsub.Topic) {
pm.logger.Info("handleNewRelayTopicSubscription", zap.String("pubSubTopic", pubsubTopic))
pm.topicMutex.Lock()
defer pm.topicMutex.Unlock()
_, ok := pm.subRelayTopics[pubsubTopic]
if ok {
//Nothing to be done, as we are already subscribed to this topic.
return
}
pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst}
//Check how many connected relay peers subscribe to this topic; if fewer than D, find peers in the peerstore and connect.
//If there are no peers in the peerStore, trigger discovery for this topic?
relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
var notConnectedPeers peer.IDSlice
connectedPeers := 0
for _, peer := range relevantPeersForPubSubTopic {
if pm.host.Network().Connectedness(peer) == network.Connected {
connectedPeers++
} else {
notConnectedPeers = append(notConnectedPeers, peer)
}
}
if connectedPeers >= waku_proto.GossipSubOptimalFullMeshSize { //TODO: Use a config rather than hard-coding.
// Should we use optimal number or define some sort of a config for the node to choose from?
// A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has
// or bandwidth it can support.
// Should we link this to bandwidth management somehow or just depend on some sort of config profile?
pm.logger.Info("Optimal required relay peers for new pubSubTopic are already connected ", zap.String("pubSubTopic", pubsubTopic),
zap.Int("connectedPeerCount", connectedPeers))
return
}
triggerDiscovery := false
if notConnectedPeers.Len() > 0 {
numPeersToConnect := notConnectedPeers.Len() - connectedPeers
if numPeersToConnect < 0 {
numPeersToConnect = notConnectedPeers.Len()
} else if numPeersToConnect-connectedPeers > waku_proto.GossipSubOptimalFullMeshSize {
numPeersToConnect = waku_proto.GossipSubOptimalFullMeshSize - connectedPeers
}
if numPeersToConnect+connectedPeers < waku_proto.GossipSubOptimalFullMeshSize {
triggerDiscovery = true
}
//For now all peers are given the same priority;
//later we may want to choose peers that have more shards in common over others.
pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
} else {
triggerDiscovery = true
}
if triggerDiscovery {
//TODO: Initiate on-demand discovery for this pubSubTopic.
// Use peer-exchange and rendezvous?
//Should we query the discovery cache to find out if there are any more peers before triggering discovery?
return
}
}
func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) {
pm.logger.Info("handleNewRelayTopicUnSubscription", zap.String("pubSubTopic", pubsubTopic))
pm.topicMutex.Lock()
defer pm.topicMutex.Unlock()
_, ok := pm.subRelayTopics[pubsubTopic]
if !ok {
//Nothing to be done, as we are already unsubscribed from this topic.
return
}
delete(pm.subRelayTopics, pubsubTopic)
//If there are peers subscribed only to this topic, disconnect them.
relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
for _, peer := range relevantPeersForPubSubTopic {
if pm.host.Network().Connectedness(peer) == network.Connected {
peerTopics, err := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PubSubTopics(peer)
if err != nil {
pm.logger.Error("Could not retrieve pubsub topics for peer", zap.Error(err),
logging.HostID("peerID", peer))
continue
}
if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic {
err := pm.host.Network().ClosePeer(peer)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer",
logging.HostID("peerID", peer))
continue
}
pm.logger.Debug("Successfully disconnected connection towards peer",
logging.HostID("peerID", peer))
}
}
}
}
func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) {
wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl)
peerID := peerEvt.PeerID
if peerEvt.State == relay.PEER_JOINED {
err := wps.AddPubSubTopic(peerID, peerEvt.PubsubTopic)
if err != nil {
pm.logger.Error("failed to add pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.String("topic", peerEvt.PubsubTopic), zap.Error(err))
}
} else if peerEvt.State == relay.PEER_LEFT {
err := wps.RemovePubSubTopic(peerID, peerEvt.PubsubTopic)
if err != nil {
pm.logger.Error("failed to remove pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.Error(err))
}
} else {
pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State)))
}
}
func (pm *PeerManager) peerEventLoop(ctx context.Context) {
defer pm.sub.Close()
for {
select {
case e := <-pm.sub.Out():
switch e := e.(type) {
case relay.EvtPeerTopic:
{
peerEvt := (relay.EvtPeerTopic)(e)
pm.handlerPeerTopicEvent(peerEvt)
}
case relay.EvtRelaySubscribed:
{
eventDetails := (relay.EvtRelaySubscribed)(e)
pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst)
}
case relay.EvtRelayUnsubscribed:
{
eventDetails := (relay.EvtRelayUnsubscribed)(e)
pm.handleNewRelayTopicUnSubscription(eventDetails.Topic)
}
default:
pm.logger.Error("unsupported event type", zap.Any("eventType", e))
}
case <-ctx.Done():
return
}
}
}

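This new file ties relay and peer management together over the event bus: WakuRelay (below) now emits EvtRelaySubscribed with the live *pubsub.Topic handle and EvtRelayUnsubscribed when a topic is closed, and peerEventLoop dispatches these to handleNewRelayTopicSubscription and handleNewRelayTopicUnSubscription, so connections are grown or shed as the node's topic set changes.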

@@ -71,7 +71,8 @@ type WakuRelay struct {
// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
type EvtRelaySubscribed struct {
-Topic string
+Topic string
+TopicInst *pubsub.Topic
}
// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed
@@ -87,9 +88,9 @@ const (
)
type EvtPeerTopic struct {
-Topic string
-PeerID peer.ID
-State PeerTopicState
+PubsubTopic string
+PeerID peer.ID
+State PeerTopicState
}
func msgIDFn(pmsg *pubsub_pb.Message) string {
@@ -115,11 +116,11 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
cfg.PruneBackoff = time.Minute
cfg.UnsubscribeBackoff = 5 * time.Second
cfg.GossipFactor = 0.25
-cfg.D = 6
+cfg.D = waku_proto.GossipSubOptimalFullMeshSize
cfg.Dlo = 4
cfg.Dhi = 12
cfg.Dout = 3
-cfg.Dlazy = 6
+cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize
cfg.HeartbeatInterval = time.Second
cfg.HistoryLength = 6
cfg.HistoryGossip = 3
@@ -331,7 +332,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
w.topicEvtHanders[topic] = evtHandler
w.relaySubs[topic] = sub
-err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic})
+err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic, pubSubTopic})
if err != nil {
return nil, err
}
@@ -554,13 +555,13 @@ func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandl
}
if evt.Type == pubsub.PeerJoin {
w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
-err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_JOINED})
+err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED})
if err != nil {
w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err))
}
} else if evt.Type == pubsub.PeerLeave {
w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer))
-err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_LEFT})
+err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT})
if err != nil {
w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err))
}

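Note that GossipSubOptimalFullMeshSize is defined as 6 (below), exactly the literal it replaces, so the effective gossipsub mesh parameters are unchanged; the constant simply gives the relay and the peer manager a single shared definition of a healthy mesh size.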

@@ -6,6 +6,8 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
)
const GossipSubOptimalFullMeshSize = 6
// FulltextMatch is the default matching function used for checking if a peer
// supports a protocol or not
func FulltextMatch(expectedProtocol string) func(string) bool {