more docs for gossipsub router, expire fanout peers when we haven't published in a while
This commit is contained in:
parent
b490d117f2
commit
1dc8405449
24
gossipsub.go
24
gossipsub.go
|
@ -28,13 +28,18 @@ var (
|
|||
|
||||
// heartbeat interval
|
||||
GossipSubHeartbeatInterval = 1 * time.Second
|
||||
|
||||
// fanout ttl
|
||||
GossipSubFanoutTTL = 60 * time.Second
|
||||
)
|
||||
|
||||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router
|
||||
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||
rt := &GossipSubRouter{
|
||||
peers: make(map[peer.ID]protocol.ID),
|
||||
mesh: make(map[string]map[peer.ID]struct{}),
|
||||
fanout: make(map[string]map[peer.ID]struct{}),
|
||||
lastpub: make(map[string]int64),
|
||||
gossip: make(map[peer.ID][]*pb.ControlIHave),
|
||||
control: make(map[peer.ID]*pb.ControlMessage),
|
||||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
||||
|
@ -42,11 +47,19 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
|||
return NewPubSub(ctx, h, rt, opts...)
|
||||
}
|
||||
|
||||
// GossipSubRouter is a router that implements the gossipsub protocol.
|
||||
// For each topic we have joined, we maintain an overlay through which
|
||||
// messages flow; this is the mesh map.
|
||||
// For each topic we publish to without joining, we maintain a list of peers
|
||||
// to use for injecting our messages in the overlay with stable routes; this
|
||||
// is the fanout map. Fanout peer lists are expired if we don't publish any
|
||||
// messages to their topic for GossipSubFanoutTTL.
|
||||
type GossipSubRouter struct {
|
||||
p *PubSub
|
||||
peers map[peer.ID]protocol.ID // peer protocols
|
||||
mesh map[string]map[peer.ID]struct{} // topic meshes
|
||||
fanout map[string]map[peer.ID]struct{} // topic fanout
|
||||
lastpub map[string]int64 // last pubish time for fanout topics
|
||||
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
|
||||
control map[peer.ID]*pb.ControlMessage // pending control messages
|
||||
mcache *MessageCache
|
||||
|
@ -215,6 +228,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
|||
gs.fanout[topic] = gmap
|
||||
}
|
||||
}
|
||||
gs.lastpub[topic] = time.Now().UnixNano()
|
||||
}
|
||||
|
||||
for p := range gmap {
|
||||
|
@ -242,6 +256,7 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||
if ok {
|
||||
gs.mesh[topic] = gmap
|
||||
delete(gs.fanout, topic)
|
||||
delete(gs.lastpub, topic)
|
||||
} else {
|
||||
peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true })
|
||||
gmap = peerListToMap(peers)
|
||||
|
@ -371,6 +386,15 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||
gs.emitGossip(topic, peers)
|
||||
}
|
||||
|
||||
// expire fanout for topics we haven't published to in a while
|
||||
now := time.Now().UnixNano()
|
||||
for topic, lastpub := range gs.lastpub {
|
||||
if lastpub+int64(GossipSubFanoutTTL) < now {
|
||||
delete(gs.fanout, topic)
|
||||
delete(gs.lastpub, topic)
|
||||
}
|
||||
}
|
||||
|
||||
// maintain our fanout for topics we are publishing but we have not joined
|
||||
for topic, peers := range gs.fanout {
|
||||
// check whether our peers are still in the topic
|
||||
|
|
|
@ -254,6 +254,61 @@ func TestGossipsubFanoutMaintenance(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGossipsubFanoutExpiry(t *testing.T) {
|
||||
GossipSubFanoutTTL = 1 * time.Second
|
||||
defer func() { GossipSubFanoutTTL = 60 * time.Second }()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hosts := getNetHosts(t, ctx, 10)
|
||||
|
||||
psubs := getGossipsubs(ctx, hosts)
|
||||
|
||||
var msgs []*Subscription
|
||||
for _, ps := range psubs[1:] {
|
||||
subch, err := ps.Subscribe("foobar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgs = append(msgs, subch)
|
||||
}
|
||||
|
||||
denseConnect(t, hosts)
|
||||
|
||||
// wait for heartbeats to build mesh
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
|
||||
|
||||
owner := 0
|
||||
|
||||
psubs[owner].Publish("foobar", msg)
|
||||
|
||||
for _, sub := range msgs {
|
||||
got, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(sub.err)
|
||||
}
|
||||
if !bytes.Equal(msg, got.Data) {
|
||||
t.Fatal("got wrong message!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 {
|
||||
t.Fatal("owner has no fanout")
|
||||
}
|
||||
|
||||
// wait for TTL to expire fanout peers in owner
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 {
|
||||
t.Fatal("fanout hasn't expired")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGossipsubGossip(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
|
Loading…
Reference in New Issue