fix: allow specifying custom protocols in GossipSub

This commit is contained in:
Richard Ramos 2021-03-15 16:09:07 -04:00
parent 6c1addf493
commit be77c152f7
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F

View File

@ -164,20 +164,21 @@ var (
)
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
func NewGossipSub(ctx context.Context, h host.Host, customProtocols []protocol.ID, opts ...Option) (*PubSub, error) {
rt := &GossipSubRouter{
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
customProtocols: customProtocols,
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
// these are configured per router to allow variation in tests
D: GossipSubD,
@ -328,19 +329,20 @@ func WithDirectConnectTicks(t uint64) Option {
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
p *PubSub
customProtocols []protocol.ID
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
mcache *MessageCache
tracer *pubsubTracer
@ -399,7 +401,7 @@ type connectInfo struct {
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
return append([]protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}, gs.customProtocols...)
}
func (gs *GossipSubRouter) Attach(p *PubSub) {