Restore interop with Lighthouse by preventing concurrent meshsub dials

This commit is contained in:
Zahary Karadjov 2020-08-17 22:39:39 +03:00
parent 833a5b8e57
commit 60122a044c
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
1 changed files with 31 additions and 24 deletions

View File

@ -46,6 +46,7 @@ type
recvdRpcCache: TimedCache[string] # cache for already received messages recvdRpcCache: TimedCache[string] # cache for already received messages
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
subscribed*: bool # are we subscribed to this peer subscribed*: bool # are we subscribed to this peer
dialLock: AsyncLock
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -141,31 +142,36 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
# Careful, p.sendConn might change after here! # Careful, p.sendConn might change after here!
await current.close() # TODO this might be unnecessary await current.close() # TODO this might be unnecessary
# Grab a new send connection try:
let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here # Testing has demonstrated that when we perform concurrent meshsub dials
if newConn == nil: # and later close one of them, other implementations such as rust-libp2p
return p.sendConn # A concurrent attempt perhaps succeeded? # become deaf to our messages (potentially due to the clean-up associated
# with closing connections). To prevent this, we use a lock that ensures
# that only a single dial will be performed for each peer:
await p.dialLock.acquire()
# Because of the awaits above, a concurrent `getSendConn` call might have # Another concurrent dial may have populated p.sendConn
# set up a send connection already. We cannot take a lock here because if p.sendConn != nil:
# it might block the reading of data from mplex which will cause its let current = p.sendConn
# backpressure handling to stop reading from the socket and thus prevent the if not current.isNil:
# channel negotiation from finishing if not (current.closed() or current.atEof):
if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof): # The existing send connection looks like it might work - reuse it
let current = p.sendConn trace "Reusing existing connection", oid = $current.oid
# Either the new or the old connection could potentially be closed - it's return current
# slightly easier to sequence the closing of the new connection because the
# old one might still be in use.
trace "Closing redundant connection",
oid = $current.oid, newConn = $newConn.oid
await newConn.close()
return current
trace "Caching new send connection", oid = $newConn.oid # Grab a new send connection
p.sendConn = newConn let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
asyncCheck p.handle(newConn) # start a read loop on the new connection if newConn.isNil:
return nil
return newConn trace "Caching new send connection", oid = $newConn.oid
p.sendConn = newConn
asyncCheck p.handle(newConn) # start a read loop on the new connection
return newConn
finally:
if p.dialLock.locked:
p.dialLock.release()
proc send*( proc send*(
p: PubSubPeer, p: PubSubPeer,
@ -207,11 +213,11 @@ proc send*(
if conn == nil: if conn == nil:
debug "Couldn't get send connection, dropping message" debug "Couldn't get send connection, dropping message"
return return
trace "sending encoded msgs to peer" trace "sending encoded msgs to peer", connId = $conn.oid
await conn.writeLp(encoded).wait(timeout) await conn.writeLp(encoded).wait(timeout)
p.sentRpcCache.put(digest) p.sentRpcCache.put(digest)
trace "sent pubsub message to remote" trace "sent pubsub message to remote", connId = $conn.oid
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
for x in mm.messages: for x in mm.messages:
@ -240,3 +246,4 @@ proc newPubSubPeer*(peerId: PeerID,
result.peerId = peerId result.peerId = peerId
result.sentRpcCache = newTimedCache[string](2.minutes) result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes)
result.dialLock = newAsyncLock()