From b867200feed7143df8737387d3620b15d44a8c0c Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:50:14 +0200 Subject: [PATCH] gossipsub: heartbeat timer --- gossipsub.go | 24 ++++++++++++++++++++++++ pubsub.go | 7 +++++++ 2 files changed, 31 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 46f50cf..d8dae5c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -2,6 +2,7 @@ package floodsub import ( "context" + "time" pb "github.com/libp2p/go-floodsub/pb" @@ -29,6 +30,7 @@ func (fs *GossipSubRouter) Protocols() []protocol.ID { func (fs *GossipSubRouter) Attach(p *PubSub) { fs.p = p + go fs.heartbeatTimer() } func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) { @@ -54,3 +56,25 @@ func (fs *GossipSubRouter) Join(topic string) { func (fs *GossipSubRouter) Leave(topic string) { } + +func (fs *GossipSubRouter) heartbeatTimer() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + select { + case fs.p.eval <- fs.heartbeat: + case <-fs.p.ctx.Done(): + return + } + case <-fs.p.ctx.Done(): + return + } + } +} + +func (fs *GossipSubRouter) heartbeat() { + +} diff --git a/pubsub.go b/pubsub.go index 4362b3e..3d0b3e8 100644 --- a/pubsub.go +++ b/pubsub.go @@ -82,6 +82,9 @@ type PubSub struct { // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} + // eval thunk in event loop + eval chan func() + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -135,6 +138,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), validateThrottle: make(chan struct{}, defaultValidateThrottle), + eval: make(chan func()), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), @@ -253,6 +257,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case req := <-p.rmVal: p.rmValidator(req) + case thunk := <-p.eval: + thunk() + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return