implement flood publishing option

This commit is contained in:
vyzo 2020-03-03 15:31:16 +02:00
parent dc6af4bbe6
commit 4f6ca1b1b7
1 changed files with 54 additions and 8 deletions

View File

@ -74,7 +74,10 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
} }
// WithPeerScore is a gossipsub router option that enables peer scoring. // WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option { // gossipThreshold is the score threshold below which gossip propagation is supressed.
// publishThreshold is the score threshold below which we shouldn't publish when using flood
// publishing (also applies to fanout and floodsub peers).
func WithPeerScore(params *PeerScoreParams, gossipThreshold float64, publishThreshold float64) Option {
return func(ps *PubSub) error { return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter) gs, ok := ps.rt.(*GossipSubRouter)
if !ok { if !ok {
@ -83,6 +86,7 @@ func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option {
gs.score = newPeerScore(gs, params) gs.score = newPeerScore(gs, params)
gs.gossipThreshold = gossipThreshold gs.gossipThreshold = gossipThreshold
gs.publishThreshold = publishThreshold
// hook the tracer // hook the tracer
if ps.tracer != nil { if ps.tracer != nil {
@ -95,6 +99,22 @@ func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option {
} }
} }
// WithFloodPublish is a gossipsub router option that enables flood publishing.
// When this is enabled, published messages are forwarded to all peers with score >=
// to publishThreshold
func WithFloodPublish(floodPublish bool) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.floodPublish = floodPublish
return nil
}
}
// GossipSubRouter is a router that implements the gossipsub protocol. // GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which // For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map. // messages flow; this is the mesh map.
@ -120,6 +140,13 @@ type GossipSubRouter struct {
// If the peer score is below this threshold, we won't emit or accept gossip from the peer. // If the peer score is below this threshold, we won't emit or accept gossip from the peer.
// When there is no score, this value is 0. // When there is no score, this value is 0.
gossipThreshold float64 gossipThreshold float64
// flood publish score threshold; we only publish to peers with score >= to the threshold
// when using flood publishing or the peer is a fanout or floodsub peer.
publishThreshold float64
// whether to use flood publishing
floodPublish bool
} }
type connectInfo struct { type connectInfo struct {
@ -460,9 +487,18 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue continue
} }
if gs.floodPublish {
for p := range tmap {
if gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
continue
}
// floodsub peers // floodsub peers
for p := range tmap { for p := range tmap {
if gs.peers[p] == FloodSubID { if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{} tosend[p] = struct{}{}
} }
} }
@ -473,8 +509,10 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
// we are not in the mesh for topic, use fanout peers // we are not in the mesh for topic, use fanout peers
gmap, ok = gs.fanout[topic] gmap, ok = gs.fanout[topic]
if !ok || len(gmap) == 0 { if !ok || len(gmap) == 0 {
// we don't have any, pick some // we don't have any, pick some with score above the publish threshold
peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool {
return gs.score.Score(p) >= gs.publishThreshold
})
if len(peers) > 0 { if len(peers) > 0 {
gmap = peerListToMap(peers) gmap = peerListToMap(peers)
@ -510,6 +548,14 @@ func (gs *GossipSubRouter) Join(topic string) {
gmap, ok = gs.fanout[topic] gmap, ok = gs.fanout[topic]
if ok { if ok {
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
for p := range gmap {
if gs.score.Score(p) < 0 {
delete(gmap, p)
}
}
if len(gmap) < GossipSubD { if len(gmap) < GossipSubD {
// we need more peers; eager, as this would get fixed in the next heartbeat // we need more peers; eager, as this would get fixed in the next heartbeat
more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool { more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool {
@ -737,11 +783,11 @@ func (gs *GossipSubRouter) heartbeat() {
// maintain our fanout for topics we are publishing but we have not joined // maintain our fanout for topics we are publishing but we have not joined
for topic, peers := range gs.fanout { for topic, peers := range gs.fanout {
// check whether our peers are still in the topic and don't have a negative score // check whether our peers are still in the topic and have a score above the publish threshold
for p := range peers { for p := range peers {
_, ok := gs.p.topics[topic][p] _, ok := gs.p.topics[topic][p]
score := gs.score.Score(p) score := gs.score.Score(p)
if !ok || score < 0 { if !ok || score < gs.publishThreshold {
delete(peers, p) delete(peers, p)
} }
} }
@ -750,10 +796,10 @@ func (gs *GossipSubRouter) heartbeat() {
if len(peers) < GossipSubD { if len(peers) < GossipSubD {
ineed := GossipSubD - len(peers) ineed := GossipSubD - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current peers and peers with negative score // filter our current peers and peers with score above the publish threshold
_, ok := peers[p] _, ok := peers[p]
score := gs.score.Score(p) score := gs.score.Score(p)
return !ok && score >= 0 return !ok && score >= gs.publishThreshold
}) })
for _, p := range plst { for _, p := range plst {