mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-05-02 06:53:53 +00:00
add support for custom gossipsub protocols and feature tests
This commit is contained in:
parent
2ed84f4515
commit
0e387d79fb
18
gossipsub.go
18
gossipsub.go
@ -179,6 +179,9 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
|||||||
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
||||||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
||||||
|
|
||||||
|
protos: GossipSubDefaultProtocols,
|
||||||
|
feature: GossipSubDefaultFeatures,
|
||||||
|
|
||||||
// these are configured per router to allow variation in tests
|
// these are configured per router to allow variation in tests
|
||||||
D: GossipSubD,
|
D: GossipSubD,
|
||||||
Dlo: GossipSubDlo,
|
Dlo: GossipSubDlo,
|
||||||
@ -342,6 +345,9 @@ type GossipSubRouter struct {
|
|||||||
backoff map[string]map[peer.ID]time.Time // prune backoff
|
backoff map[string]map[peer.ID]time.Time // prune backoff
|
||||||
connect chan connectInfo // px connection requests
|
connect chan connectInfo // px connection requests
|
||||||
|
|
||||||
|
protos []protocol.ID
|
||||||
|
feature GossipSubFeatureTest
|
||||||
|
|
||||||
mcache *MessageCache
|
mcache *MessageCache
|
||||||
tracer *pubsubTracer
|
tracer *pubsubTracer
|
||||||
score *peerScore
|
score *peerScore
|
||||||
@ -399,7 +405,7 @@ type connectInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) Protocols() []protocol.ID {
|
func (gs *GossipSubRouter) Protocols() []protocol.ID {
|
||||||
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
|
return gs.protos
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) Attach(p *PubSub) {
|
func (gs *GossipSubRouter) Attach(p *PubSub) {
|
||||||
@ -493,7 +499,7 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|||||||
fsPeers, gsPeers := 0, 0
|
fsPeers, gsPeers := 0, 0
|
||||||
// floodsub peers
|
// floodsub peers
|
||||||
for p := range tmap {
|
for p := range tmap {
|
||||||
if gs.peers[p] == FloodSubID {
|
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) {
|
||||||
fsPeers++
|
fsPeers++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -905,7 +911,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
|
|||||||
|
|
||||||
// floodsub peers
|
// floodsub peers
|
||||||
for p := range tmap {
|
for p := range tmap {
|
||||||
if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
|
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.publishThreshold {
|
||||||
tosend[p] = struct{}{}
|
tosend[p] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1615,7 +1621,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
|
|||||||
for p := range gs.p.topics[topic] {
|
for p := range gs.p.topics[topic] {
|
||||||
_, inExclude := exclude[p]
|
_, inExclude := exclude[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
|
if !inExclude && !direct && gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.gossipThreshold {
|
||||||
peers = append(peers, p)
|
peers = append(peers, p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1738,7 +1744,7 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
|
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
|
||||||
if gs.peers[p] == GossipSubID_v10 {
|
if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
|
||||||
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
|
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
|
||||||
return &pb.ControlPrune{TopicID: &topic}
|
return &pb.ControlPrune{TopicID: &topic}
|
||||||
}
|
}
|
||||||
@ -1783,7 +1789,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
|
|||||||
|
|
||||||
peers := make([]peer.ID, 0, len(tmap))
|
peers := make([]peer.ID, 0, len(tmap))
|
||||||
for p := range tmap {
|
for p := range tmap {
|
||||||
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
|
if gs.feature(GossipSubFeatureMesh, gs.peers[p]) && filter(p) {
|
||||||
peers = append(peers, p)
|
peers = append(peers, p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
52
gossipsub_feat.go
Normal file
52
gossipsub_feat.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GossipSubFeatureTest is a feature test function; it takes a feature and a protocol ID and
|
||||||
|
// should return true if the feature is supported by the protocol
|
||||||
|
type GossipSubFeatureTest = func(GossipSubFeature, protocol.ID) bool
|
||||||
|
|
||||||
|
// GossipSubFeature is a feature discriminant enum
|
||||||
|
type GossipSubFeature int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Protocol supports basic GossipSub Mesh -- gossipsub-v1.0 compatible
|
||||||
|
GossipSubFeatureMesh = iota
|
||||||
|
// Protocol supports Peer eXchange on prune -- gossipsub-v1.1 compatible
|
||||||
|
GossipSubFeaturePX
|
||||||
|
)
|
||||||
|
|
||||||
|
// GossipSubDefaultProtocols is the default gossipsub router protocol list
|
||||||
|
var GossipSubDefaultProtocols = []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
|
||||||
|
|
||||||
|
// GossipSubDefaultFeatures is the feature test function for the default gossipsub protocols
|
||||||
|
func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool {
|
||||||
|
switch feat {
|
||||||
|
case GossipSubFeatureMesh:
|
||||||
|
return proto == GossipSubID_v11 || proto == GossipSubID_v10
|
||||||
|
case GossipSubFeaturePX:
|
||||||
|
return proto == GossipSubID_v11
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithGossipSubProtocols is a gossipsub router option that configures a custom protocol list
|
||||||
|
// and feature test function
|
||||||
|
func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option {
|
||||||
|
return func(ps *PubSub) error {
|
||||||
|
gs, ok := ps.rt.(*GossipSubRouter)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("pubsub router is not gossipsub")
|
||||||
|
}
|
||||||
|
|
||||||
|
gs.protos = protos
|
||||||
|
gs.feature = feature
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user