diff --git a/waku/v2/peers/peerstore.go b/waku/v2/peers/peerstore.go index eb754c39..d8b5b234 100644 --- a/waku/v2/peers/peerstore.go +++ b/waku/v2/peers/peerstore.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" ) @@ -38,7 +37,7 @@ type WakuPeerstore interface { Origin(p peer.ID, origin Origin) (Origin, error) PeersByOrigin(origin Origin) peer.IDSlice SetENR(p peer.ID, enr *enode.Node) error - ENR(p peer.ID, origin Origin) (*enr.Record, error) + ENR(p peer.ID, origin Origin) (*enode.Node, error) AddConnFailure(p peer.AddrInfo) ResetConnFailures(p peer.AddrInfo) ConnFailures(p peer.AddrInfo) int diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 81c136bb..0c65dbe3 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -65,6 +67,42 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.wg = sync.WaitGroup{} w.log = log.Named("relay") + cfg := pubsub.DefaultGossipSubParams() + cfg.PruneBackoff = time.Minute + cfg.UnsubscribeBackoff = 5 * time.Second + cfg.GossipFactor = 0.25 + cfg.D = 6 + cfg.Dlo = 4 + cfg.Dhi = 12 + cfg.Dout = 3 + cfg.Dlazy = 6 + cfg.HeartbeatInterval = time.Second + cfg.HistoryLength = 6 + cfg.HistoryGossip = 3 + cfg.FanoutTTL = time.Minute + + peerScoreParams := &pubsub.PeerScoreParams{ + DecayInterval: 12 * time.Second, // how often peer scoring is updated + DecayToZero: 0.01, // below this we consider the parameter to be zero + RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects + AppSpecificScore: func(p peer.ID) float64 { + return 0 + }, + AppSpecificWeight: 0.0, // p5: application specific, unset + IPColocationFactorWeight: -50, // p6: penalizes peers sharing more than threshold ips + IPColocationFactorThreshold: 5.0, // + BehaviourPenaltyWeight: -10, // p7: penalizes bad behaviour (weight and decay) + BehaviourPenaltyDecay: 0.986, + } + + peerScoreThresholds := &pubsub.PeerScoreThresholds{ + GossipThreshold: -100, // no gossip is sent to peers below this score + PublishThreshold: -1000, // no self-published msgs are sent to peers below this score + GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score + OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. + + } + // default options required by WakuRelay w.opts = append([]pubsub.Option{ pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), @@ -83,12 +121,17 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou } }, ), + pubsub.WithGossipSubParams(cfg), + pubsub.WithFloodPublish(true), + pubsub.WithSeenMessagesTTL(2 * time.Minute), + pubsub.WithPeerScore(peerScoreParams, peerScoreThresholds), + pubsub.WithDefaultValidator(func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { + msg := new(pb.WakuMessage) + err := proto.Unmarshal(message.Data, msg) + return err == nil + }), }, opts...) - // We disable overriding gossipsub parameters by adding them as the last value in the options - cfg := pubsub.DefaultGossipSubParams() - w.opts = append(w.opts, pubsub.WithGossipSubParams(cfg)) - return w } @@ -158,16 +201,6 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { return pubSubTopic, nil } -/* -func (w *WakuRelay) validatorFactory(pubsubTopic string) func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { - return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { - msg := new(pb.WakuMessage) - err := proto.Unmarshal(message.Data, msg) - return err == nil - } -} -*/ - func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] if !ok { @@ -176,15 +209,6 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } - /* - // TODO: Add a function to validate the WakuMessage integrity - // Rejects messages that are not WakuMessage - err = w.pubsub.RegisterTopicValidator(topic, w.validatorFactory(topic)) - if err != nil { - return nil, err - } - */ - sub, err = pubSubTopic.Subscribe() if err != nil { return nil, err