Mirror of https://github.com/logos-messaging/go-libp2p-pubsub.git, synced 2026-01-03 05:13:07 +00:00.
In [drand](https://github.com/drand/drand) we run a gossipsub relay that lets users subscribe to randomness values over pubsub. We want to support pure gossip relays that relay from another relay. For this we need direct peering agreements, and we want to mitigate the possibility of "missing" randomness messages by ensuring the direct connect ticks period is shorter than the period between updates. This PR simply adds a new functional option that lets us set the direct connect ticks value without modifying the global variable.
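As a rough sketch of the intended usage (not part of this file): a pure gossip relay with a direct peering agreement to an upstream relay could lower the reconnect interval so that a dropped direct connection is retried well within one randomness round. The context, host, peer list, and the 25-tick value below are illustrative assumptions; the options themselves (`NewGossipSub`, `WithDirectPeers`, `WithDirectConnectTicks`) are defined in the file that follows.

```go
// Hypothetical wiring: with the default 1s heartbeat, 25 ticks ≈ 25s,
// which stays below an assumed 30s randomness round period.
ps, err := pubsub.NewGossipSub(ctx, h,
	pubsub.WithDirectPeers(upstreamRelays),  // reciprocal direct peering agreements
	pubsub.WithDirectConnectTicks(25),       // retry direct-peer connections every 25 heartbeats
)
if err != nil {
	panic(err)
}
_ = ps
```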
1813 lines · 52 KiB · Go
package pubsub

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/record"
)

const (
	// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
	// It is advertised along with GossipSubID_v11 for backwards compatibility.
	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")

	// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
	// See the spec for details about how v1.1.0 compares to v1.0.0:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)

var (
	// overlay parameters.

	// GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6,
	// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
	// GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi.
	GossipSubD = 6

	// GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at
	// the next heartbeat.
	GossipSubDlo = 5

	// GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat.
	GossipSubDhi = 12

	// GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription.
	// At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are
	// chosen randomly.
	GossipSubDscore = 4

	// GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh.
	// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
	// to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming
	// our mesh with incoming connections.
	//
	// GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2.
	GossipSubDout = 2

	// gossip parameters

	// GossipSubHistoryLength controls the size of the message cache used for gossip.
	// The message cache will remember messages for GossipSubHistoryLength heartbeats.
	GossipSubHistoryLength = 5

	// GossipSubHistoryGossip controls how many cached message ids we will advertise in
	// IHAVE gossip messages. When asked for our seen message IDs, we will return
	// only those from the most recent GossipSubHistoryGossip heartbeats. The slack between
	// GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages
	// that will be expired by the time they're requested.
	//
	// GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to
	// avoid a runtime panic.
	GossipSubHistoryGossip = 3

	// GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual
	// number may be more, depending on GossipSubGossipFactor and how many peers we're
	// connected to.
	GossipSubDlazy = 6

	// GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or
	// GossipSubDlazy, whichever is greater.
	GossipSubGossipFactor = 0.25

	// GossipSubGossipRetransmission controls how many times we will allow a peer to request
	// the same message id through IWANT gossip before we start ignoring them. This is designed
	// to prevent peers from spamming us with requests and wasting our resources.
	GossipSubGossipRetransmission = 3

	// heartbeat interval

	// GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
	// after the router is initialized.
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond

	// GossipSubHeartbeatInterval controls the time between heartbeats.
	GossipSubHeartbeatInterval = 1 * time.Second

	// GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been
	// GossipSubFanoutTTL since we've published to a topic that we're not subscribed to,
	// we'll delete the fanout map for that topic.
	GossipSubFanoutTTL = 60 * time.Second

	// GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange.
	// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
	// send them signed peer records for up to GossipSubPrunePeers other peers that we
	// know of.
	GossipSubPrunePeers = 16

	// GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long
	// a peer must wait before attempting to graft into our mesh again after being pruned.
	// When pruning a peer, we send them our value of GossipSubPruneBackoff so they know
	// the minimum time to wait. Peers running older versions may not send a backoff time,
	// so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff
	// before attempting to re-graft.
	GossipSubPruneBackoff = time.Minute

	// GossipSubConnectors controls the number of active connection attempts for peers obtained through PX.
	GossipSubConnectors = 8

	// GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through PX.
	GossipSubMaxPendingConnections = 128

	// GossipSubConnectionTimeout controls the timeout for connection attempts.
	GossipSubConnectionTimeout = 30 * time.Second

	// GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
	// that are not currently connected.
	GossipSubDirectConnectTicks uint64 = 300

	// GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers.
	GossipSubDirectConnectInitialDelay = time.Second

	// GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
	// with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some
	// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
	// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
	GossipSubOpportunisticGraftTicks uint64 = 60

	// GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft.
	GossipSubOpportunisticGraftPeers = 2

	// If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE,
	// then there is an extra score penalty applied to the peer through P7.
	GossipSubGraftFloodThreshold = 10 * time.Second

	// GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
	// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
	// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
	// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats;
	// with the defaults this is 1666 messages/s.
	GossipSubMaxIHaveLength = 5000

	// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
	GossipSubMaxIHaveMessages = 10

	// GossipSubIWantFollowupTime is the time to wait for a message requested through IWANT following
	// an IHAVE advertisement. If the message is not received within this window, a broken promise is
	// declared and the router may apply behavioural penalties.
	GossipSubIWantFollowupTime = 3 * time.Second
)
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	rt := &GossipSubRouter{
		peers:    make(map[peer.ID]protocol.ID),
		mesh:     make(map[string]map[peer.ID]struct{}),
		fanout:   make(map[string]map[peer.ID]struct{}),
		lastpub:  make(map[string]int64),
		gossip:   make(map[peer.ID][]*pb.ControlIHave),
		control:  make(map[peer.ID]*pb.ControlMessage),
		backoff:  make(map[string]map[peer.ID]time.Time),
		peerhave: make(map[peer.ID]int),
		iasked:   make(map[peer.ID]int),
		outbound: make(map[peer.ID]bool),
		connect:  make(chan connectInfo, GossipSubMaxPendingConnections),
		mcache:   NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),

		// these are configured per router to allow variation in tests
		D:      GossipSubD,
		Dlo:    GossipSubDlo,
		Dhi:    GossipSubDhi,
		Dscore: GossipSubDscore,
		Dout:   GossipSubDout,
		Dlazy:  GossipSubDlazy,

		// these must be pulled in to resolve races in tests... sigh.
		directConnectTicks:      GossipSubDirectConnectTicks,
		opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,

		fanoutTTL: GossipSubFanoutTTL,

		tagTracer: newTagTracer(h.ConnManager()),
	}

	// use the withInternalTracer option to hook up the tag tracer
	opts = append(opts, withInternalTracer(rt.tagTracer))
	return NewPubSub(ctx, h, rt, opts...)
}

// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		// sanity check: validate the score parameters
		err := params.validate()
		if err != nil {
			return err
		}

		// sanity check: validate the threshold values
		err = thresholds.validate()
		if err != nil {
			return err
		}

		gs.score = newPeerScore(params)
		gs.gossipThreshold = thresholds.GossipThreshold
		gs.publishThreshold = thresholds.PublishThreshold
		gs.graylistThreshold = thresholds.GraylistThreshold
		gs.acceptPXThreshold = thresholds.AcceptPXThreshold
		gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold

		gs.gossipTracer = newGossipTracer()

		// hook the tracer
		if ps.tracer != nil {
			ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
		} else {
			ps.tracer = &pubsubTracer{
				internal: []internalTracer{gs.score, gs.gossipTracer},
				pid:      ps.host.ID(),
				msgID:    ps.msgID,
			}
		}

		return nil
	}
}

// WithFloodPublish is a gossipsub router option that enables flood publishing.
// When this is enabled, published messages are forwarded to all peers with a score
// greater than or equal to publishThreshold.
func WithFloodPublish(floodPublish bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.floodPublish = floodPublish

		return nil
	}
}

// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
// This should generally be enabled in bootstrappers and well connected/trusted nodes
// used for bootstrapping.
func WithPeerExchange(doPX bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.doPX = doPX

		return nil
	}
}
// WithDirectPeers is a gossipsub router option that specifies peers with direct
// peering agreements. These peers are connected outside of the mesh, with all (valid)
// messages unconditionally forwarded to them. The router will maintain open connections
// to these peers. Note that the peering agreement should be reciprocal with direct peers
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		direct := make(map[peer.ID]struct{})
		for _, pi := range pis {
			direct[pi.ID] = struct{}{}
			ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
		}

		gs.direct = direct

		if gs.tagTracer != nil {
			gs.tagTracer.direct = direct
		}

		return nil
	}
}
// WithDirectConnectTicks is a gossipsub router option that sets the number of
// heartbeat ticks between attempting to reconnect direct peers that are not
// currently connected. A "tick" is based on the heartbeat interval, which is
// 1s by default. The default value for direct connect ticks is 300.
func WithDirectConnectTicks(t uint64) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}
		gs.directConnectTicks = t
		return nil
	}
}

// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
	p        *PubSub
	peers    map[peer.ID]protocol.ID          // peer protocols
	direct   map[peer.ID]struct{}             // direct peers
	mesh     map[string]map[peer.ID]struct{}  // topic meshes
	fanout   map[string]map[peer.ID]struct{}  // topic fanout
	lastpub  map[string]int64                 // last publish time for fanout topics
	gossip   map[peer.ID][]*pb.ControlIHave   // pending gossip
	control  map[peer.ID]*pb.ControlMessage   // pending control messages
	peerhave map[peer.ID]int                  // number of IHAVEs received from peer in the last heartbeat
	iasked   map[peer.ID]int                  // number of messages we have asked from peer in the last heartbeat
	outbound map[peer.ID]bool                 // connection direction cache, marks peers with outbound connections
	backoff  map[string]map[peer.ID]time.Time // prune backoff
	connect  chan connectInfo                 // px connection requests

	mcache       *MessageCache
	tracer       *pubsubTracer
	score        *peerScore
	gossipTracer *gossipTracer
	tagTracer    *tagTracer

	// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
	// nodes.
	doPX bool

	// threshold for accepting PX from a peer; this should be positive and limited to scores
	// attainable by bootstrappers and trusted nodes
	acceptPXThreshold float64

	// threshold for peer score to emit/accept gossip
	// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
	// When there is no score, this value is 0.
	gossipThreshold float64

	// flood publish score threshold; we only publish to peers with a score at or above the threshold
	// when using flood publishing or the peer is a fanout or floodsub peer.
	publishThreshold float64

	// threshold for peer score before we graylist the peer and silently ignore its RPCs
	graylistThreshold float64

	// threshold for median peer score before triggering opportunistic grafting
	opportunisticGraftThreshold float64

	// whether to use flood publishing
	floodPublish bool

	// number of heartbeats since the beginning of time; this allows us to amortize some resource
	// clean up -- eg backoff clean up.
	heartbeatTicks uint64

	// overlay parameter "constants"
	// these are pulled from their global value or else the race detector is angry on travis
	// it also allows us to change them per peer in tests, which is a plus
	D, Dlo, Dhi, Dscore, Dout, Dlazy int

	// tick "constants" for triggering direct connect and opportunistic grafting
	// these are pulled from their global value or else the race detector is angry on travis
	directConnectTicks, opportunisticGraftTicks uint64

	// fanout expiry ttl "constant"
	// this is pulled from its global value or else the race detector is angry on travis
	fanoutTTL time.Duration
}

type connectInfo struct {
	p   peer.ID
	spr *record.Envelope
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
	return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
}

func (gs *GossipSubRouter) Attach(p *PubSub) {
	gs.p = p
	gs.tracer = p.tracer

	// start the scoring
	gs.score.Start(gs)

	// and the gossip tracing
	gs.gossipTracer.Start(gs)

	// and the tracer for connmgr tags
	gs.tagTracer.Start(gs)

	// start using the same msg ID function as PubSub for caching messages.
	gs.mcache.SetMsgIdFn(p.msgID)

	// start the heartbeat
	go gs.heartbeatTimer()

	// start the PX connectors
	for i := 0; i < GossipSubConnectors; i++ {
		go gs.connector()
	}

	// connect to direct peers
	if len(gs.direct) > 0 {
		go func() {
			if GossipSubDirectConnectInitialDelay > 0 {
				time.Sleep(GossipSubDirectConnectInitialDelay)
			}
			for p := range gs.direct {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
	log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
	gs.tracer.AddPeer(p, proto)
	gs.peers[p] = proto

	// track the connection direction
	outbound := false
	conns := gs.p.host.Network().ConnsToPeer(p)
loop:
	for _, c := range conns {
		if c.Stat().Direction == network.DirOutbound {
			// only count the connection if it has a pubsub stream
			for _, s := range c.GetStreams() {
				if s.Protocol() == proto {
					outbound = true
					break loop
				}
			}
		}
	}
	gs.outbound[p] = outbound
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
	log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
	gs.tracer.RemovePeer(p)
	delete(gs.peers, p)
	for _, peers := range gs.mesh {
		delete(peers, p)
	}
	for _, peers := range gs.fanout {
		delete(peers, p)
	}
	delete(gs.gossip, p)
	delete(gs.control, p)
	delete(gs.outbound, p)
}

func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
	// check all peers in the topic
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return false
	}

	fsPeers, gsPeers := 0, 0
	// floodsub peers
	for p := range tmap {
		if gs.peers[p] == FloodSubID {
			fsPeers++
		}
	}

	// gossipsub peers
	gsPeers = len(gs.mesh[topic])

	if suggested == 0 {
		suggested = gs.Dlo
	}

	if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi {
		return true
	}

	return false
}

func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool {
	_, direct := gs.direct[p]
	return direct || gs.score.Score(p) >= gs.graylistThreshold
}

func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
	ctl := rpc.GetControl()
	if ctl == nil {
		return
	}

	iwant := gs.handleIHave(rpc.from, ctl)
	ihave := gs.handleIWant(rpc.from, ctl)
	prune := gs.handleGraft(rpc.from, ctl)
	gs.handlePrune(rpc.from, ctl)

	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
		return
	}

	out := rpcWithControl(ihave, nil, iwant, nil, prune)
	gs.sendRPC(rpc.from, out)
}

func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
	// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
	score := gs.score.Score(p)
	if score < gs.gossipThreshold {
		log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score)
		return nil
	}

	// IHAVE flood protection
	gs.peerhave[p]++
	if gs.peerhave[p] > GossipSubMaxIHaveMessages {
		log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p])
		return nil
	}

	if gs.iasked[p] >= GossipSubMaxIHaveLength {
		log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p])
		return nil
	}

	iwant := make(map[string]struct{})
	for _, ihave := range ctl.GetIhave() {
		topic := ihave.GetTopicID()
		_, ok := gs.mesh[topic]
		if !ok {
			continue
		}

		for _, mid := range ihave.GetMessageIDs() {
			if gs.p.seenMessage(mid) {
				continue
			}
			iwant[mid] = struct{}{}
		}
	}

	if len(iwant) == 0 {
		return nil
	}

	iask := len(iwant)
	if iask+gs.iasked[p] > GossipSubMaxIHaveLength {
		iask = GossipSubMaxIHaveLength - gs.iasked[p]
	}

	log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p)

	iwantlst := make([]string, 0, len(iwant))
	for mid := range iwant {
		iwantlst = append(iwantlst, mid)
	}

	// ask in random order
	shuffleStrings(iwantlst)

	// truncate to the messages we are actually asking for and update the iasked counter
	iwantlst = iwantlst[:iask]
	gs.iasked[p] += iask

	gs.gossipTracer.AddPromise(p, iwantlst)

	return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
}

func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
	// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
	score := gs.score.Score(p)
	if score < gs.gossipThreshold {
		log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
		return nil
	}

	ihave := make(map[string]*pb.Message)
	for _, iwant := range ctl.GetIwant() {
		for _, mid := range iwant.GetMessageIDs() {
			msg, count, ok := gs.mcache.GetForPeer(mid, p)
			if !ok {
				continue
			}

			if count > GossipSubGossipRetransmission {
				log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
				continue
			}

			ihave[mid] = msg
		}
	}

	if len(ihave) == 0 {
		return nil
	}

	log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)

	msgs := make([]*pb.Message, 0, len(ihave))
	for _, msg := range ihave {
		msgs = append(msgs, msg)
	}

	return msgs
}

func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
	var prune []string

	doPX := gs.doPX
	score := gs.score.Score(p)
	now := time.Now()

	for _, graft := range ctl.GetGraft() {
		topic := graft.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			// don't do PX when there is an unknown topic to avoid leaking our peers
			doPX = false
			// spam hardening: ignore GRAFTs for unknown topics
			continue
		}

		// check if it is already in the mesh; if so do nothing (we might have concurrent grafting)
		_, inMesh := peers[p]
		if inMesh {
			continue
		}

		// we don't GRAFT to/from direct peers; complain loudly if this happens
		_, direct := gs.direct[p]
		if direct {
			log.Warnf("GRAFT: ignoring request from direct peer %s", p)
			// this is possibly a bug from non-reciprocal configuration; send a PRUNE
			prune = append(prune, topic)
			// but don't PX
			doPX = false
			continue
		}

		// make sure we are not backing off that peer
		expire, backoff := gs.backoff[topic][p]
		if backoff && now.Before(expire) {
			log.Debugf("GRAFT: ignoring backed off peer %s", p)
			// add behavioural penalty
			gs.score.AddPenalty(p, 1)
			// no PX
			doPX = false
			// check the flood cutoff -- is the GRAFT coming too fast?
			floodCutoff := expire.Add(GossipSubGraftFloodThreshold - GossipSubPruneBackoff)
			if now.Before(floodCutoff) {
				// extra penalty
				gs.score.AddPenalty(p, 1)
			}
			// refresh the backoff
			gs.addBackoff(p, topic)
			prune = append(prune, topic)
			continue
		}

		// check the score
		if score < 0 {
			// we don't GRAFT peers with negative score
			log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic)
			// we do send them PRUNE however, because it's a matter of protocol correctness
			prune = append(prune, topic)
			// but we won't PX to them
			doPX = false
			// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
			gs.addBackoff(p, topic)
			continue
		}

		// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
		// from peers with outbound connections; this is a defensive check to restrict potential
		// mesh takeover attacks combined with love bombing
		if len(peers) >= gs.Dhi && !gs.outbound[p] {
			prune = append(prune, topic)
			gs.addBackoff(p, topic)
			continue
		}

		log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
		gs.tracer.Graft(p, topic)
		peers[p] = struct{}{}
	}

	if len(prune) == 0 {
		return nil
	}

	cprune := make([]*pb.ControlPrune, 0, len(prune))
	for _, topic := range prune {
		cprune = append(cprune, gs.makePrune(p, topic, doPX))
	}

	return cprune
}
func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
	score := gs.score.Score(p)

	for _, prune := range ctl.GetPrune() {
		topic := prune.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			continue
		}

		log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
		gs.tracer.Prune(p, topic)
		delete(peers, p)
		// is there a backoff specified by the peer? if so obey it.
		backoff := prune.GetBackoff()
		if backoff > 0 {
			gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
		} else {
			gs.addBackoff(p, topic)
		}

		px := prune.GetPeers()
		if len(px) > 0 {
			// we ignore PX from peers with insufficient score
			if score < gs.acceptPXThreshold {
				log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", p, score, topic)
				continue
			}

			gs.pxConnect(px)
		}
	}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
	gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
}

func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
	backoff, ok := gs.backoff[topic]
	if !ok {
		backoff = make(map[peer.ID]time.Time)
		gs.backoff[topic] = backoff
	}
	expire := time.Now().Add(interval)
	if backoff[p].Before(expire) {
		backoff[p] = expire
	}
}

func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
	if len(peers) > GossipSubPrunePeers {
		shufflePeerInfo(peers)
		peers = peers[:GossipSubPrunePeers]
	}

	toconnect := make([]connectInfo, 0, len(peers))

	for _, pi := range peers {
		p := peer.ID(pi.PeerID)

		_, connected := gs.peers[p]
		if connected {
			continue
		}

		var spr *record.Envelope
		if pi.SignedPeerRecord != nil {
			// the peer sent us a signed record; ensure that it is valid
			envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
			if err != nil {
				log.Warnf("error unmarshalling peer record obtained through px: %s", err)
				continue
			}
			rec, ok := r.(*peer.PeerRecord)
			if !ok {
				log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
				continue
			}
			if rec.PeerID != p {
				log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p)
				continue
			}
			spr = envelope
		}

		toconnect = append(toconnect, connectInfo{p, spr})
	}

	if len(toconnect) == 0 {
		return
	}

	for _, ci := range toconnect {
		select {
		case gs.connect <- ci:
		default:
			log.Debugf("ignoring peer connection attempt; too many pending connections")
			break
		}
	}
}

func (gs *GossipSubRouter) connector() {
	for {
		select {
		case ci := <-gs.connect:
			if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
				continue
			}

			log.Debugf("connecting to %s", ci.p)
			cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
			if ok && ci.spr != nil {
				_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
				if err != nil {
					log.Debugf("error processing peer record: %s", err)
				}
			}

			ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout)
			err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
			cancel()
			if err != nil {
				log.Debugf("error connecting to %s: %s", ci.p, err)
			}

		case <-gs.p.ctx.Done():
			return
		}
	}
}
func (gs *GossipSubRouter) Publish(msg *Message) {
	gs.mcache.Put(msg.Message)
	from := msg.ReceivedFrom

	tosend := make(map[peer.ID]struct{})
	for _, topic := range msg.GetTopicIDs() {
		// any peers in the topic?
		tmap, ok := gs.p.topics[topic]
		if !ok {
			continue
		}

		if gs.floodPublish && from == gs.p.host.ID() {
			for p := range tmap {
				_, direct := gs.direct[p]
				if direct || gs.score.Score(p) >= gs.publishThreshold {
					tosend[p] = struct{}{}
				}
			}
			continue
		}

		// direct peers
		for p := range gs.direct {
			_, inTopic := tmap[p]
			if inTopic {
				tosend[p] = struct{}{}
			}
		}

		// floodsub peers
		for p := range tmap {
			if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
				tosend[p] = struct{}{}
			}
		}

		// gossipsub peers
		gmap, ok := gs.mesh[topic]
		if !ok {
			// we are not in the mesh for topic, use fanout peers
			gmap, ok = gs.fanout[topic]
			if !ok || len(gmap) == 0 {
				// we don't have any, pick some with score above the publish threshold
				peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
					return gs.score.Score(p) >= gs.publishThreshold
				})

				if len(peers) > 0 {
					gmap = peerListToMap(peers)
					gs.fanout[topic] = gmap
				}
			}
			gs.lastpub[topic] = time.Now().UnixNano()
		}

		for p := range gmap {
			tosend[p] = struct{}{}
		}
	}

	out := rpcWithMessages(msg.Message)
	for pid := range tosend {
		if pid == from || pid == peer.ID(msg.GetFrom()) {
			continue
		}

		gs.sendRPC(pid, out)
	}
}

func (gs *GossipSubRouter) Join(topic string) {
	gmap, ok := gs.mesh[topic]
	if ok {
		return
	}

	log.Debugf("JOIN %s", topic)
	gs.tracer.Join(topic)

	gmap, ok = gs.fanout[topic]
	if ok {
		// these peers have a score above the publish threshold, which may be negative
		// so drop the ones with a negative score
		for p := range gmap {
			if gs.score.Score(p) < 0 {
				delete(gmap, p)
			}
		}

		if len(gmap) < gs.D {
			// we need more peers; eager, as this would get fixed in the next heartbeat
			more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool {
				// filter our current peers, direct peers, and peers with negative scores
				_, inMesh := gmap[p]
				_, direct := gs.direct[p]
				return !inMesh && !direct && gs.score.Score(p) >= 0
			})
			for _, p := range more {
				gmap[p] = struct{}{}
			}
		}
		gs.mesh[topic] = gmap
		delete(gs.fanout, topic)
		delete(gs.lastpub, topic)
	} else {
		peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
			// filter direct peers and peers with negative score
			_, direct := gs.direct[p]
			return !direct && gs.score.Score(p) >= 0
		})
		gmap = peerListToMap(peers)
		gs.mesh[topic] = gmap
	}

	for p := range gmap {
		log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
		gs.tracer.Graft(p, topic)
		gs.sendGraft(p, topic)
	}
}

func (gs *GossipSubRouter) Leave(topic string) {
	gmap, ok := gs.mesh[topic]
	if !ok {
		return
	}

	log.Debugf("LEAVE %s", topic)
	gs.tracer.Leave(topic)

	delete(gs.mesh, topic)

	for p := range gmap {
		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
		gs.tracer.Prune(p, topic)
		gs.sendPrune(p, topic)
	}
}

func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
	graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
	out := rpcWithControl(nil, nil, nil, graft, nil)
	gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
	prune := []*pb.ControlPrune{gs.makePrune(p, topic, true)}
	out := rpcWithControl(nil, nil, nil, nil, prune)
	gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
	// do we own the RPC?
	own := false

	// piggyback control message retries
	ctl, ok := gs.control[p]
	if ok {
		out = copyRPC(out)
		own = true
		gs.piggybackControl(p, out, ctl)
		delete(gs.control, p)
	}

	// piggyback gossip
	ihave, ok := gs.gossip[p]
	if ok {
		if !own {
			out = copyRPC(out)
			own = true
		}
		gs.piggybackGossip(p, out, ihave)
		delete(gs.gossip, p)
	}

	mch, ok := gs.p.peers[p]
	if !ok {
		return
	}

	// If we're below the max message size, go ahead and send
	if out.Size() < gs.p.maxMessageSize {
		gs.doSendRPC(out, p, mch)
		return
	}

	// If we're too big, fragment into multiple RPCs and send each sequentially
	outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
	if err != nil {
		gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err))
		return
	}

	for _, rpc := range outRPCs {
		gs.doSendRPC(rpc, p, mch)
	}
}
func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
	log.Warnf("dropping message to peer %s: %s", p.Pretty(), reason)
	gs.tracer.DropRPC(rpc, p)
	// push control messages that need to be retried
	ctl := rpc.GetControl()
	if ctl != nil {
		gs.pushControl(p, ctl)
	}
}

func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
	select {
	case mch <- rpc:
		gs.tracer.SendRPC(rpc, p)
	default:
		gs.doDropRPC(rpc, p, "queue full")
	}
}

func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
	if rpc.Size() < limit {
		return []*RPC{rpc}, nil
	}

	c := (rpc.Size() / limit) + 1
	rpcs := make([]*RPC, 1, c)
	rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}

	// outRPC returns the current RPC message if it will fit sizeToAdd more bytes
	// otherwise, it will create a new RPC message and add it to the list.
	// if withCtl is true, the returned message will have a non-nil empty Control message.
	outRPC := func(sizeToAdd int, withCtl bool) *RPC {
		current := rpcs[len(rpcs)-1]
		// check if we can fit the new data, plus an extra byte for the protobuf field tag
		if current.Size()+sizeToAdd+1 < limit {
			if withCtl && current.Control == nil {
				current.Control = &pb.ControlMessage{}
			}
			return current
		}
		var ctl *pb.ControlMessage
		if withCtl {
			ctl = &pb.ControlMessage{}
		}
		next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
		rpcs = append(rpcs, next)
		return next
	}

	for _, msg := range rpc.GetPublish() {
		s := msg.Size()
		// if an individual message is too large, we can't fragment it and have to fail entirely
		if s > limit {
			return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
		}
		out := outRPC(s, false)
		out.Publish = append(out.Publish, msg)
	}

	for _, sub := range rpc.GetSubscriptions() {
		out := outRPC(sub.Size(), false)
		out.Subscriptions = append(out.Subscriptions, sub)
	}

	ctl := rpc.GetControl()
	if ctl == nil {
		// if there were no control messages, we're done
		return rpcs, nil
	}
	// if all the control messages fit into one RPC, we just add it to the end and return
	ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
	if ctlOut.Size() < limit {
		rpcs = append(rpcs, ctlOut)
		return rpcs, nil
	}

	// we need to split up the control messages into multiple RPCs
	for _, graft := range ctl.Graft {
		out := outRPC(graft.Size(), true)
		out.Control.Graft = append(out.Control.Graft, graft)
	}
	for _, prune := range ctl.Prune {
		out := outRPC(prune.Size(), true)
		out.Control.Prune = append(out.Control.Prune, prune)
	}

	// An individual IWANT or IHAVE message could be larger than the limit if we have
	// a lot of message IDs. fragmentMessageIds will split them into buckets that
	// fit within the limit, with some overhead for the control messages themselves
	for _, iwant := range ctl.Iwant {
		const protobufOverhead = 6
		idBuckets := fragmentMessageIds(iwant.MessageIDs, limit-protobufOverhead)
		for _, ids := range idBuckets {
			iwant := &pb.ControlIWant{MessageIDs: ids}
			out := outRPC(iwant.Size(), true)
			out.Control.Iwant = append(out.Control.Iwant, iwant)
		}
	}
	for _, ihave := range ctl.Ihave {
		const protobufOverhead = 6
		idBuckets := fragmentMessageIds(ihave.MessageIDs, limit-protobufOverhead)
		for _, ids := range idBuckets {
			ihave := &pb.ControlIHave{MessageIDs: ids}
			out := outRPC(ihave.Size(), true)
			out.Control.Ihave = append(out.Control.Ihave, ihave)
		}
	}
	return rpcs, nil
}

func fragmentMessageIds(msgIds []string, limit int) [][]string {
	// account for two bytes of protobuf overhead per array element
	const protobufOverhead = 2

	out := [][]string{{}}
	var currentBucket int
	var bucketLen int
	for i := 0; i < len(msgIds); i++ {
		size := len(msgIds[i]) + protobufOverhead
		if size > limit {
			// pathological case where a single message ID exceeds the limit.
			log.Warnf("message ID length %d exceeds limit %d, removing from outgoing gossip", size, limit)
			continue
		}
		bucketLen += size
		if bucketLen > limit {
			out = append(out, []string{})
			currentBucket++
			bucketLen = size
		}
		out[currentBucket] = append(out[currentBucket], msgIds[i])
	}
	return out
}
func (gs *GossipSubRouter) heartbeatTimer() {
	time.Sleep(GossipSubHeartbeatInitialDelay)
	select {
	case gs.p.eval <- gs.heartbeat:
	case <-gs.p.ctx.Done():
		return
	}

	ticker := time.NewTicker(GossipSubHeartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			select {
			case gs.p.eval <- gs.heartbeat:
			case <-gs.p.ctx.Done():
				return
			}
		case <-gs.p.ctx.Done():
			return
		}
	}
}

func (gs *GossipSubRouter) heartbeat() {
	defer log.EventBegin(gs.p.ctx, "heartbeat").Done()

	gs.heartbeatTicks++

	tograft := make(map[peer.ID][]string)
	toprune := make(map[peer.ID][]string)
	noPX := make(map[peer.ID]bool)

	// clean up expired backoffs
	gs.clearBackoff()

	// clean up iasked counters
	gs.clearIHaveCounters()

	// apply IWANT request penalties
	gs.applyIwantPenalties()

	// ensure direct peers are connected
	gs.directConnect()

	// cache scores throughout the heartbeat
	scores := make(map[peer.ID]float64)
	score := func(p peer.ID) float64 {
		s, ok := scores[p]
		if !ok {
			s = gs.score.Score(p)
			scores[p] = s
		}
		return s
	}

	// maintain the mesh for topics we have joined
	for topic, peers := range gs.mesh {
		prunePeer := func(p peer.ID) {
			gs.tracer.Prune(p, topic)
			delete(peers, p)
			gs.addBackoff(p, topic)
			topics := toprune[p]
			toprune[p] = append(topics, topic)
		}

		graftPeer := func(p peer.ID) {
			log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
			gs.tracer.Graft(p, topic)
			peers[p] = struct{}{}
			topics := tograft[p]
			tograft[p] = append(topics, topic)
		}

		// drop all peers with negative score, without PX
		for p := range peers {
			if score(p) < 0 {
				log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic)
				prunePeer(p)
				noPX[p] = true
			}
		}

		// do we have enough peers?
		if l := len(peers); l < gs.Dlo {
			backoff := gs.backoff[topic]
			ineed := gs.D - l
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter our current and direct peers, peers we are backing off, and peers with negative score
				_, inMesh := peers[p]
				_, doBackoff := backoff[p]
				_, direct := gs.direct[p]
				return !inMesh && !doBackoff && !direct && score(p) >= 0
			})

			for _, p := range plst {
				graftPeer(p)
			}
		}

		// do we have too many peers?
		if len(peers) > gs.Dhi {
			plst := peerMapToList(peers)

			// sort by score (but shuffle first for the case we don't use the score)
			shufflePeers(plst)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) > score(plst[j])
			})

			// We keep the first D_score peers by score and the remaining up to D randomly
			// under the constraint that we keep D_out peers in the mesh (if we have that many)
			shufflePeers(plst[gs.Dscore:])

			// count the outbound peers we are keeping
			outbound := 0
			for _, p := range plst[:gs.D] {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, bubble up some outbound peers from the random selection
			if outbound < gs.Dout {
				rotate := func(i int) {
					// rotate the plst to the right and put the ith peer in the front
					p := plst[i]
					for j := i; j > 0; j-- {
						plst[j] = plst[j-1]
					}
					plst[0] = p
				}

				// first bubble up all outbound peers already in the selection to the front
				if outbound > 0 {
					ihave := outbound
					for i := 1; i < gs.D && ihave > 0; i++ {
						p := plst[i]
						if gs.outbound[p] {
							rotate(i)
							ihave--
						}
					}
				}

				// now bubble up enough outbound peers outside the selection to the front
				ineed := gs.Dout - outbound
				for i := gs.D; i < len(plst) && ineed > 0; i++ {
					p := plst[i]
					if gs.outbound[p] {
						rotate(i)
						ineed--
					}
				}
			}

			// prune the excess peers
			for _, p := range plst[gs.D:] {
				log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
				prunePeer(p)
			}
		}

		// do we have enough outbound peers?
		if len(peers) >= gs.Dlo {
			// count the outbound peers we have
			outbound := 0
			for p := range peers {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, select some peers with outbound connections and graft them
			if outbound < gs.Dout {
				ineed := gs.Dout - outbound
				backoff := gs.backoff[topic]
				plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
					// filter our current and direct peers, peers we are backing off, and peers with negative score
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0
				})

				for _, p := range plst {
					graftPeer(p)
				}
			}
		}

		// should we try to improve the mesh with opportunistic grafting?
		if gs.heartbeatTicks%gs.opportunisticGraftTicks == 0 && len(peers) > 1 {
			// Opportunistic grafting works as follows: we check the median score of peers in the
			// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
			// random with score over the median.
			// The intention is to (slowly) improve an underperforming mesh by introducing good
			// scoring peers that may have been gossiping at us. This allows us to get out of sticky
			// situations where we are stuck with poor peers and also recover from churn of good peers.

			// now compute the median peer score in the mesh
			plst := peerMapToList(peers)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) < score(plst[j])
			})
			medianIndex := len(peers) / 2
			medianScore := scores[plst[medianIndex]]

			// if the median score is below the threshold, select a better peer (if any) and GRAFT
			if medianScore < gs.opportunisticGraftThreshold {
				backoff := gs.backoff[topic]
				plst = gs.getPeers(topic, GossipSubOpportunisticGraftPeers, func(p peer.ID) bool {
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && score(p) > medianScore
				})

				for _, p := range plst {
					log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic)
					graftPeer(p)
				}
			}
		}

		// 2nd arg are mesh peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// expire fanout for topics we haven't published to in a while
	now := time.Now().UnixNano()
	for topic, lastpub := range gs.lastpub {
		if lastpub+int64(gs.fanoutTTL) < now {
			delete(gs.fanout, topic)
			delete(gs.lastpub, topic)
		}
	}

	// maintain our fanout for topics we are publishing but we have not joined
	for topic, peers := range gs.fanout {
		// check whether our peers are still in the topic and have a score above the publish threshold
		for p := range peers {
			_, ok := gs.p.topics[topic][p]
			if !ok || score(p) < gs.publishThreshold {
				delete(peers, p)
			}
		}

		// do we need more peers?
		if len(peers) < gs.D {
			ineed := gs.D - len(peers)
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter our current and direct peers; keep peers with score above the publish threshold
				_, inFanout := peers[p]
				_, direct := gs.direct[p]
				return !inFanout && !direct && score(p) >= gs.publishThreshold
			})

			for _, p := range plst {
				peers[p] = struct{}{}
			}
		}

		// 2nd arg are fanout peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
	gs.sendGraftPrune(tograft, toprune, noPX)

	// flush all pending gossip that wasn't piggybacked above
	gs.flush()

	// advance the message history window
	gs.mcache.Shift()
}
func (gs *GossipSubRouter) clearIHaveCounters() {
	if len(gs.peerhave) > 0 {
		// throw away the old map and make a new one
		gs.peerhave = make(map[peer.ID]int)
	}

	if len(gs.iasked) > 0 {
		// throw away the old map and make a new one
		gs.iasked = make(map[peer.ID]int)
	}
}

func (gs *GossipSubRouter) applyIwantPenalties() {
	for p, count := range gs.gossipTracer.GetBrokenPromises() {
		log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
		gs.score.AddPenalty(p, count)
	}
}

func (gs *GossipSubRouter) clearBackoff() {
	// we only clear once every 15 ticks to avoid iterating over the map(s) too much
	if gs.heartbeatTicks%15 != 0 {
		return
	}

	now := time.Now()
	for topic, backoff := range gs.backoff {
		for p, expire := range backoff {
			if expire.Before(now) {
				delete(backoff, p)
			}
		}
		if len(backoff) == 0 {
			delete(gs.backoff, topic)
		}
	}
}

func (gs *GossipSubRouter) directConnect() {
	// we only do this every directConnectTicks to allow pending connections to complete and account
	// for restarts/downtime
	if gs.heartbeatTicks%gs.directConnectTicks != 0 {
		return
	}

	var toconnect []peer.ID
	for p := range gs.direct {
		_, connected := gs.peers[p]
		if !connected {
			toconnect = append(toconnect, p)
		}
	}

	if len(toconnect) > 0 {
		go func() {
			for _, p := range toconnect {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
	for p, topics := range tograft {
		graft := make([]*pb.ControlGraft, 0, len(topics))
		for _, topic := range topics {
			graft = append(graft, &pb.ControlGraft{TopicID: &topic})
		}

		var prune []*pb.ControlPrune
		pruning, ok := toprune[p]
		if ok {
			delete(toprune, p)
			prune = make([]*pb.ControlPrune, 0, len(pruning))
			for _, topic := range pruning {
				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
			}
		}

		out := rpcWithControl(nil, nil, nil, graft, prune)
		gs.sendRPC(p, out)
	}

	for p, topics := range toprune {
		prune := make([]*pb.ControlPrune, 0, len(topics))
		for _, topic := range topics {
			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
		}

		out := rpcWithControl(nil, nil, nil, nil, prune)
		gs.sendRPC(p, out)
	}
}

// emitGossip emits IHAVE gossip advertising items in the message cache window
// of this topic.
func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
	mids := gs.mcache.GetGossipIDs(topic)
	if len(mids) == 0 {
		return
	}

	// shuffle to emit in random order
	shuffleStrings(mids)

	// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
	if len(mids) > GossipSubMaxIHaveLength {
		// we do the truncation (with shuffling) per peer below
		log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids))
	}

	// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
	// First we collect the peers above gossipThreshold that are not in the exclude set
	// and then randomly select from that set.
	// We also exclude direct peers, as there is no reason to emit gossip to them.
	peers := make([]peer.ID, 0, len(gs.p.topics[topic]))
	for p := range gs.p.topics[topic] {
		_, inExclude := exclude[p]
		_, direct := gs.direct[p]
		if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
			peers = append(peers, p)
		}
	}

	target := gs.Dlazy
	factor := int(GossipSubGossipFactor * float64(len(peers)))
	if factor > target {
		target = factor
	}

	if target > len(peers) {
		target = len(peers)
	} else {
		shufflePeers(peers)
	}
	peers = peers[:target]

	// Emit the IHAVE gossip to the selected peers.
	for _, p := range peers {
		peerMids := mids
		if len(mids) > GossipSubMaxIHaveLength {
			// we do this per peer so that we emit a different set for each peer.
			// we have enough redundancy in the system that this will significantly increase the message
			// coverage when we do truncate.
			peerMids = make([]string, GossipSubMaxIHaveLength)
			shuffleStrings(mids)
			copy(peerMids, mids)
		}
		gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids})
	}
}

func (gs *GossipSubRouter) flush() {
	// send gossip first, which will also piggyback pending control
	for p, ihave := range gs.gossip {
		delete(gs.gossip, p)
		out := rpcWithControl(nil, ihave, nil, nil, nil)
		gs.sendRPC(p, out)
	}

	// send the remaining control messages that weren't merged with gossip
	for p, ctl := range gs.control {
		delete(gs.control, p)
		out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune)
		gs.sendRPC(p, out)
	}
}
func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
	gossip := gs.gossip[p]
	gossip = append(gossip, ihave)
	gs.gossip[p] = gossip
}

func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
	ctl := out.GetControl()
	if ctl == nil {
		ctl = &pb.ControlMessage{}
		out.Control = ctl
	}

	ctl.Ihave = ihave
}

func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
	// remove IHAVE/IWANT from control message, gossip is not retried
	ctl.Ihave = nil
	ctl.Iwant = nil
	if ctl.Graft != nil || ctl.Prune != nil {
		gs.control[p] = ctl
	}
}

func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
	// check control message for staleness first
	var tograft []*pb.ControlGraft
	var toprune []*pb.ControlPrune

	for _, graft := range ctl.GetGraft() {
		topic := graft.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			continue
		}
		_, ok = peers[p]
		if ok {
			tograft = append(tograft, graft)
		}
	}

	for _, prune := range ctl.GetPrune() {
		topic := prune.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			toprune = append(toprune, prune)
			continue
		}
		_, ok = peers[p]
		if !ok {
			toprune = append(toprune, prune)
		}
	}

	if len(tograft) == 0 && len(toprune) == 0 {
		return
	}

	xctl := out.Control
	if xctl == nil {
		xctl = &pb.ControlMessage{}
		out.Control = xctl
	}

	if len(tograft) > 0 {
		xctl.Graft = append(xctl.Graft, tograft...)
	}
	if len(toprune) > 0 {
		xctl.Prune = append(xctl.Prune, toprune...)
	}
}

func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
	if gs.peers[p] == GossipSubID_v10 {
		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
		return &pb.ControlPrune{TopicID: &topic}
	}

	backoff := uint64(GossipSubPruneBackoff / time.Second)
	var px []*pb.PeerInfo
	if doPX {
		// select peers for Peer eXchange
		peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
			return p != xp && gs.score.Score(xp) >= 0
		})

		cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
		px = make([]*pb.PeerInfo, 0, len(peers))
		for _, p := range peers {
			// see if we have a signed peer record to send back; if we don't, just send
			// the peer ID and let the pruned peer find them in the DHT -- we can't trust
			// unsigned address records through px anyway.
			var recordBytes []byte
			if ok {
				spr := cab.GetPeerRecord(p)
				var err error
				if spr != nil {
					recordBytes, err = spr.Marshal()
					if err != nil {
						log.Warnf("error marshaling signed peer record for %s: %s", p, err)
					}
				}
			}
			px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
		}
	}

	return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
}

func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return nil
	}

	peers := make([]peer.ID, 0, len(tmap))
	for p := range tmap {
		if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
			peers = append(peers, p)
		}
	}

	shufflePeers(peers)

	if count > 0 && len(peers) > count {
		peers = peers[:count]
	}

	return peers
}

func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
	pmap := make(map[peer.ID]struct{})
	for _, p := range peers {
		pmap[p] = struct{}{}
	}
	return pmap
}

func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
	plst := make([]peer.ID, 0, len(peers))
	for p := range peers {
		plst = append(plst, p)
	}
	return plst
}

func shufflePeers(peers []peer.ID) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shufflePeerInfo(peers []*pb.PeerInfo) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shuffleStrings(lst []string) {
	for i := range lst {
		j := rand.Intn(i + 1)
		lst[i], lst[j] = lst[j], lst[i]
	}
}