go-libp2p-pubsub/gossipsub.go
Pop Chunhapanya b421b3ab05
GossipSub v1.2: IDONTWANT control message and priority queue. (#553)
## GossipSub v1.2 implementation

Specification: libp2p/specs#548

### Work Summary
Sending IDONTWANT

Implement a smart queue
Add priorities to the smart queue

    Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets

Handling IDONTWANT

Use a map to remember the message ids whose IDONTWANT packets have been received
Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached)
Clear the message IDs from the cache after 3 heartbeats

    Hash the message IDs before putting them into the cache.

More requested features

    Add a feature test to not send IDONTWANT if the other side doesnt support it

### Commit Summary

* Replace sending channel with the smart rpcQueue

Since we want to implement a priority queue later, we need to replace
the normal sending channels with the new smart structures first.

* Implement UrgentPush in the smart rpcQueue

UrgentPush allows you to push an rpc packet to the front of the queue so
that it will be popped out fast.

* Add IDONTWANT to rpc.proto and trace.proto

* Send IDONTWANT right before validation step

Most importantly, this commit adds a new method called PreValidation to
the interface PubSubRouter, which will be called right before validating
the gossipsub message.

In GossipSubRouter, PreValidation will send the IDONTWANT controll
messages to all the mesh peers of the topics of the received messages.

* Test GossipSub IDONWANT sending

* Send IDONWANT only for large messages

* Handle IDONTWANT control messages

When receiving IDONTWANTs, the host should remember the message ids
contained in IDONTWANTs using a hash map.

When receiving messages with those ids, it shouldn't forward them to the
peers who already sent the IDONTWANTs.

When the maximum number of IDONTWANTs is reached for any particular
peer, the host should ignore any excessive IDONTWANTs from that peer.

* Clear expired message IDs from the IDONTWANT cache

If the messages IDs received from IDONTWANTs are older than 3
heartbeats, they should be removed from the IDONTWANT cache.

* Keep the hashes of IDONTWANT message ids instead

Rather than keeping the raw message ids, keep their hashes instead to
save memory and protect again memory DoS attacks.

* Increase GossipSubMaxIHaveMessages to 1000

* fixup! Clear expired message IDs from the IDONTWANT cache

* Not send IDONTWANT if the receiver doesn't support

* fixup! Replace sending channel with the smart rpcQueue

* Not use pointers in rpcQueue

* Simply rcpQueue by using only one mutex

* Check ctx error in rpc sending worker

Co-authored-by: Steven Allen <steven@stebalien.com>

* fixup! Simply rcpQueue by using only one mutex

* fixup! Keep the hashes of IDONTWANT message ids instead

* Use AfterFunc instead implementing our own

* Fix misc lint errors

* fixup! Fix misc lint errors

* Revert "Increase GossipSubMaxIHaveMessages to 1000"

This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec.

* Increase GossipSubMaxIDontWantMessages to 1000

* fixup! Handle IDONTWANT control messages

* Skip TestGossipsubConnTagMessageDeliveries

* Skip FuzzAppendOrMergeRPC

* Revert "Skip FuzzAppendOrMergeRPC"

This reverts commit f141e13234de0960d139339acb636a1afea9e219.

* fixup! Send IDONWANT only for large messages

* fixup! fixup! Keep the hashes of IDONTWANT message ids instead

* fixup! Implement UrgentPush in the smart rpcQueue

* fixup! Use AfterFunc instead implementing our own

---------

Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 18:16:35 +03:00

2166 lines
64 KiB
Go

package pubsub
import (
"context"
"crypto/sha256"
"fmt"
"io"
"math/rand"
"sort"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
)
const (
// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
// It is advertised along with GossipSubID_v11 and GossipSubID_v12 for backwards compatibility.
GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")
// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
// It is advertised along with GossipSubID_v12 for backwards compatibility.
// See the spec for details about how v1.1.0 compares to v1.0.0:
// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
// GossipSubID_v12 is the protocol ID for version 1.2.0 of the GossipSub protocol.
// See the spec for details about how v1.2.0 compares to v1.1.0:
// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
GossipSubID_v12 = protocol.ID("/meshsub/1.2.0")
)
// Defines the default gossipsub parameters.
var (
GossipSubD = 6
GossipSubDlo = 5
GossipSubDhi = 12
GossipSubDscore = 4
GossipSubDout = 2
GossipSubHistoryLength = 5
GossipSubHistoryGossip = 3
GossipSubDlazy = 6
GossipSubGossipFactor = 0.25
GossipSubGossipRetransmission = 3
GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
GossipSubHeartbeatInterval = 1 * time.Second
GossipSubFanoutTTL = 60 * time.Second
GossipSubPrunePeers = 16
GossipSubPruneBackoff = time.Minute
GossipSubUnsubscribeBackoff = 10 * time.Second
GossipSubConnectors = 8
GossipSubMaxPendingConnections = 128
GossipSubConnectionTimeout = 30 * time.Second
GossipSubDirectConnectTicks uint64 = 300
GossipSubDirectConnectInitialDelay = time.Second
GossipSubOpportunisticGraftTicks uint64 = 60
GossipSubOpportunisticGraftPeers = 2
GossipSubGraftFloodThreshold = 10 * time.Second
GossipSubMaxIHaveLength = 5000
GossipSubMaxIHaveMessages = 10
GossipSubMaxIDontWantMessages = 1000
GossipSubIWantFollowupTime = 3 * time.Second
GossipSubIDontWantMessageThreshold = 1024 // 1KB
GossipSubIDontWantMessageTTL = 3 // 3 heartbeats
)
type checksum struct {
payload [32]byte
length uint8
}
// GossipSubParams defines all the gossipsub specific parameters.
type GossipSubParams struct {
// overlay parameters.
// D sets the optimal degree for a GossipSub topic mesh. For example, if D == 6,
// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
// D should be set somewhere between Dlo and Dhi.
D int
// Dlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
// If we have fewer than Dlo peers, we will attempt to graft some more into the mesh at
// the next heartbeat.
Dlo int
// Dhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
// If we have more than Dhi peers, we will select some to prune from the mesh at the next heartbeat.
Dhi int
// Dscore affects how peers are selected when pruning a mesh due to over subscription.
// At least Dscore of the retained peers will be high-scoring, while the remainder are
// chosen randomly.
Dscore int
// Dout sets the quota for the number of outbound connections to maintain in a topic mesh.
// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
// to at least Dout of the survivor peers. This prevents sybil attackers from overwhelming
// our mesh with incoming connections.
//
// Dout must be set below Dlo, and must not exceed D / 2.
Dout int
// gossip parameters
// HistoryLength controls the size of the message cache used for gossip.
// The message cache will remember messages for HistoryLength heartbeats.
HistoryLength int
// HistoryGossip controls how many cached message ids we will advertise in
// IHAVE gossip messages. When asked for our seen message IDs, we will return
// only those from the most recent HistoryGossip heartbeats. The slack between
// HistoryGossip and HistoryLength allows us to avoid advertising messages
// that will be expired by the time they're requested.
//
// HistoryGossip must be less than or equal to HistoryLength to
// avoid a runtime panic.
HistoryGossip int
// Dlazy affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to at least Dlazy peers outside our mesh. The actual
// number may be more, depending on GossipFactor and how many peers we're
// connected to.
Dlazy int
// GossipFactor affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to GossipFactor * (total number of non-mesh peers), or
// Dlazy, whichever is greater.
GossipFactor float64
// GossipRetransmission controls how many times we will allow a peer to request
// the same message id through IWANT gossip before we start ignoring them. This is designed
// to prevent peers from spamming us with requests and wasting our resources.
GossipRetransmission int
// heartbeat interval
// HeartbeatInitialDelay is the short delay before the heartbeat timer begins
// after the router is initialized.
HeartbeatInitialDelay time.Duration
// HeartbeatInterval controls the time between heartbeats.
HeartbeatInterval time.Duration
// SlowHeartbeatWarning is the duration threshold for heartbeat processing before emitting
// a warning; this would be indicative of an overloaded peer.
SlowHeartbeatWarning float64
// FanoutTTL controls how long we keep track of the fanout state. If it's been
// FanoutTTL since we've published to a topic that we're not subscribed to,
// we'll delete the fanout map for that topic.
FanoutTTL time.Duration
// PrunePeers controls the number of peers to include in prune Peer eXchange.
// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
// send them signed peer records for up to PrunePeers other peers that we
// know of.
PrunePeers int
// PruneBackoff controls the backoff time for pruned peers. This is how long
// a peer must wait before attempting to graft into our mesh again after being pruned.
// When pruning a peer, we send them our value of PruneBackoff so they know
// the minimum time to wait. Peers running older versions may not send a backoff time,
// so if we receive a prune message without one, we will wait at least PruneBackoff
// before attempting to re-graft.
PruneBackoff time.Duration
// UnsubscribeBackoff controls the backoff time to use when unsuscribing
// from a topic. A peer should not resubscribe to this topic before this
// duration.
UnsubscribeBackoff time.Duration
// Connectors controls the number of active connection attempts for peers obtained through PX.
Connectors int
// MaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
MaxPendingConnections int
// ConnectionTimeout controls the timeout for connection attempts.
ConnectionTimeout time.Duration
// DirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
// that are not currently connected.
DirectConnectTicks uint64
// DirectConnectInitialDelay is the initial delay before opening connections to direct peers
DirectConnectInitialDelay time.Duration
// OpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
// with opportunistic grafting. Every OpportunisticGraftTicks we will attempt to select some
// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
OpportunisticGraftTicks uint64
// OpportunisticGraftPeers is the number of peers to opportunistically graft.
OpportunisticGraftPeers int
// If a GRAFT comes before GraftFloodThreshold has elapsed since the last PRUNE,
// then there is an extra score penalty applied to the peer through P7.
GraftFloodThreshold time.Duration
// MaxIHaveLength is the maximum number of messages to include in an IHAVE message.
// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
// default if your system is pushing more than 5000 messages in HistoryGossip heartbeats;
// with the defaults this is 1666 messages/s.
MaxIHaveLength int
// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
MaxIHaveMessages int
// MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
MaxIDontWantMessages int
// Time to wait for a message requested through IWANT following an IHAVE advertisement.
// If the message is not received within this window, a broken promise is declared and
// the router may apply bahavioural penalties.
IWantFollowupTime time.Duration
// IDONTWANT is only sent for messages larger than the threshold. This should be greater than
// D_high * the size of the message id. Otherwise, the attacker can do the amplication attack by sending
// small messages while the receiver replies back with larger IDONTWANT messages.
IDontWantMessageThreshold int
// IDONTWANT is cleared when it's older than the TTL.
IDontWantMessageTTL int
}
// NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
rt := DefaultGossipSubRouter(h)
opts = append(opts, WithRawTracer(rt.tagTracer))
return NewGossipSubWithRouter(ctx, h, rt, opts...)
}
// NewGossipSubWithRouter returns a new PubSub object using the given router.
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
return NewPubSub(ctx, h, rt, opts...)
}
// DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
params := DefaultGossipSubParams()
return &GossipSubRouter{
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
peerdontwant: make(map[peer.ID]int),
unwanted: make(map[peer.ID]map[checksum]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, params.MaxPendingConnections),
cab: pstoremem.NewAddrBook(),
mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength),
protos: GossipSubDefaultProtocols,
feature: GossipSubDefaultFeatures,
tagTracer: newTagTracer(h.ConnManager()),
params: params,
}
}
// DefaultGossipSubParams returns the default gossip sub parameters
// as a config.
func DefaultGossipSubParams() GossipSubParams {
return GossipSubParams{
D: GossipSubD,
Dlo: GossipSubDlo,
Dhi: GossipSubDhi,
Dscore: GossipSubDscore,
Dout: GossipSubDout,
HistoryLength: GossipSubHistoryLength,
HistoryGossip: GossipSubHistoryGossip,
Dlazy: GossipSubDlazy,
GossipFactor: GossipSubGossipFactor,
GossipRetransmission: GossipSubGossipRetransmission,
HeartbeatInitialDelay: GossipSubHeartbeatInitialDelay,
HeartbeatInterval: GossipSubHeartbeatInterval,
FanoutTTL: GossipSubFanoutTTL,
PrunePeers: GossipSubPrunePeers,
PruneBackoff: GossipSubPruneBackoff,
UnsubscribeBackoff: GossipSubUnsubscribeBackoff,
Connectors: GossipSubConnectors,
MaxPendingConnections: GossipSubMaxPendingConnections,
ConnectionTimeout: GossipSubConnectionTimeout,
DirectConnectTicks: GossipSubDirectConnectTicks,
DirectConnectInitialDelay: GossipSubDirectConnectInitialDelay,
OpportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
OpportunisticGraftPeers: GossipSubOpportunisticGraftPeers,
GraftFloodThreshold: GossipSubGraftFloodThreshold,
MaxIHaveLength: GossipSubMaxIHaveLength,
MaxIHaveMessages: GossipSubMaxIHaveMessages,
MaxIDontWantMessages: GossipSubMaxIDontWantMessages,
IWantFollowupTime: GossipSubIWantFollowupTime,
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
IDontWantMessageTTL: GossipSubIDontWantMessageTTL,
SlowHeartbeatWarning: 0.1,
}
}
// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
// sanity check: validate the score parameters
err := params.validate()
if err != nil {
return err
}
// sanity check: validate the threshold values
err = thresholds.validate()
if err != nil {
return err
}
gs.score = newPeerScore(params)
gs.gossipThreshold = thresholds.GossipThreshold
gs.publishThreshold = thresholds.PublishThreshold
gs.graylistThreshold = thresholds.GraylistThreshold
gs.acceptPXThreshold = thresholds.AcceptPXThreshold
gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold
gs.gossipTracer = newGossipTracer()
// hook the tracer
if ps.tracer != nil {
ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer)
} else {
ps.tracer = &pubsubTracer{
raw: []RawTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
idGen: ps.idGen,
}
}
return nil
}
}
// WithFloodPublish is a gossipsub router option that enables flood publishing.
// When this is enabled, published messages are forwarded to all peers with score >=
// to publishThreshold
func WithFloodPublish(floodPublish bool) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.floodPublish = floodPublish
return nil
}
}
// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
// This should generally be enabled in bootstrappers and well connected/trusted nodes
// used for bootstrapping.
func WithPeerExchange(doPX bool) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.doPX = doPX
return nil
}
}
// WithDirectPeers is a gossipsub router option that specifies peers with direct
// peering agreements. These peers are connected outside of the mesh, with all (valid)
// message unconditionally forwarded to them. The router will maintain open connections
// to these peers. Note that the peering agreement should be reciprocal with direct peers
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
direct := make(map[peer.ID]struct{})
for _, pi := range pis {
direct[pi.ID] = struct{}{}
ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
}
gs.direct = direct
if gs.tagTracer != nil {
gs.tagTracer.direct = direct
}
return nil
}
}
// WithDirectConnectTicks is a gossipsub router option that sets the number of
// heartbeat ticks between attempting to reconnect direct peers that are not
// currently connected. A "tick" is based on the heartbeat interval, which is
// 1s by default. The default value for direct connect ticks is 300.
func WithDirectConnectTicks(t uint64) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.params.DirectConnectTicks = t
return nil
}
}
// WithGossipSubParams is a gossip sub router option that allows a custom
// config to be set when instantiating the gossipsub router.
func WithGossipSubParams(cfg GossipSubParams) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
// Overwrite current config and associated variables in the router.
gs.params = cfg
gs.connect = make(chan connectInfo, cfg.MaxPendingConnections)
gs.mcache = NewMessageCache(cfg.HistoryGossip, cfg.HistoryLength)
return nil
}
}
// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
peerdontwant map[peer.ID]int // number of IDONTWANTs received from peer in the last heartbeat
unwanted map[peer.ID]map[checksum]int // TTL of the message ids peers don't want
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
cab peerstore.AddrBook
protos []protocol.ID
feature GossipSubFeatureTest
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer
tagTracer *tagTracer
gate *peerGater
// config for gossipsub parameters
params GossipSubParams
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
doPX bool
// threshold for accepting PX from a peer; this should be positive and limited to scores
// attainable by bootstrappers and trusted nodes
acceptPXThreshold float64
// threshold for peer score to emit/accept gossip
// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
// When there is no score, this value is 0.
gossipThreshold float64
// flood publish score threshold; we only publish to peers with score >= to the threshold
// when using flood publishing or the peer is a fanout or floodsub peer.
publishThreshold float64
// threshold for peer score before we graylist the peer and silently ignore its RPCs
graylistThreshold float64
// threshold for median peer score before triggering opportunistic grafting
opportunisticGraftThreshold float64
// whether to use flood publishing
floodPublish bool
// number of heartbeats since the beginning of time; this allows us to amortize some resource
// clean up -- eg backoff clean up.
heartbeatTicks uint64
}
type connectInfo struct {
p peer.ID
spr *record.Envelope
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
return gs.protos
}
func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.p = p
gs.tracer = p.tracer
// start the scoring
gs.score.Start(gs)
// and the gossip tracing
gs.gossipTracer.Start(gs)
// and the tracer for connmgr tags
gs.tagTracer.Start(gs)
// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.idGen.ID)
// start the heartbeat
go gs.heartbeatTimer()
// start the PX connectors
for i := 0; i < gs.params.Connectors; i++ {
go gs.connector()
}
// Manage our address book from events emitted by libp2p
go gs.manageAddrBook()
// connect to direct peers
if len(gs.direct) > 0 {
go func() {
if gs.params.DirectConnectInitialDelay > 0 {
time.Sleep(gs.params.DirectConnectInitialDelay)
}
for p := range gs.direct {
gs.connect <- connectInfo{p: p}
}
}()
}
}
func (gs *GossipSubRouter) manageAddrBook() {
sub, err := gs.p.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerConnectednessChanged{},
})
if err != nil {
log.Errorf("failed to subscribe to peer identification events: %v", err)
return
}
defer sub.Close()
for {
select {
case <-gs.p.ctx.Done():
cabCloser, ok := gs.cab.(io.Closer)
if ok {
errClose := cabCloser.Close()
if errClose != nil {
log.Warnf("failed to close addr book: %v", errClose)
}
}
return
case ev := <-sub.Out():
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
if ev.SignedPeerRecord != nil {
cab, ok := peerstore.GetCertifiedAddrBook(gs.cab)
if ok {
ttl := peerstore.RecentlyConnectedAddrTTL
if gs.p.host.Network().Connectedness(ev.Peer) == network.Connected {
ttl = peerstore.ConnectedAddrTTL
}
_, err := cab.ConsumePeerRecord(ev.SignedPeerRecord, ttl)
if err != nil {
log.Warnf("failed to consume signed peer record: %v", err)
}
}
}
case event.EvtPeerConnectednessChanged:
if ev.Connectedness != network.Connected {
gs.cab.UpdateAddrs(ev.Peer, peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
}
}
}
}
}
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
// track the connection direction
outbound := false
conns := gs.p.host.Network().ConnsToPeer(p)
loop:
for _, c := range conns {
stat := c.Stat()
if stat.Limited {
continue
}
if stat.Direction == network.DirOutbound {
// only count the connection if it has a pubsub stream
for _, s := range c.GetStreams() {
if s.Protocol() == proto {
outbound = true
break loop
}
}
}
}
gs.outbound[p] = outbound
}
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
for _, peers := range gs.mesh {
delete(peers, p)
}
for _, peers := range gs.fanout {
delete(peers, p)
}
delete(gs.gossip, p)
delete(gs.control, p)
delete(gs.outbound, p)
}
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := gs.p.topics[topic]
if !ok {
return false
}
fsPeers, gsPeers := 0, 0
// floodsub peers
for p := range tmap {
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) {
fsPeers++
}
}
// gossipsub peers
gsPeers = len(gs.mesh[topic])
if suggested == 0 {
suggested = gs.params.Dlo
}
if fsPeers+gsPeers >= suggested || gsPeers >= gs.params.Dhi {
return true
}
return false
}
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
_, direct := gs.direct[p]
if direct {
return AcceptAll
}
if gs.score.Score(p) < gs.graylistThreshold {
return AcceptNone
}
return gs.gate.AcceptFrom(p)
}
// PreValidation sends the IDONTWANT control messages to all the mesh
// peers. They need to be sent right before the validation because they
// should be seen by the peers as soon as possible.
func (gs *GossipSubRouter) PreValidation(msgs []*Message) {
tmids := make(map[string][]string)
for _, msg := range msgs {
if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
continue
}
topic := msg.GetTopic()
tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
}
for topic, mids := range tmids {
if len(mids) == 0 {
continue
}
// shuffle the messages got from the RPC envelope
shuffleStrings(mids)
// send IDONTWANT to all the mesh peers
for p := range gs.mesh[topic] {
// send to only peers that support IDONTWANT
if gs.feature(GossipSubFeatureIdontwant, gs.peers[p]) {
idontwant := []*pb.ControlIDontWant{{MessageIDs: mids}}
out := rpcWithControl(nil, nil, nil, nil, nil, idontwant)
gs.sendRPC(p, out, true)
}
}
}
}
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
ctl := rpc.GetControl()
if ctl == nil {
return
}
iwant := gs.handleIHave(rpc.from, ctl)
ihave := gs.handleIWant(rpc.from, ctl)
prune := gs.handleGraft(rpc.from, ctl)
gs.handlePrune(rpc.from, ctl)
gs.handleIDontWant(rpc.from, ctl)
if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
return
}
out := rpcWithControl(ihave, nil, iwant, nil, prune, nil)
gs.sendRPC(rpc.from, out, false)
}
func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score)
return nil
}
// IHAVE flood protection
gs.peerhave[p]++
if gs.peerhave[p] > gs.params.MaxIHaveMessages {
log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p])
return nil
}
if gs.iasked[p] >= gs.params.MaxIHaveLength {
log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p])
return nil
}
iwant := make(map[string]struct{})
for _, ihave := range ctl.GetIhave() {
topic := ihave.GetTopicID()
_, ok := gs.mesh[topic]
if !ok {
continue
}
if !gs.p.peerFilter(p, topic) {
continue
}
checkIwantMsgsLoop:
for msgIdx, mid := range ihave.GetMessageIDs() {
// prevent remote peer from sending too many msg_ids on a single IHAVE message
if msgIdx >= gs.params.MaxIHaveLength {
log.Debugf("IHAVE: peer %s has sent IHAVE on topic %s with too many messages (%d); ignoring remaining msgs", p, topic, len(ihave.MessageIDs))
break checkIwantMsgsLoop
}
if gs.p.seenMessage(mid) {
continue
}
iwant[mid] = struct{}{}
}
}
if len(iwant) == 0 {
return nil
}
iask := len(iwant)
if iask+gs.iasked[p] > gs.params.MaxIHaveLength {
iask = gs.params.MaxIHaveLength - gs.iasked[p]
}
log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p)
iwantlst := make([]string, 0, len(iwant))
for mid := range iwant {
iwantlst = append(iwantlst, mid)
}
// ask in random order
shuffleStrings(iwantlst)
// truncate to the messages we are actually asking for and update the iasked counter
iwantlst = iwantlst[:iask]
gs.iasked[p] += iask
gs.gossipTracer.AddPromise(p, iwantlst)
return []*pb.ControlIWant{{MessageIDs: iwantlst}}
}
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
return nil
}
ihave := make(map[string]*pb.Message)
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
msg, count, ok := gs.mcache.GetForPeer(mid, p)
if !ok {
continue
}
if !gs.p.peerFilter(p, msg.GetTopic()) {
continue
}
if count > gs.params.GossipRetransmission {
log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
continue
}
ihave[mid] = msg.Message
}
}
if len(ihave) == 0 {
return nil
}
log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)
msgs := make([]*pb.Message, 0, len(ihave))
for _, msg := range ihave {
msgs = append(msgs, msg)
}
return msgs
}
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
var prune []string
doPX := gs.doPX
score := gs.score.Score(p)
now := time.Now()
for _, graft := range ctl.GetGraft() {
topic := graft.GetTopicID()
if !gs.p.peerFilter(p, topic) {
continue
}
peers, ok := gs.mesh[topic]
if !ok {
// don't do PX when there is an unknown topic to avoid leaking our peers
doPX = false
// spam hardening: ignore GRAFTs for unknown topics
continue
}
// check if it is already in the mesh; if so do nothing (we might have concurrent grafting)
_, inMesh := peers[p]
if inMesh {
continue
}
// we don't GRAFT to/from direct peers; complain loudly if this happens
_, direct := gs.direct[p]
if direct {
log.Warnf("GRAFT: ignoring request from direct peer %s", p)
// this is possibly a bug from non-reciprocal configuration; send a PRUNE
prune = append(prune, topic)
// but don't PX
doPX = false
continue
}
// make sure we are not backing off that peer
expire, backoff := gs.backoff[topic][p]
if backoff && now.Before(expire) {
log.Debugf("GRAFT: ignoring backed off peer %s", p)
// add behavioural penalty
gs.score.AddPenalty(p, 1)
// no PX
doPX = false
// check the flood cutoff -- is the GRAFT coming too fast?
floodCutoff := expire.Add(gs.params.GraftFloodThreshold - gs.params.PruneBackoff)
if now.Before(floodCutoff) {
// extra penalty
gs.score.AddPenalty(p, 1)
}
// refresh the backoff
gs.addBackoff(p, topic, false)
prune = append(prune, topic)
continue
}
// check the score
if score < 0 {
// we don't GRAFT peers with negative score
log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune = append(prune, topic)
// but we won't PX to them
doPX = false
// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
gs.addBackoff(p, topic, false)
continue
}
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
prune = append(prune, topic)
gs.addBackoff(p, topic, false)
continue
}
log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
}
if len(prune) == 0 {
return nil
}
cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, gs.makePrune(p, topic, doPX, false))
}
return cprune
}
func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
score := gs.score.Score(p)
for _, prune := range ctl.GetPrune() {
topic := prune.GetTopicID()
peers, ok := gs.mesh[topic]
if !ok {
continue
}
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
// is there a backoff specified by the peer? if so obey it.
backoff := prune.GetBackoff()
if backoff > 0 {
gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
} else {
gs.addBackoff(p, topic, false)
}
px := prune.GetPeers()
if len(px) > 0 {
// we ignore PX from peers with insufficient score
if score < gs.acceptPXThreshold {
log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", p, score, topic)
continue
}
gs.pxConnect(px)
}
}
}
func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
if gs.unwanted[p] == nil {
gs.unwanted[p] = make(map[checksum]int)
}
// IDONTWANT flood protection
if gs.peerdontwant[p] >= gs.params.MaxIDontWantMessages {
log.Debugf("IDONWANT: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerdontwant[p])
return
}
gs.peerdontwant[p]++
// Remember all the unwanted message ids
for _, idontwant := range ctl.GetIdontwant() {
for _, mid := range idontwant.GetMessageIDs() {
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
}
}
}
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
backoff := gs.params.PruneBackoff
if isUnsubscribe {
backoff = gs.params.UnsubscribeBackoff
}
gs.doAddBackoff(p, topic, backoff)
}
func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
backoff, ok := gs.backoff[topic]
if !ok {
backoff = make(map[peer.ID]time.Time)
gs.backoff[topic] = backoff
}
expire := time.Now().Add(interval)
if backoff[p].Before(expire) {
backoff[p] = expire
}
}
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
if len(peers) > gs.params.PrunePeers {
shufflePeerInfo(peers)
peers = peers[:gs.params.PrunePeers]
}
toconnect := make([]connectInfo, 0, len(peers))
for _, pi := range peers {
p := peer.ID(pi.PeerID)
_, connected := gs.peers[p]
if connected {
continue
}
var spr *record.Envelope
if pi.SignedPeerRecord != nil {
// the peer sent us a signed record; ensure that it is valid
envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
if err != nil {
log.Warnf("error unmarshalling peer record obtained through px: %s", err)
continue
}
rec, ok := r.(*peer.PeerRecord)
if !ok {
log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
continue
}
if rec.PeerID != p {
log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p)
continue
}
spr = envelope
}
toconnect = append(toconnect, connectInfo{p, spr})
}
if len(toconnect) == 0 {
return
}
for _, ci := range toconnect {
select {
case gs.connect <- ci:
default:
log.Debugf("ignoring peer connection attempt; too many pending connections")
}
}
}
func (gs *GossipSubRouter) connector() {
for {
select {
case ci := <-gs.connect:
if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
continue
}
log.Debugf("connecting to %s", ci.p)
cab, ok := peerstore.GetCertifiedAddrBook(gs.cab)
if ok && ci.spr != nil {
_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
if err != nil {
log.Debugf("error processing peer record: %s", err)
}
}
ctx, cancel := context.WithTimeout(gs.p.ctx, gs.params.ConnectionTimeout)
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p, Addrs: gs.cab.Addrs(ci.p)})
cancel()
if err != nil {
log.Debugf("error connecting to %s: %s", ci.p, err)
}
case <-gs.p.ctx.Done():
return
}
}
}
func (gs *GossipSubRouter) Publish(msg *Message) {
gs.mcache.Put(msg)
from := msg.ReceivedFrom
topic := msg.GetTopic()
tosend := make(map[peer.ID]struct{})
// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
return
}
if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
} else {
// direct peers
for p := range gs.direct {
_, inTopic := tmap[p]
if inTopic {
tosend[p] = struct{}{}
}
}
// floodsub peers
for p := range tmap {
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
// gossipsub peers
gmap, ok := gs.mesh[topic]
if !ok {
// we are not in the mesh for topic, use fanout peers
gmap, ok = gs.fanout[topic]
if !ok || len(gmap) == 0 {
// we don't have any, pick some with score above the publish threshold
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= gs.publishThreshold
})
if len(peers) > 0 {
gmap = peerListToMap(peers)
gs.fanout[topic] = gmap
}
}
gs.lastpub[topic] = time.Now().UnixNano()
}
for p := range gmap {
mid := gs.p.idGen.ID(msg)
// Check if it has already received an IDONTWANT for the message.
// If so, don't send it to the peer
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
continue
}
tosend[p] = struct{}{}
}
}
out := rpcWithMessages(msg.Message)
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
gs.sendRPC(pid, out, false)
}
}
func (gs *GossipSubRouter) Join(topic string) {
gmap, ok := gs.mesh[topic]
if ok {
return
}
log.Debugf("JOIN %s", topic)
gs.tracer.Join(topic)
gmap, ok = gs.fanout[topic]
if ok {
backoff := gs.backoff[topic]
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
for p := range gmap {
_, doBackOff := backoff[p]
if gs.score.Score(p) < 0 || doBackOff {
delete(gmap, p)
}
}
if len(gmap) < gs.params.D {
// we need more peers; eager, as this would get fixed in the next heartbeat
more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool {
// filter our current peers, direct peers, peers we are backing off, and
// peers with negative scores
_, inMesh := gmap[p]
_, direct := gs.direct[p]
_, doBackOff := backoff[p]
return !inMesh && !direct && !doBackOff && gs.score.Score(p) >= 0
})
for _, p := range more {
gmap[p] = struct{}{}
}
}
gs.mesh[topic] = gmap
delete(gs.fanout, topic)
delete(gs.lastpub, topic)
} else {
backoff := gs.backoff[topic]
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
// filter direct peers, peers we are backing off and peers with negative score
_, direct := gs.direct[p]
_, doBackOff := backoff[p]
return !direct && !doBackOff && gs.score.Score(p) >= 0
})
gmap = peerListToMap(peers)
gs.mesh[topic] = gmap
}
for p := range gmap {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
}
}
func (gs *GossipSubRouter) Leave(topic string) {
gmap, ok := gs.mesh[topic]
if !ok {
return
}
log.Debugf("LEAVE %s", topic)
gs.tracer.Leave(topic)
delete(gs.mesh, topic)
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic, true)
// Add a backoff to this peer to prevent us from eagerly
// re-grafting this peer into our mesh if we rejoin this
// topic before the backoff period ends.
gs.addBackoff(p, topic, true)
}
}
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
graft := []*pb.ControlGraft{{TopicID: &topic}}
out := rpcWithControl(nil, nil, nil, graft, nil, nil)
gs.sendRPC(p, out, false)
}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
out := rpcWithControl(nil, nil, nil, nil, prune, nil)
gs.sendRPC(p, out, false)
}
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) {
// do we own the RPC?
own := false
// piggyback control message retries
ctl, ok := gs.control[p]
if ok {
out = copyRPC(out)
own = true
gs.piggybackControl(p, out, ctl)
delete(gs.control, p)
}
// piggyback gossip
ihave, ok := gs.gossip[p]
if ok {
if !own {
out = copyRPC(out)
own = true
}
gs.piggybackGossip(p, out, ihave)
delete(gs.gossip, p)
}
q, ok := gs.p.peers[p]
if !ok {
return
}
// If we're below the max message size, go ahead and send
if out.Size() < gs.p.maxMessageSize {
gs.doSendRPC(out, p, q, urgent)
return
}
// Potentially split the RPC into multiple RPCs that are below the max message size
outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
for _, rpc := range outRPCs {
if rpc.Size() > gs.p.maxMessageSize {
// This should only happen if a single message/control is above the maxMessageSize.
gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
continue
}
gs.doSendRPC(rpc, p, q, urgent)
}
}
func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
log.Debugf("dropping message to peer %s: %s", p, reason)
gs.tracer.DropRPC(rpc, p)
// push control messages that need to be retried
ctl := rpc.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
}
}
func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bool) {
var err error
if urgent {
err = q.UrgentPush(rpc, false)
} else {
err = q.Push(rpc, false)
}
if err != nil {
gs.doDropRPC(rpc, p, "queue full")
return
}
gs.tracer.SendRPC(rpc, p)
}
// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
// bigger than the RPC limit), then it will be returned as an oversized RPC.
// The caller should filter out oversized RPCs.
func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
if len(elems) == 0 {
return slice
}
if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
// Fast path: no merging needed and only one element
return append(slice, &elems[0])
}
out := slice
if len(out) == 0 {
out = append(out, &RPC{RPC: pb.RPC{}})
out[0].from = elems[0].from
}
for _, elem := range elems {
lastRPC := out[len(out)-1]
// Merge/Append publish messages
// TODO: Never merge messages. The current behavior is the same as the
// old behavior. In the future let's not merge messages. Since,
// it may increase message latency.
for _, msg := range elem.GetPublish() {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Publish = append(lastRPC.Publish, msg)
out = append(out, lastRPC)
}
}
// Merge/Append Subscriptions
for _, sub := range elem.GetSubscriptions() {
if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
out = append(out, lastRPC)
}
}
// Merge/Append Control messages
if ctl := elem.GetControl(); ctl != nil {
if lastRPC.Control == nil {
lastRPC.Control = &pb.ControlMessage{}
if lastRPC.Size() > limit {
lastRPC.Control = nil
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, graft := range ctl.GetGraft() {
if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
out = append(out, lastRPC)
}
}
for _, prune := range ctl.GetPrune() {
if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
out = append(out, lastRPC)
}
}
for _, iwant := range ctl.GetIwant() {
if len(lastRPC.Control.Iwant) == 0 {
// Initialize with a single IWANT.
// For IWANTs we don't need more than a single one,
// since there are no topic IDs here.
newIWant := &pb.ControlIWant{}
if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{newIWant},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, msgID := range iwant.GetMessageIDs() {
if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
}
for _, ihave := range ctl.GetIhave() {
if len(lastRPC.Control.Ihave) == 0 ||
lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
// Start a new IHAVE if we are referencing a new topic ID
newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{newIhave},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, msgID := range ihave.GetMessageIDs() {
lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
}
}
}
return out
}
func (gs *GossipSubRouter) heartbeatTimer() {
time.Sleep(gs.params.HeartbeatInitialDelay)
select {
case gs.p.eval <- gs.heartbeat:
case <-gs.p.ctx.Done():
return
}
ticker := time.NewTicker(gs.params.HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case gs.p.eval <- gs.heartbeat:
case <-gs.p.ctx.Done():
return
}
case <-gs.p.ctx.Done():
return
}
}
}
func (gs *GossipSubRouter) heartbeat() {
start := time.Now()
defer func() {
if gs.params.SlowHeartbeatWarning > 0 {
slowWarning := time.Duration(gs.params.SlowHeartbeatWarning * float64(gs.params.HeartbeatInterval))
if dt := time.Since(start); dt > slowWarning {
log.Warnw("slow heartbeat", "took", dt)
}
}
}()
gs.heartbeatTicks++
tograft := make(map[peer.ID][]string)
toprune := make(map[peer.ID][]string)
noPX := make(map[peer.ID]bool)
// clean up expired backoffs
gs.clearBackoff()
// clean up iasked counters
gs.clearIHaveCounters()
// clean up IDONTWANT counters
gs.clearIDontWantCounters()
// apply IWANT request penalties
gs.applyIwantPenalties()
// ensure direct peers are connected
gs.directConnect()
// cache scores throughout the heartbeat
scores := make(map[peer.ID]float64)
score := func(p peer.ID) float64 {
s, ok := scores[p]
if !ok {
s = gs.score.Score(p)
scores[p] = s
}
return s
}
// maintain the mesh for topics we have joined
for topic, peers := range gs.mesh {
prunePeer := func(p peer.ID) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.addBackoff(p, topic, false)
topics := toprune[p]
toprune[p] = append(topics, topic)
}
graftPeer := func(p peer.ID) {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
topics := tograft[p]
tograft[p] = append(topics, topic)
}
// drop all peers with negative score, without PX
for p := range peers {
if score(p) < 0 {
log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic)
prunePeer(p)
noPX[p] = true
}
}
// do we have enough peers?
if l := len(peers); l < gs.params.Dlo {
backoff := gs.backoff[topic]
ineed := gs.params.D - l
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers, peers we are backing off, and peers with negative score
_, inMesh := peers[p]
_, doBackoff := backoff[p]
_, direct := gs.direct[p]
return !inMesh && !doBackoff && !direct && score(p) >= 0
})
for _, p := range plst {
graftPeer(p)
}
}
// do we have too many peers?
if len(peers) > gs.params.Dhi {
plst := peerMapToList(peers)
// sort by score (but shuffle first for the case we don't use the score)
shufflePeers(plst)
sort.Slice(plst, func(i, j int) bool {
return score(plst[i]) > score(plst[j])
})
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
shufflePeers(plst[gs.params.Dscore:])
// count the outbound peers we are keeping
outbound := 0
for _, p := range plst[:gs.params.D] {
if gs.outbound[p] {
outbound++
}
}
// if it's less than D_out, bubble up some outbound peers from the random selection
if outbound < gs.params.Dout {
rotate := func(i int) {
// rotate the plst to the right and put the ith peer in the front
p := plst[i]
for j := i; j > 0; j-- {
plst[j] = plst[j-1]
}
plst[0] = p
}
// first bubble up all outbound peers already in the selection to the front
if outbound > 0 {
ihave := outbound
for i := 1; i < gs.params.D && ihave > 0; i++ {
p := plst[i]
if gs.outbound[p] {
rotate(i)
ihave--
}
}
}
// now bubble up enough outbound peers outside the selection to the front
ineed := gs.params.Dout - outbound
for i := gs.params.D; i < len(plst) && ineed > 0; i++ {
p := plst[i]
if gs.outbound[p] {
rotate(i)
ineed--
}
}
}
// prune the excess peers
for _, p := range plst[gs.params.D:] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
prunePeer(p)
}
}
// do we have enough outboud peers?
if len(peers) >= gs.params.Dlo {
// count the outbound peers we have
outbound := 0
for p := range peers {
if gs.outbound[p] {
outbound++
}
}
// if it's less than D_out, select some peers with outbound connections and graft them
if outbound < gs.params.Dout {
ineed := gs.params.Dout - outbound
backoff := gs.backoff[topic]
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers, peers we are backing off, and peers with negative score
_, inMesh := peers[p]
_, doBackoff := backoff[p]
_, direct := gs.direct[p]
return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0
})
for _, p := range plst {
graftPeer(p)
}
}
}
// should we try to improve the mesh with opportunistic grafting?
if gs.heartbeatTicks%gs.params.OpportunisticGraftTicks == 0 && len(peers) > 1 {
// Opportunistic grafting works as follows: we check the median score of peers in the
// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
// random with score over the median.
// The intention is to (slowly) improve an underperforming mesh by introducing good
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
// situations where we are stuck with poor peers and also recover from churn of good peers.
// now compute the median peer score in the mesh
plst := peerMapToList(peers)
sort.Slice(plst, func(i, j int) bool {
return score(plst[i]) < score(plst[j])
})
medianIndex := len(peers) / 2
medianScore := scores[plst[medianIndex]]
// if the median score is below the threshold, select a better peer (if any) and GRAFT
if medianScore < gs.opportunisticGraftThreshold {
backoff := gs.backoff[topic]
plst = gs.getPeers(topic, gs.params.OpportunisticGraftPeers, func(p peer.ID) bool {
_, inMesh := peers[p]
_, doBackoff := backoff[p]
_, direct := gs.direct[p]
return !inMesh && !doBackoff && !direct && score(p) > medianScore
})
for _, p := range plst {
log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic)
graftPeer(p)
}
}
}
// 2nd arg are mesh peers excluded from gossip. We already push
// messages to them, so its redundant to gossip IHAVEs.
gs.emitGossip(topic, peers)
}
// expire fanout for topics we haven't published to in a while
now := time.Now().UnixNano()
for topic, lastpub := range gs.lastpub {
if lastpub+int64(gs.params.FanoutTTL) < now {
delete(gs.fanout, topic)
delete(gs.lastpub, topic)
}
}
// maintain our fanout for topics we are publishing but we have not joined
for topic, peers := range gs.fanout {
// check whether our peers are still in the topic and have a score above the publish threshold
for p := range peers {
_, ok := gs.p.topics[topic][p]
if !ok || score(p) < gs.publishThreshold {
delete(peers, p)
}
}
// do we need more peers?
if len(peers) < gs.params.D {
ineed := gs.params.D - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers and peers with score above the publish threshold
_, inFanout := peers[p]
_, direct := gs.direct[p]
return !inFanout && !direct && score(p) >= gs.publishThreshold
})
for _, p := range plst {
peers[p] = struct{}{}
}
}
// 2nd arg are fanout peers excluded from gossip. We already push
// messages to them, so its redundant to gossip IHAVEs.
gs.emitGossip(topic, peers)
}
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
gs.sendGraftPrune(tograft, toprune, noPX)
// flush all pending gossip that wasn't piggybacked above
gs.flush()
// advance the message history window
gs.mcache.Shift()
}
func (gs *GossipSubRouter) clearIHaveCounters() {
if len(gs.peerhave) > 0 {
// throw away the old map and make a new one
gs.peerhave = make(map[peer.ID]int)
}
if len(gs.iasked) > 0 {
// throw away the old map and make a new one
gs.iasked = make(map[peer.ID]int)
}
}
func (gs *GossipSubRouter) clearIDontWantCounters() {
if len(gs.peerdontwant) > 0 {
// throw away the old map and make a new one
gs.peerdontwant = make(map[peer.ID]int)
}
// decrement TTLs of all the IDONTWANTs and delete it from the cache when it reaches zero
for _, mids := range gs.unwanted {
for mid := range mids {
mids[mid]--
if mids[mid] == 0 {
delete(mids, mid)
}
}
}
}
func (gs *GossipSubRouter) applyIwantPenalties() {
for p, count := range gs.gossipTracer.GetBrokenPromises() {
log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
gs.score.AddPenalty(p, count)
}
}
func (gs *GossipSubRouter) clearBackoff() {
// we only clear once every 15 ticks to avoid iterating over the map(s) too much
if gs.heartbeatTicks%15 != 0 {
return
}
now := time.Now()
for topic, backoff := range gs.backoff {
for p, expire := range backoff {
// add some slack time to the expiration
// https://github.com/libp2p/specs/pull/289
if expire.Add(2 * GossipSubHeartbeatInterval).Before(now) {
delete(backoff, p)
}
}
if len(backoff) == 0 {
delete(gs.backoff, topic)
}
}
}
func (gs *GossipSubRouter) directConnect() {
// we donly do this every some ticks to allow pending connections to complete and account
// for restarts/downtime
if gs.heartbeatTicks%gs.params.DirectConnectTicks != 0 {
return
}
var toconnect []peer.ID
for p := range gs.direct {
_, connected := gs.peers[p]
if !connected {
toconnect = append(toconnect, p)
}
}
if len(toconnect) > 0 {
go func() {
for _, p := range toconnect {
gs.connect <- connectInfo{p: p}
}
}()
}
}
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
for _, topic := range topics {
// copy topic string here since
// the reference to the string
// topic here changes with every
// iteration of the slice.
copiedID := topic
graft = append(graft, &pb.ControlGraft{TopicID: &copiedID})
}
var prune []*pb.ControlPrune
pruning, ok := toprune[p]
if ok {
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}
}
out := rpcWithControl(nil, nil, nil, graft, prune, nil)
gs.sendRPC(p, out, false)
}
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}
out := rpcWithControl(nil, nil, nil, nil, prune, nil)
gs.sendRPC(p, out, false)
}
}
// emitGossip emits IHAVE gossip advertising items in the message cache window
// of this topic.
func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
mids := gs.mcache.GetGossipIDs(topic)
if len(mids) == 0 {
return
}
// shuffle to emit in random order
shuffleStrings(mids)
// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
if len(mids) > gs.params.MaxIHaveLength {
// we do the truncation (with shuffling) per peer below
log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids))
}
// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set.
// We also exclude direct peers, as there is no reason to emit gossip to them.
peers := make([]peer.ID, 0, len(gs.p.topics[topic]))
for p := range gs.p.topics[topic] {
_, inExclude := exclude[p]
_, direct := gs.direct[p]
if !inExclude && !direct && gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.gossipThreshold {
peers = append(peers, p)
}
}
target := gs.params.Dlazy
factor := int(gs.params.GossipFactor * float64(len(peers)))
if factor > target {
target = factor
}
if target > len(peers) {
target = len(peers)
} else {
shufflePeers(peers)
}
peers = peers[:target]
// Emit the IHAVE gossip to the selected peers.
for _, p := range peers {
peerMids := mids
if len(mids) > gs.params.MaxIHaveLength {
// we do this per peer so that we emit a different set for each peer.
// we have enough redundancy in the system that this will significantly increase the message
// coverage when we do truncate.
peerMids = make([]string, gs.params.MaxIHaveLength)
shuffleStrings(mids)
copy(peerMids, mids)
}
gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids})
}
}
func (gs *GossipSubRouter) flush() {
// send gossip first, which will also piggyback pending control
for p, ihave := range gs.gossip {
delete(gs.gossip, p)
out := rpcWithControl(nil, ihave, nil, nil, nil, nil)
gs.sendRPC(p, out, false)
}
// send the remaining control messages that wasn't merged with gossip
for p, ctl := range gs.control {
delete(gs.control, p)
out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune, nil)
gs.sendRPC(p, out, false)
}
}
func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
gossip := gs.gossip[p]
gossip = append(gossip, ihave)
gs.gossip[p] = gossip
}
func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
ctl := out.GetControl()
if ctl == nil {
ctl = &pb.ControlMessage{}
out.Control = ctl
}
ctl.Ihave = ihave
}
func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
// remove IHAVE/IWANT/IDONTWANT from control message, gossip is not retried
ctl.Ihave = nil
ctl.Iwant = nil
ctl.Idontwant = nil
if ctl.Graft != nil || ctl.Prune != nil {
gs.control[p] = ctl
}
}
func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
// check control message for staleness first
var tograft []*pb.ControlGraft
var toprune []*pb.ControlPrune
for _, graft := range ctl.GetGraft() {
topic := graft.GetTopicID()
peers, ok := gs.mesh[topic]
if !ok {
continue
}
_, ok = peers[p]
if ok {
tograft = append(tograft, graft)
}
}
for _, prune := range ctl.GetPrune() {
topic := prune.GetTopicID()
peers, ok := gs.mesh[topic]
if !ok {
toprune = append(toprune, prune)
continue
}
_, ok = peers[p]
if !ok {
toprune = append(toprune, prune)
}
}
if len(tograft) == 0 && len(toprune) == 0 {
return
}
xctl := out.Control
if xctl == nil {
xctl = &pb.ControlMessage{}
out.Control = xctl
}
if len(tograft) > 0 {
xctl.Graft = append(xctl.Graft, tograft...)
}
if len(toprune) > 0 {
xctl.Prune = append(xctl.Prune, toprune...)
}
}
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}
backoff := uint64(gs.params.PruneBackoff / time.Second)
if isUnsubscribe {
backoff = uint64(gs.params.UnsubscribeBackoff / time.Second)
}
var px []*pb.PeerInfo
if doPX {
// select peers for Peer eXchange
peers := gs.getPeers(topic, gs.params.PrunePeers, func(xp peer.ID) bool {
return p != xp && gs.score.Score(xp) >= 0
})
cab, ok := peerstore.GetCertifiedAddrBook(gs.cab)
px = make([]*pb.PeerInfo, 0, len(peers))
for _, p := range peers {
// see if we have a signed peer record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through px anyway.
var recordBytes []byte
if ok {
spr := cab.GetPeerRecord(p)
var err error
if spr != nil {
recordBytes, err = spr.Marshal()
if err != nil {
log.Warnf("error marshaling signed peer record for %s: %s", p, err)
}
}
}
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
}
}
return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
}
func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
tmap, ok := gs.p.topics[topic]
if !ok {
return nil
}
peers := make([]peer.ID, 0, len(tmap))
for p := range tmap {
if gs.feature(GossipSubFeatureMesh, gs.peers[p]) && filter(p) && gs.p.peerFilter(p, topic) {
peers = append(peers, p)
}
}
shufflePeers(peers)
if count > 0 && len(peers) > count {
peers = peers[:count]
}
return peers
}
// WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option.
// This is useful for cases where the GossipSubRouter is instantiated externally, and is
// injected into the GossipSub constructor as a dependency. This allows the tag tracer to be
// also injected into the GossipSub constructor as a PubSub option dependency.
func (gs *GossipSubRouter) WithDefaultTagTracer() Option {
return WithRawTracer(gs.tagTracer)
}
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
pmap := make(map[peer.ID]struct{})
for _, p := range peers {
pmap[p] = struct{}{}
}
return pmap
}
func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
plst := make([]peer.ID, 0, len(peers))
for p := range peers {
plst = append(plst, p)
}
return plst
}
func shufflePeers(peers []peer.ID) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}
func shufflePeerInfo(peers []*pb.PeerInfo) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}
func shuffleStrings(lst []string) {
for i := range lst {
j := rand.Intn(i + 1)
lst[i], lst[j] = lst[j], lst[i]
}
}
func computeChecksum(mid string) checksum {
var cs checksum
if len(mid) > 32 || len(mid) == 0 {
cs.payload = sha256.Sum256([]byte(mid))
} else {
cs.length = uint8(copy(cs.payload[:], mid))
}
return cs
}