Connections and pubsub peers cleanup (#279)

* better peer tracking and cleanup

* check if peer and conn is nil

* test name

* make timeout more agressive

* rename method for better clarity
This commit is contained in:
Dmitriy Ryajov 2020-07-17 13:46:24 -06:00 committed by GitHub
parent ba071cafa6
commit 94196fee71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 82 additions and 59 deletions

View File

@ -97,13 +97,9 @@ method rpcHandler*(f: FloodSub,
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
for p in failed:
let peer = f.peers.getOrDefault(p)
if not(isNil(peer)):
f.handleDisconnect(peer) # cleanup failed peers
let published = await f.publishHelper(toSendPeers, m.messages)
trace "forwared message to peers", peers = published.len
trace "forwared message to peers", peers = published
method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
@ -139,18 +135,15 @@ method publish*(f: FloodSub,
trace "publishing on topic", name = topic
inc f.msgSeqno
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
# start the future but do not wait yet
let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
for p in failed:
let peer = f.peers.getOrDefault(p)
if not isNil(peer):
f.handleDisconnect(peer) # cleanup failed peers
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg])
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published.len,
trace "published message to peers", peers = published,
msg = msg.shortLog()
return published.len
return published
method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} =

View File

@ -422,13 +422,9 @@ method rpcHandler*(g: GossipSub,
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let (published, failed) = await g.sendHelper(toSendPeers, m.messages)
for p in failed:
let peer = g.peers.getOrDefault(p)
if not isNil(peer):
g.handleDisconnect(peer) # cleanup failed peers
let published = await g.publishHelper(toSendPeers, m.messages)
trace "forwared message to peers", peers = published.len
trace "forwared message to peers", peers = published
var respControl: ControlMessage
if m.control.isSome:
@ -501,18 +497,13 @@ method publish*(g: GossipSub,
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
let (published, failed) = await g.sendHelper(peers, @[msg])
for p in failed:
let peer = g.peers.getOrDefault(p)
if not isNil(peer):
g.handleDisconnect(peer) # cleanup failed peers
if published.len > 0:
let published = await g.publishHelper(peers, @[msg])
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published.len,
trace "published message to peers", peers = published,
msg = msg.shortLog()
return published.len
return published
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"

View File

@ -8,20 +8,22 @@
## those terms.
import tables, sequtils, sets
import pubsubpeer
import pubsubpeer, ../../peerid
type
PeerTable* = Table[string, HashSet[PubSubPeer]]
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
# unefficient but used only in tests!
let peers = toSeq(t.getOrDefault(topic))
peers.any do (peer: PubSubPeer) -> bool:
peer.id == peerId
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added, false if it was already in the collection
not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer)
# returns true if the peer was added,
# false if it was already in the collection
not table.mgetOrPut(topic,
initHashSet[PubSubPeer]())
.containsOrIncl(peer)
func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) =
table.withValue(topic, peers):

View File

@ -48,12 +48,13 @@ type
handler*: seq[TopicHandler]
PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
cleanupLock: AsyncLock
validators*: Table[string, HashSet[ValidatorHandler]]
observers: ref seq[PubSubObserver] # ref as in smart_ptr
@ -63,8 +64,7 @@ type
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects
##
if not isNil(peer.peerInfo) and
peer.id in p.peers and not peer.inUse():
if not(isNil(peer)) and peer.peerInfo notin p.conns:
trace "deleting peer", peer = peer.id
p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id
@ -72,6 +72,24 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
proc onConnClose(p: PubSub, conn: Connection) {.async.} =
try:
let peer = conn.peerInfo
await conn.closeEvent.wait()
if peer in p.conns:
p.conns[peer].excl(conn)
if p.conns[peer].len <= 0:
p.conns.del(peer)
if peer.id in p.peers:
p.handleDisconnect(p.peers[peer.id])
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in onConnClose handler", exc = exc.msg
proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: seq[string],
@ -87,11 +105,14 @@ proc sendSubs*(p: PubSub,
await peer.sendSubOpts(topics, subscribe)
except CancelledError as exc:
p.handleDisconnect(peer)
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
raise exc
except CatchableError as exc:
trace "unable to send subscriptions", exc = exc.msg
p.handleDisconnect(peer)
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
method subscribeTopic*(p: PubSub,
topic: string,
@ -150,11 +171,19 @@ method handleConn*(p: PubSub,
await conn.close()
return
# track connection
p.conns.mgetOrPut(conn.peerInfo,
initHashSet[Connection]())
.incl(conn)
asyncCheck p.onConnClose(conn)
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call pubsub rpc handler
await p.rpcHandler(peer, msgs)
let peer = p.getOrCreatePeer(conn.peerInfo, proto)
let topics = toSeq(p.topics.keys)
if topics.len > 0:
await p.sendSubs(peer, topics, true)
@ -168,13 +197,20 @@ method handleConn*(p: PubSub,
except CatchableError as exc:
trace "exception ocurred in pubsub handle", exc = exc.msg
finally:
p.handleDisconnect(peer)
await conn.close()
method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
if not(isNil(conn)):
let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
trace "subscribing to peer", peerId = conn.peerInfo.id
# track connection
p.conns.mgetOrPut(conn.peerInfo,
initHashSet[Connection]())
.incl(conn)
asyncCheck p.onConnClose(conn)
let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
if not peer.connected:
peer.conn = conn
@ -183,11 +219,9 @@ method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
let peer = p.peers[peerInfo.id]
trace "unsubscribing from peer", peerId = $peerInfo
if not(isNil(peer.conn)):
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
p.handleDisconnect(peer)
proc connected*(p: PubSub, peerInfo: PeerInfo): bool =
if peerInfo.id in p.peers:
let peer = p.peers[peerInfo.id]
@ -269,6 +303,18 @@ proc sendHelper*(p: PubSub,
return (published, failed)
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[int] {.async.} =
# send messages and cleanup failed peers
let (published, failed) = await p.sendHelper(sendPeers, msgs)
for f in failed:
let peer = p.peers.getOrDefault(f)
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
return published.len
method publish*(p: PubSub,
topic: string,
data: seq[byte]): Future[int] {.base, async.} =

View File

@ -40,7 +40,6 @@ type
recvdRpcCache: TimedCache[string] # cache for already received messages
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -50,9 +49,6 @@ func hash*(p: PubSubPeer): Hash =
proc id*(p: PubSubPeer): string = p.peerInfo.id
proc inUse*(p: PubSubPeer): bool =
p.refs > 0
proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn))
@ -61,7 +57,6 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
trace "attaching send connection for peer", peer = p.id
p.sendConn = conn
p.onConnect.fire()
p.refs.inc()
proc conn*(p: PubSubPeer): Connection =
p.sendConn
@ -86,7 +81,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop for peer", closed = conn.closed
try:
try:
p.refs.inc()
while not conn.closed:
trace "waiting for data", closed = conn.closed
let data = await conn.readLp(64 * 1024)
@ -124,8 +118,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc
finally:
p.refs.dec()
proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
logScope:
@ -172,7 +164,6 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
p.sendConn = nil
p.onConnect.clear()
p.refs.dec()
raise exc
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =

View File

@ -39,7 +39,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng(),
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 5.minutes): Switch =
outTimeout: Duration = 1.minutes): Switch =
proc createMplex(conn: Connection): Muxer =
Mplex.init(
conn,

View File

@ -211,7 +211,7 @@ suite "Mplex":
check:
waitFor(testResetWrite()) == true
test "timeout, channel should reset":
test "reset - channel should reset on timeout":
proc testResetWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let