properly clean up failed peers

Dmitriy Ryajov 2020-08-08 14:51:14 -06:00
parent 5ab8fe8ced
commit 25f41dff6c
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
7 changed files with 64 additions and 43 deletions

libp2p/protocols/pubsub/floodsub.nim

@@ -33,12 +33,7 @@ method subscribeTopic*(f: FloodSub,
                        subscribe: bool,
                        peerId: PeerID) {.gcsafe, async.} =
   await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
 
   let peer = f.peers.getOrDefault(peerId)
-  if peer == nil:
-    debug "subscribeTopic on a nil peer!", peer = $peerId
-    return
 
   if topic notin f.floodsub:
     f.floodsub[topic] = initHashSet[PubSubPeer]()
@@ -51,15 +46,20 @@ method subscribeTopic*(f: FloodSub,
     # unsubscribe the peer from the topic
     f.floodsub[topic].excl(peer)
 
-method unsubscribePeer*(f: FloodSub, peer: PubSubPeer) =
+method unsubscribePeer*(f: FloodSub, peer: PeerID) =
   ## handle peer disconnects
   ##
-  procCall PubSub(f).unsubscribePeer(peer)
+  trace "unsubscribing floodsub peer", peer = $peer
+  let pubSubPeer = f.peers.getOrDefault(peer)
+  if pubSubPeer.isNil:
+    return
 
   for t in toSeq(f.floodsub.keys):
     if t in f.floodsub:
-      f.floodsub[t].excl(peer)
+      f.floodsub[t].excl(pubSubPeer)
+
+  procCall PubSub(f).unsubscribePeer(peer)
 
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
@@ -76,7 +76,7 @@ method rpcHandler*(f: FloodSub,
       if msgId notin f.seen:
         f.seen.put(msgId) # add the message to the seen cache
 
-        if f.verifySignature and not msg.verify(peer.peer):
+        if f.verifySignature and not msg.verify(peer.peerId):
           trace "dropping message due to failed signature verification"
           continue
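
Note: unsubscribePeer is now keyed by PeerID instead of PubSubPeer, so the method resolves the peer object itself and returns early when the peer is already gone. A minimal stdlib-only Nim sketch of that lookup-and-guard flow (Peer, peers and topics are illustrative stand-ins, not libp2p types):

  import std/[tables, sets, sequtils, hashes]

  type Peer = ref object
    id: string

  # refs hash by identity here, mirroring PubSubPeer's pointer-based hash
  proc hash(p: Peer): Hash = cast[pointer](p).hash

  var peers: Table[string, Peer]           # id -> peer object
  var topics: Table[string, HashSet[Peer]] # topic -> subscribers

  proc unsubscribe(id: string) =
    let p = peers.getOrDefault(id) # nil for unknown ids, since Peer is a ref
    if p.isNil:
      return
    # snapshot the keys with toSeq so entries can be touched while iterating
    for t in toSeq(topics.keys):
      topics[t].excl(p)
    peers.del(id)

  peers["a"] = Peer(id: "a")
  topics["t"] = [peers["a"]].toHashSet
  unsubscribe("a")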

libp2p/protocols/pubsub/gossipsub.nim

@@ -231,7 +231,7 @@ proc heartbeat(g: GossipSub) {.async.} =
       let peers = g.getGossipPeers()
       var sent: seq[Future[void]]
       for peer, control in peers:
-        g.peers.withValue(peer.peer, pubsubPeer) do:
+        g.peers.withValue(peer.peerId, pubsubPeer) do:
           sent &= pubsubPeer[].send(RPCMsg(control: some(control)))
       checkFutures(await allFinished(sent))
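
Note: the heartbeat now looks up gossip targets by peer.peerId via Table.withValue, which runs its body only when the key is present and binds a pointer to the stored value. A small stdlib sketch of the same shape:

  import std/tables

  var counts = {"a": 1}.toTable
  # the body only runs if "a" exists; v is a ptr to the slot, hence v[]
  counts.withValue("a", v) do:
    v[] += 1
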
@@ -243,33 +243,38 @@ proc heartbeat(g: GossipSub) {.async.} =
     await sleepAsync(GossipSubHeartbeatInterval)
 
-method unsubscribePeer*(g: GossipSub, peer: PubSubPeer) =
+method unsubscribePeer*(g: GossipSub, peer: PeerID) =
   ## handle peer disconnects
   ##
-  procCall FloodSub(g).unsubscribePeer(peer)
+  trace "unsubscribing gossipsub peer", peer = $peer
+  let pubSubPeer = g.peers.getOrDefault(peer)
+  if pubSubPeer.isNil:
+    return
 
   for t in toSeq(g.gossipsub.keys):
-    g.gossipsub.removePeer(t, peer)
+    g.gossipsub.removePeer(t, pubSubPeer)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_peers_per_topic_gossipsub
         .set(g.gossipsub.peers(t).int64, labelValues = [t])
 
   for t in toSeq(g.mesh.keys):
-    g.mesh.removePeer(t, peer)
+    g.mesh.removePeer(t, pubSubPeer)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_peers_per_topic_mesh
         .set(g.mesh.peers(t).int64, labelValues = [t])
 
   for t in toSeq(g.fanout.keys):
-    g.fanout.removePeer(t, peer)
+    g.fanout.removePeer(t, pubSubPeer)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_peers_per_topic_fanout
         .set(g.fanout.peers(t).int64, labelValues = [t])
 
+  procCall FloodSub(g).unsubscribePeer(peer)
+
 method subscribeTopic*(g: GossipSub,
                        topic: string,
                        subscribe: bool,
@@ -398,7 +403,7 @@ method rpcHandler*(g: GossipSub,
         g.seen.put(msgId) # add the message to the seen cache
 
-        if g.verifySignature and not msg.verify(peer.peer):
+        if g.verifySignature and not msg.verify(peer.peerId):
           trace "dropping message due to failed signature verification"
           continue

libp2p/protocols/pubsub/peertable.nim

@@ -16,7 +16,7 @@ type
 proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
   let peers = toSeq(t.getOrDefault(topic))
   peers.any do (peer: PubSubPeer) -> bool:
-    peer.peer == peerId
+    peer.peerId == peerId
 
 func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
   # returns true if the peer was added,

libp2p/protocols/pubsub/pubsub.nim

@@ -57,15 +57,19 @@ type
     sign*: bool # enable message signing
     cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
     msgSeqno*: uint64
+    lifetimeFut*: Future[void] # pubsub liftime future
 
-method unsubscribePeer*(p: PubSub, peer: PubSubPeer) {.base.} =
+method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
   ## handle peer disconnects
   ##
-  peer.subscribed = false
+  trace "unsubscribing pubsub peer", peer = $peerId
+  if peerId in p.peers:
+    p.peers.del(peerId)
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
 proc sendSubs*(p: PubSub,
@@ -93,11 +97,12 @@ method rpcHandler*(p: PubSub,
   if m.subscriptions.len > 0: # if there are any subscriptions
     for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
       trace "about to subscribe to topic", topicId = s.topic
-      await p.subscribeTopic(s.topic, s.subscribe, peer.peer)
+      await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
 
-proc getOrCreatePeer(p: PubSub,
-                     peer: PeerID,
-                     proto: string): PubSubPeer =
+proc getOrCreatePeer*(
+  p: PubSub,
+  peer: PeerID,
+  proto: string): PubSubPeer =
 
   if peer in p.peers:
     return p.peers[peer]
@@ -109,7 +114,6 @@ proc getOrCreatePeer(p: PubSub,
   pubSubPeer.observers = p.observers
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
-
   return pubSubPeer
method handleConn*(p: PubSub, method handleConn*(p: PubSub,
@ -136,7 +140,6 @@ method handleConn*(p: PubSub,
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
if p.topics.len > 0: if p.topics.len > 0:
await p.sendSubs(peer, toSeq(p.topics.keys), true) await p.sendSubs(peer, toSeq(p.topics.keys), true)
@@ -222,12 +225,15 @@ proc publishHelper*(p: PubSub,
   # send messages and cleanup failed peers
   var sent: seq[tuple[id: PeerID, fut: Future[void]]]
   for sendPeer in sendPeers:
+    if sendPeer.isNil:
+      continue
+
     # avoid sending to self
-    if sendPeer.peer == p.peerInfo.peerId:
+    if sendPeer.peerId == p.peerInfo.peerId:
       continue
 
     trace "sending messages to peer", peer = sendPeer.id, msgs
-    sent.add((id: sendPeer.peer, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
+    sent.add((id: sendPeer.peerId, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
 
   var published: seq[PeerID]
   var failed: seq[PeerID]
@@ -237,6 +243,7 @@ proc publishHelper*(p: PubSub,
     if f.len > 0:
       if s.failed:
         trace "sending messages to peer failed", peer = f[0].id
+        p.unsubscribePeer(f[0].id)
         failed.add(f[0].id)
       else:
         trace "sending messages to peer succeeded", peer = f[0].id

libp2p/protocols/pubsub/pubsubpeer.nim

@@ -40,7 +40,7 @@ type
     switch*: Switch # switch instance to dial peers
     codec*: string # the protocol that this peer joined from
     sendConn: Connection
-    peer*: PeerID
+    peerId*: PeerID
     handler*: RPCHandler
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
@@ -54,10 +54,13 @@ func hash*(p: PubSubPeer): Hash =
   # int is either 32/64, so intptr basically, pubsubpeer is a ref
   cast[pointer](p).hash
 
-proc id*(p: PubSubPeer): string = p.peer.pretty
+proc id*(p: PubSubPeer): string =
+  doAssert(not p.isNil, "nil pubsubpeer")
+  p.peerId.pretty
 
 proc connected*(p: PubSubPeer): bool =
-  not(isNil(p.sendConn)) and not p.sendConn.closed
+  not(isNil(p.sendConn)) and not
+    (p.sendConn.closed or p.sendConn.atEof)
 
 proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
   # trigger hooks
@@ -155,16 +158,21 @@ proc send*(
       try:
         await p.sendLock.acquire()
         trace "no send connection, dialing peer"
-        # get a send connection if there is none
         p.sendConn = await p.switch.dial(
-          p.peer, p.codec)
-          .wait(DefaultSendTimeout)
-        asyncCheck p.handle(p.sendConn) # install a reader on the send connection
+          p.peerId, p.codec)
+        # get a send connection if there is none
+        if not p.connected:
+          raise newException(CatchableError, "unable to get send pubsub stream")
+
+        # install a reader on the send connection
+        asyncCheck p.handle(p.sendConn)
       finally:
         if p.sendLock.locked:
           p.sendLock.release()
 
     trace "sending encoded msgs to peer"
-    await p.sendConn.writeLp(encoded).wait(DefaultSendTimeout)
+    await p.sendConn.writeLp(encoded).wait(timeout)
     p.sentRpcCache.put(digest)
     trace "sent pubsub message to remote"
@@ -218,13 +226,13 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
 proc `$`*(p: PubSubPeer): string =
   p.id
 
-proc newPubSubPeer*(peer: PeerID,
+proc newPubSubPeer*(peerId: PeerID,
                     switch: Switch,
                     codec: string): PubSubPeer =
   new result
   result.switch = switch
   result.codec = codec
-  result.peer = peer
+  result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
   result.sendLock = newAsyncLock()
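
Note: send now serializes dialing behind sendLock, verifies p.connected before installing the reader, and applies the caller-supplied timeout to writeLp. A compact chronos sketch of the lock-guarded dial (dialOnce and getSendConn are illustrative stand-ins):

  import chronos

  var sendLock = newAsyncLock()

  proc dialOnce() {.async.} =
    await sleepAsync(10.milliseconds) # stand-in for switch.dial

  proc getSendConn() {.async.} =
    try:
      await sendLock.acquire() # only one dialer at a time
      await dialOnce()
    finally:
      if sendLock.locked:
        sendLock.release()

  waitFor getSendConn()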

libp2p/stream/chronosstream.nim

@@ -45,7 +45,7 @@ template withExceptions(body: untyped) =
     raise exc
   except TransportIncompleteError:
     # for all intents and purposes this is an EOF
-    raise newLPStreamEOFError()
+    raise newLPStreamIncompleteError()
   except TransportLimitError:
     raise newLPStreamLimitError()
   except TransportUseClosedError:
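
Note: withExceptions translates chronos transport errors into libp2p stream errors; an incomplete read now surfaces as an incomplete-stream error rather than a plain EOF. A stdlib-only sketch of the translation pattern (both error types here are local stand-ins, not the libp2p ones):

  type
    TransportIncompleteError = object of CatchableError
    LPStreamIncompleteError = object of CatchableError

  template withExceptionsSketch(body: untyped) =
    try:
      body
    except TransportIncompleteError:
      # for all intents and purposes this is an EOF
      raise newException(LPStreamIncompleteError, "incomplete read")

  try:
    withExceptionsSketch:
      raise newException(TransportIncompleteError, "short read")
  except LPStreamIncompleteError:
    echo "translated to a stream-level error"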

libp2p/switch.nim

@@ -268,7 +268,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
 proc internalConnect(s: Switch,
                      peerId: PeerID,
                      addrs: seq[MultiAddress]): Future[Connection] {.async.} =
-  logScope: peer = peerId
+  logScope:
+    peer = peerId
 
   if s.peerInfo.peerId == peerId:
     raise newException(CatchableError, "can't dial self!")
@@ -326,11 +327,11 @@ proc internalConnect(s: Switch,
       libp2p_failed_upgrade.inc()
       raise exc
 
-    doAssert not isNil(upgraded), "checked in upgradeOutgoing"
+    doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
     s.connManager.storeOutgoing(upgraded)
 
     trace "dial successful",
-      oid = $conn.oid,
+      oid = $upgraded.oid,
       peerInfo = shortLog(upgraded.peerInfo)
 
     conn = upgraded
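
Note: the logScope block form attaches peer = peerId to every subsequent log statement in internalConnect. A tiny chronicles sketch of the same idiom (dialSketch is illustrative):

  import chronicles

  proc dialSketch(peerId: string) =
    logScope:
      peer = peerId
    info "dial successful" # emitted with peer=<peerId> attached

  dialSketch("some-peer-id")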