From 8676a0e25bfb2d475c70116e70567dca7954108c Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 12 Jan 2021 14:52:00 +0200 Subject: [PATCH] deduplicate inbound streams --- comm.go | 20 +++++++++++++++++++- pubsub.go | 4 ++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/comm.go b/comm.go index 5512ffe..aefeed3 100644 --- a/comm.go +++ b/comm.go @@ -41,6 +41,17 @@ func (p *PubSub) getHelloPacket() *RPC { } func (p *PubSub) handleNewStream(s network.Stream) { + peer := s.Conn().RemotePeer() + + p.inboundStreamsMx.Lock() + other, dup := p.inboundStreams[peer] + if dup { + log.Debugf("duplicate inbound stream from %s; resetting other stream", peer) + other.Reset() + } + p.inboundStreams[peer] = s + p.inboundStreamsMx.Unlock() + r := protoio.NewDelimitedReader(s, p.maxMessageSize) for { rpc := new(RPC) @@ -54,10 +65,17 @@ func (p *PubSub) handleNewStream(s network.Stream) { // but it doesn't hurt to send it. s.Close() } + + p.inboundStreamsMx.Lock() + if p.inboundStreams[peer] == s { + delete(p.inboundStreams, peer) + } + p.inboundStreamsMx.Unlock() + return } - rpc.from = s.Conn().RemotePeer() + rpc.from = peer select { case p.incoming <- rpc: case <-p.ctx.Done(): diff --git a/pubsub.go b/pubsub.go index 887b79a..fffff93 100644 --- a/pubsub.go +++ b/pubsub.go @@ -134,6 +134,9 @@ type PubSub struct { peers map[peer.ID]chan *RPC + inboundStreamsMx sync.Mutex + inboundStreams map[peer.ID]network.Stream + seenMessagesMx sync.Mutex seenMessages *timecache.TimeCache @@ -253,6 +256,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option myRelays: make(map[string]int), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), + inboundStreams: make(map[peer.ID]network.Stream), blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration),