gossipsub: heartbeat timer
This commit is contained in:
parent
d6104094c8
commit
b867200fee
24
gossipsub.go
24
gossipsub.go
|
@ -2,6 +2,7 @@ package floodsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
pb "github.com/libp2p/go-floodsub/pb"
|
pb "github.com/libp2p/go-floodsub/pb"
|
||||||
|
|
||||||
|
@ -29,6 +30,7 @@ func (fs *GossipSubRouter) Protocols() []protocol.ID {
|
||||||
|
|
||||||
func (fs *GossipSubRouter) Attach(p *PubSub) {
|
func (fs *GossipSubRouter) Attach(p *PubSub) {
|
||||||
fs.p = p
|
fs.p = p
|
||||||
|
go fs.heartbeatTimer()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) {
|
func (fs *GossipSubRouter) AddPeer(peer.ID, protocol.ID) {
|
||||||
|
@ -54,3 +56,25 @@ func (fs *GossipSubRouter) Join(topic string) {
|
||||||
func (fs *GossipSubRouter) Leave(topic string) {
|
func (fs *GossipSubRouter) Leave(topic string) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *GossipSubRouter) heartbeatTimer() {
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
select {
|
||||||
|
case fs.p.eval <- fs.heartbeat:
|
||||||
|
case <-fs.p.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-fs.p.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *GossipSubRouter) heartbeat() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -82,6 +82,9 @@ type PubSub struct {
|
||||||
// validateThrottle limits the number of active validation goroutines
|
// validateThrottle limits the number of active validation goroutines
|
||||||
validateThrottle chan struct{}
|
validateThrottle chan struct{}
|
||||||
|
|
||||||
|
// eval thunk in event loop
|
||||||
|
eval chan func()
|
||||||
|
|
||||||
peers map[peer.ID]chan *RPC
|
peers map[peer.ID]chan *RPC
|
||||||
seenMessages *timecache.TimeCache
|
seenMessages *timecache.TimeCache
|
||||||
|
|
||||||
|
@ -135,6 +138,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||||
addVal: make(chan *addValReq),
|
addVal: make(chan *addValReq),
|
||||||
rmVal: make(chan *rmValReq),
|
rmVal: make(chan *rmValReq),
|
||||||
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
||||||
|
eval: make(chan func()),
|
||||||
myTopics: make(map[string]map[*Subscription]struct{}),
|
myTopics: make(map[string]map[*Subscription]struct{}),
|
||||||
topics: make(map[string]map[peer.ID]struct{}),
|
topics: make(map[string]map[peer.ID]struct{}),
|
||||||
peers: make(map[peer.ID]chan *RPC),
|
peers: make(map[peer.ID]chan *RPC),
|
||||||
|
@ -253,6 +257,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
case req := <-p.rmVal:
|
case req := <-p.rmVal:
|
||||||
p.rmValidator(req)
|
p.rmValidator(req)
|
||||||
|
|
||||||
|
case thunk := <-p.eval:
|
||||||
|
thunk()
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Info("pubsub processloop shutting down")
|
log.Info("pubsub processloop shutting down")
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue