remove send lock (#334)

* remove send lock

When mplex receives data it will block until a reader has processed the
data. Thus, when a large message is received, such as a gossipsub
subscription table, all of mplex will be blocked until all reading is
finished.

However, if a `dial` to establish a gossipsub send connection is
ongoing at the same time, that `dial` will be blocked because mplex is
no longer reading data - indeed, the very connection that is busy
processing the previous data may be the one waiting for the send
connection, resulting in a deadlock.

There are other problems with the current code:
* If an exception is raised, it is not necessarily raised for the same
connection as `p.sendConn`, so resetting `p.sendConn` in the exception
handling is wrong
* `p.isConnected` is checked before the lock is taken - thus, if it
returns false, a new dial is started. If another task enters `send`
before that dial has finished, it will also see `p.isConnected` as
false and then block on the lock - when the first task completes and
releases the lock, the second task will _also_ dial, overwriting
`p.sendConn` and leaking the previous connection (see the sketch below).
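
To make the second problem concrete, here is a minimal, hypothetical
sketch of the check-then-lock pattern the old `send` used (`FakePeer`,
`dial` and the string "connection" are stand-ins, not the library's
types): because `isConnected` is evaluated before the lock is acquired,
two concurrent senders both decide to dial, and the second dial
overwrites - and thereby leaks - the connection established by the
first.

  import chronos

  type
    FakePeer = ref object
      sendConn: string    # stand-in for the real send Connection
      sendLock: AsyncLock

  proc isConnected(p: FakePeer): bool =
    p.sendConn.len > 0

  proc dial(): Future[string] {.async.} =
    await sleepAsync(10.milliseconds)  # simulate stream negotiation
    return "connection"

  proc send(p: FakePeer, task: int) {.async.} =
    if not p.isConnected:         # checked *before* the lock is taken
      await p.sendLock.acquire()  # both tasks pass the check and queue here
      try:
        echo "task ", task, " dialing"
        p.sendConn = await dial() # the second dial overwrites the first
      finally:
        p.sendLock.release()

  proc main() {.async.} =
    let p = FakePeer(sendLock: newAsyncLock())
    # both tasks dial; the connection from the first dial is overwritten
    # and leaks
    await allFutures(p.send(1), p.send(2))

  waitFor main()

The replacement below drops the lock entirely: `getSendConn` re-checks
`p.sendConn` after every `await` and prefers whichever connection
already exists, closing the redundant one instead of leaking it.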

* prefer existing connection

simplifies flow
Jacek Sieka 2020-08-17 12:38:27 +02:00 committed by GitHub
parent b12145dff7
commit f46bf0faa4
3 changed files with 55 additions and 28 deletions

File 1 of 3

@@ -55,7 +55,6 @@ type
     triggerSelf*: bool # trigger own local handler on publish
     verifySignature*: bool # enable signature verification
     sign*: bool # enable message signing
-    cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
@@ -338,7 +337,6 @@ proc init*(
     sign: sign,
     peers: initTable[PeerID, PubSubPeer](),
     topics: initTable[string, Topic](),
-    cleanupLock: newAsyncLock(),
     msgIdProvider: msgIdProvider)
   result.initPubSub()

File 2 of 3

@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

-import std/[hashes, options, sequtils, strutils, tables]
+import std/[hashes, options, strutils, tables]
 import chronos, chronicles, nimcrypto/sha2, metrics
 import rpc/[messages, message, protobuf],
        timedcache,
@@ -39,14 +39,13 @@ type
   PubSubPeer* = ref object of RootObj
     switch*: Switch # switch instance to dial peers
     codec*: string # the protocol that this peer joined from
-    sendConn: Connection
+    sendConn: Connection # cached send connection
     peerId*: PeerID
     handler*: RPCHandler
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
     subscribed*: bool # are we subscribed to this peer
-    sendLock*: AsyncLock # send connection lock

   RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@@ -117,11 +116,53 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
       debug "exiting pubsub peer read loop"
       await conn.close()
+
+      if p.sendConn == conn:
+        p.sendConn = nil
   except CancelledError as exc:
     raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg

+proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
+  # get a cached send connection or create a new one
+  block: # check if there's an existing connection that can be reused
+    let current = p.sendConn
+
+    if not current.isNil:
+      if not (current.closed() or current.atEof):
+        # The existing send connection looks like it might work - reuse it
+        return current
+
+      # Send connection is set but broken - get rid of it
+      p.sendConn = nil
+
+      # Careful, p.sendConn might change after here!
+      await current.close() # TODO this might be unnecessary
+
+  # Grab a new send connection
+  let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+  if newConn == nil:
+    return p.sendConn # A concurrent attempt perhaps succeeded?
+
+  # Because of the awaits above, a concurrent `getSendConn` call might have
+  # set up a send connection already. We cannot take a lock here because
+  # it might block the reading of data from mplex which will cause its
+  # backpressure handling to stop reading from the socket and thus prevent the
+  # channel negotiation from finishing
+  if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof):
+    let current = p.sendConn
+    # Either the new or the old connection could potentially be closed - it's
+    # slightly easier to sequence the closing of the new connection because the
+    # old one might still be in use.
+    await newConn.close()
+    return current
+
+  p.sendConn = newConn
+  asyncCheck p.handle(newConn) # start a read loop on the new connection
+  return newConn
+
 proc send*(
   p: PubSubPeer,
   msg: RPCMsg,
@@ -154,27 +195,17 @@ proc send*(
     libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return

+  var conn: Connection
   try:
     trace "about to send message"
-    if not p.connected:
-      try:
-        await p.sendLock.acquire()
-        trace "no send connection, dialing peer"
-        # get a send connection if there is none
-        p.sendConn = await p.switch.dial(
-          p.peerId, p.codec)
-
-        if not p.connected:
-          raise newException(CatchableError, "unable to get send pubsub stream")
-
-        # install a reader on the send connection
-        asyncCheck p.handle(p.sendConn)
-      finally:
-        if p.sendLock.locked:
-          p.sendLock.release()
+    conn = await p.getSendConn()
+
+    if conn == nil:
+      debug "Couldn't get send connection, dropping message"
+      return

     trace "sending encoded msgs to peer"
-    await p.sendConn.writeLp(encoded).wait(timeout)
+    await conn.writeLp(encoded).wait(timeout)
     p.sentRpcCache.put(digest)
     trace "sent pubsub message to remote"
@@ -186,9 +217,10 @@ proc send*(
   except CatchableError as exc:
     trace "unable to send to remote", exc = exc.msg
-    if not(isNil(p.sendConn)):
-      await p.sendConn.close()
-      p.sendConn = nil
+    # Next time sendConn is used, it will have its close flag set and thus
+    # will be recycled
+    if not isNil(conn):
+      await conn.close()

     raise exc
@@ -204,4 +236,3 @@ proc newPubSubPeer*(peerId: PeerID,
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
-  result.sendLock = newAsyncLock()

File 3 of 3

@@ -15,9 +15,7 @@ import utils,
        ../../libp2p/[errors,
                      switch,
                      stream/connection,
-                     stream/bufferstream,
                      crypto/crypto,
-                     protocols/pubsub/pubsubpeer,
                      protocols/pubsub/pubsub,
                      protocols/pubsub/floodsub,
                      protocols/pubsub/rpc/messages,