From 1dc8405449535d8629f3a123eda819dc58255222 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 10 Mar 2018 11:21:50 +0200 Subject: [PATCH] more docs for gossipsub router, expire fanout peers when we haven't published in a while --- gossipsub.go | 24 +++++++++++++++++++++ gossipsub_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 66d1734..2648530 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -28,13 +28,18 @@ var ( // heartbeat interval GossipSubHeartbeatInterval = 1 * time.Second + + // fanout ttl + GossipSubFanoutTTL = 60 * time.Second ) +// NewGossipSub returns a new PubSub object using GossipSubRouter as the router func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ peers: make(map[peer.ID]protocol.ID), mesh: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}), + lastpub: make(map[string]int64), gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), @@ -42,11 +47,19 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er return NewPubSub(ctx, h, rt, opts...) } +// GossipSubRouter is a router that implements the gossipsub protocol. +// For each topic we have joined, we maintain an overlay through which +// messages flow; this is the mesh map. +// For each topic we publish to without joining, we maintain a list of peers +// to use for injecting our messages in the overlay with stable routes; this +// is the fanout map. Fanout peer lists are expired if we don't publish any +// messages to their topic for GossipSubFanoutTTL. type GossipSubRouter struct { p *PubSub peers map[peer.ID]protocol.ID // peer protocols mesh map[string]map[peer.ID]struct{} // topic meshes fanout map[string]map[peer.ID]struct{} // topic fanout + lastpub map[string]int64 // last pubish time for fanout topics gossip map[peer.ID][]*pb.ControlIHave // pending gossip control map[peer.ID]*pb.ControlMessage // pending control messages mcache *MessageCache @@ -215,6 +228,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gs.fanout[topic] = gmap } } + gs.lastpub[topic] = time.Now().UnixNano() } for p := range gmap { @@ -242,6 +256,7 @@ func (gs *GossipSubRouter) Join(topic string) { if ok { gs.mesh[topic] = gmap delete(gs.fanout, topic) + delete(gs.lastpub, topic) } else { peers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) gmap = peerListToMap(peers) @@ -371,6 +386,15 @@ func (gs *GossipSubRouter) heartbeat() { gs.emitGossip(topic, peers) } + // expire fanout for topics we haven't published to in a while + now := time.Now().UnixNano() + for topic, lastpub := range gs.lastpub { + if lastpub+int64(GossipSubFanoutTTL) < now { + delete(gs.fanout, topic) + delete(gs.lastpub, topic) + } + } + // maintain our fanout for topics we are publishing but we have not joined for topic, peers := range gs.fanout { // check whether our peers are still in the topic diff --git a/gossipsub_test.go b/gossipsub_test.go index 83a37d1..8dae7ff 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -254,6 +254,61 @@ func TestGossipsubFanoutMaintenance(t *testing.T) { } } +func TestGossipsubFanoutExpiry(t *testing.T) { + GossipSubFanoutTTL = 1 * time.Second + defer func() { GossipSubFanoutTTL = 60 * time.Second }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 10) + + psubs := getGossipsubs(ctx, hosts) + + var msgs []*Subscription + for _, ps := range psubs[1:] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 5; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := 0 + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } + + if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 { + t.Fatal("owner has no fanout") + } + + // wait for TTL to expire fanout peers in owner + time.Sleep(time.Second * 2) + + if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 { + t.Fatal("fanout hasn't expired") + } +} + func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()