deduplicate inbound streams
This commit is contained in:
parent
352c6b993e
commit
8676a0e25b
20
comm.go
20
comm.go
|
@ -41,6 +41,17 @@ func (p *PubSub) getHelloPacket() *RPC {
|
|||
}
|
||||
|
||||
func (p *PubSub) handleNewStream(s network.Stream) {
|
||||
peer := s.Conn().RemotePeer()
|
||||
|
||||
p.inboundStreamsMx.Lock()
|
||||
other, dup := p.inboundStreams[peer]
|
||||
if dup {
|
||||
log.Debugf("duplicate inbound stream from %s; resetting other stream", peer)
|
||||
other.Reset()
|
||||
}
|
||||
p.inboundStreams[peer] = s
|
||||
p.inboundStreamsMx.Unlock()
|
||||
|
||||
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
|
||||
for {
|
||||
rpc := new(RPC)
|
||||
|
@ -54,10 +65,17 @@ func (p *PubSub) handleNewStream(s network.Stream) {
|
|||
// but it doesn't hurt to send it.
|
||||
s.Close()
|
||||
}
|
||||
|
||||
p.inboundStreamsMx.Lock()
|
||||
if p.inboundStreams[peer] == s {
|
||||
delete(p.inboundStreams, peer)
|
||||
}
|
||||
p.inboundStreamsMx.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
rpc.from = s.Conn().RemotePeer()
|
||||
rpc.from = peer
|
||||
select {
|
||||
case p.incoming <- rpc:
|
||||
case <-p.ctx.Done():
|
||||
|
|
|
@ -134,6 +134,9 @@ type PubSub struct {
|
|||
|
||||
peers map[peer.ID]chan *RPC
|
||||
|
||||
inboundStreamsMx sync.Mutex
|
||||
inboundStreams map[peer.ID]network.Stream
|
||||
|
||||
seenMessagesMx sync.Mutex
|
||||
seenMessages *timecache.TimeCache
|
||||
|
||||
|
@ -253,6 +256,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||
myRelays: make(map[string]int),
|
||||
topics: make(map[string]map[peer.ID]struct{}),
|
||||
peers: make(map[peer.ID]chan *RPC),
|
||||
inboundStreams: make(map[peer.ID]network.Stream),
|
||||
blacklist: NewMapBlacklist(),
|
||||
blacklistPeer: make(chan peer.ID),
|
||||
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
|
||||
|
|
Loading…
Reference in New Issue