diff --git a/gossipsub.go b/gossipsub.go index d398c8d..a1c1d26 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -74,7 +74,10 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er } // WithPeerScore is a gossipsub router option that enables peer scoring. -func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option { +// gossipThreshold is the score threshold below which gossip propagation is supressed. +// publishThreshold is the score threshold below which we shouldn't publish when using flood +// publishing (also applies to fanout and floodsub peers). +func WithPeerScore(params *PeerScoreParams, gossipThreshold float64, publishThreshold float64) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { @@ -83,6 +86,7 @@ func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option { gs.score = newPeerScore(gs, params) gs.gossipThreshold = gossipThreshold + gs.publishThreshold = publishThreshold // hook the tracer if ps.tracer != nil { @@ -95,6 +99,22 @@ func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option { } } +// WithFloodPublish is a gossipsub router option that enables flood publishing. +// When this is enabled, published messages are forwarded to all peers with score >= +// to publishThreshold +func WithFloodPublish(floodPublish bool) Option { + return func(ps *PubSub) error { + gs, ok := ps.rt.(*GossipSubRouter) + if !ok { + return fmt.Errorf("pubsub router is not gossipsub") + } + + gs.floodPublish = floodPublish + + return nil + } +} + // GossipSubRouter is a router that implements the gossipsub protocol. // For each topic we have joined, we maintain an overlay through which // messages flow; this is the mesh map. @@ -120,6 +140,13 @@ type GossipSubRouter struct { // If the peer score is below this threshold, we won't emit or accept gossip from the peer. // When there is no score, this value is 0. gossipThreshold float64 + + // flood publish score threshold; we only publish to peers with score >= to the threshold + // when using flood publishing or the peer is a fanout or floodsub peer. + publishThreshold float64 + + // whether to use flood publishing + floodPublish bool } type connectInfo struct { @@ -460,9 +487,18 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } + if gs.floodPublish { + for p := range tmap { + if gs.score.Score(p) >= gs.publishThreshold { + tosend[p] = struct{}{} + } + } + continue + } + // floodsub peers for p := range tmap { - if gs.peers[p] == FloodSubID { + if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold { tosend[p] = struct{}{} } } @@ -473,8 +509,10 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { // we are not in the mesh for topic, use fanout peers gmap, ok = gs.fanout[topic] if !ok || len(gmap) == 0 { - // we don't have any, pick some - peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + // we don't have any, pick some with score above the publish threshold + peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool { + return gs.score.Score(p) >= gs.publishThreshold + }) if len(peers) > 0 { gmap = peerListToMap(peers) @@ -510,6 +548,14 @@ func (gs *GossipSubRouter) Join(topic string) { gmap, ok = gs.fanout[topic] if ok { + // these peers have a score above the publish threshold, which may be negative + // so drop the ones with a negative score + for p := range gmap { + if gs.score.Score(p) < 0 { + delete(gmap, p) + } + } + if len(gmap) < GossipSubD { // we need more peers; eager, as this would get fixed in the next heartbeat more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool { @@ -737,11 +783,11 @@ func (gs *GossipSubRouter) heartbeat() { // maintain our fanout for topics we are publishing but we have not joined for topic, peers := range gs.fanout { - // check whether our peers are still in the topic and don't have a negative score + // check whether our peers are still in the topic and have a score above the publish threshold for p := range peers { _, ok := gs.p.topics[topic][p] score := gs.score.Score(p) - if !ok || score < 0 { + if !ok || score < gs.publishThreshold { delete(peers, p) } } @@ -750,10 +796,10 @@ func (gs *GossipSubRouter) heartbeat() { if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers and peers with negative score + // filter our current peers and peers with score above the publish threshold _, ok := peers[p] score := gs.score.Score(p) - return !ok && score >= 0 + return !ok && score >= gs.publishThreshold }) for _, p := range plst {