## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

import std/[sequtils, strutils, tables, hashes, sets]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
       ../../peerid,
       ../../peerinfo,
       ../../stream/connection,
       ../../crypto/crypto,
       ../../protobuf/minprotobuf,
       ../../utility

logScope:
  topics = "pubsubpeer"
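
# These counters are tracked per peer and per topic, which is expensive, so
# they are only compiled in when the build passes the corresponding define,
# e.g. `nim c -d:libp2p_expensive_metrics ...`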
when defined(libp2p_expensive_metrics):
  declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
  declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
  declareCounter(libp2p_pubsub_skipped_received_messages, "number of skipped received messages", labels = ["id"])
  declareCounter(libp2p_pubsub_skipped_sent_messages, "number of skipped sent messages", labels = ["id"])

type
|
2020-04-30 13:22:31 +00:00
|
|
|
PubSubObserver* = ref object
|
2020-05-29 16:46:27 +00:00
|
|
|
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
|
|
|
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
2019-09-10 02:15:52 +00:00
|
|
|
|
2020-09-01 07:33:03 +00:00
|
|
|
GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
|
|
|
|
|
2020-04-30 13:22:31 +00:00
|
|
|
PubSubPeer* = ref object of RootObj
|
2020-09-01 07:33:03 +00:00
|
|
|
getConn*: GetConn # callback to establish a new send connection
|
2020-08-12 00:05:49 +00:00
|
|
|
codec*: string # the protocol that this peer joined from
|
remove send lock (#334)
* remove send lock
When mplex receives data it will block until a reader has processed the
data. Thus, when a large message is received, such as a gossipsub
subscription table, all of mplex will be blocked until all reading is
finished.
However, if at the same time a `dial` to establish a gossipsub send
connection is ongoing, that `dial` will be blocked because mplex is no
longer reading data - specifically, it might indeed be the connection
that's processing the previous data that is waiting for a send
connection.
There are other problems with the current code:
* If an exception is raised, it is not necessarily raised for the same
connection as `p.sendConn`, so resetting `p.sendConn` in the exception
handling is wrong
* `p.isConnected` is checked before taking the lock - thus, if it
returns false, a new dial will be started. If a new task enters `send`
before dial is finished, it will also determine `p.isConnected` is
false, then get stuck on the lock - when the previous task finishes and
releases the lock, the new task will _also_ dial and thus reset
`p.sendConn` causing a leak.
* prefer existing connection
simplifies flow
2020-08-17 10:38:27 +00:00
|
|
|
sendConn: Connection # cached send connection
|
2020-09-01 07:33:03 +00:00
|
|
|
connections*: seq[Connection] # connections to this peer
|
2020-08-12 00:05:49 +00:00
|
|
|
peerId*: PeerID
|
2020-04-30 13:22:31 +00:00
|
|
|
handler*: RPCHandler
|
|
|
|
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
|
2020-08-17 19:39:39 +00:00
|
|
|
dialLock: AsyncLock
|
2020-07-16 12:53:45 +00:00
|
|
|
|
2020-07-19 03:37:45 +00:00
|
|
|
score*: float64
|
2020-08-14 08:07:26 +00:00
|
|
|
iWantBudget*: int
|
2020-08-16 16:20:50 +00:00
|
|
|
iHaveBudget*: int
|
|
|
|
outbound*: bool # if this is an outbound connection
|
|
|
|
appScore*: float64 # application specific score
|
|
|
|
behaviourPenalty*: float64 # the eventual penalty score
|
2020-07-19 03:37:45 +00:00
|
|
|
|
2020-09-01 07:33:03 +00:00
|
|
|
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
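
# A hypothetical sketch of attaching an observer to trace RPC traffic; it
# assumes `myPeer` is an existing PubSubPeer whose `observers` seq has been
# initialized (as the pubsub implementation does):
#
#   let obs = PubSubObserver()
#   obs.onRecv = proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} =
#     discard # inspect or rewrite incoming messages here
#   obs.onSend = proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} =
#     discard # inspect or rewrite outgoing messages here
#   myPeer.observers[].add(obs)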
func hash*(p: PubSubPeer): Hash =
  # int is either 32 or 64 bits, i.e. pointer-sized - since PubSubPeer is a
  # ref, hashing its address gives identity semantics
  cast[pointer](p).hash

func shortLog*(p: PubSubPeer): string =
  if p.isNil: "PubSubPeer(nil)"
  else: shortLog(p.peerId)
chronicles.formatIt(PubSubPeer): shortLog(it)

proc connected*(p: PubSubPeer): bool =
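  ## true if there is a cached send connection that is neither closed nor at EOF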
  not p.sendConn.isNil and not
    (p.sendConn.closed or p.sendConn.atEof)

proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
  # trigger hooks
  if not(isNil(p.observers)) and p.observers[].len > 0:
    for obs in p.observers[]:
      if not(isNil(obs)): # TODO: should never be nil, but...
        obs.onRecv(p, msg)

proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
  # trigger hooks
  if not(isNil(p.observers)) and p.observers[].len > 0:
    for obs in p.observers[]:
      if not(isNil(obs)): # TODO: should never be nil, but...
        obs.onSend(p, msg)

proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
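  ## Read loop for `conn`: reads length-prefixed RPC messages, runs the
  ## receive observers and hands each decoded message to `p.handler` until
  ## EOF or error.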
  debug "starting pubsub read loop",
    conn, peer = p, closed = conn.closed
  try:
    try:
      while not conn.atEof:
        trace "waiting for data", conn, peer = p, closed = conn.closed
        let data = await conn.readLp(64 * 1024)
        trace "read data from peer",
          conn, peer = p, closed = conn.closed,
          data = data.shortLog

        var rmsg = decodeRpcMsg(data)
        if rmsg.isErr():
          notice "failed to decode msg from peer",
            conn, peer = p, closed = conn.closed,
            err = rmsg.error()
          break

        trace "decoded msg from peer",
          conn, peer = p, closed = conn.closed,
          msg = rmsg.get().shortLog
        # trigger hooks
        p.recvObservers(rmsg.get())

        when defined(libp2p_expensive_metrics):
          for m in rmsg.get().messages:
            for t in m.topicIDs:
              # metrics
              libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

        await p.handler(p, rmsg.get())
    finally:
      await conn.close()

      if p.sendConn == conn:
        p.sendConn = nil

  except CancelledError:
    # This is a top-level procedure running as a separate task, so there is
    # no need to propagate CancelledError further.
    trace "Unexpected cancellation in PubSubPeer.handle"
  except CatchableError as exc:
    trace "Exception occurred in PubSubPeer.handle",
      conn, peer = p, closed = conn.closed, exc = exc.msg
  finally:
    debug "exiting pubsub read loop",
      conn, peer = p, closed = conn.closed

proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
  ## Get a cached send connection or create a new one - returns nil if
  ## establishing a new connection fails.
  ##

  block: # check if there's an existing connection that can be reused
    let current = p.sendConn

    if not current.isNil:
      if not (current.closed() or current.atEof):
        # The existing send connection looks like it might work - reuse it
        trace "Reusing existing connection", current
        return current

      # Send connection is set but broken - get rid of it
      p.sendConn = nil

      # Careful, p.sendConn might change after here!
      await current.close() # TODO this might be unnecessary

  try:
    # Testing has demonstrated that when we perform concurrent meshsub dials
    # and later close one of them, other implementations such as rust-libp2p
    # become deaf to our messages (potentially due to the clean-up associated
    # with closing connections). To prevent this, we use a lock that ensures
    # that only a single dial will be performed for each peer and send the
    # subscription table every time we reconnect.
    #
    # Nevertheless, this approach is still quite problematic because the gossip
    # sends and their respective dials may be started from the mplex read loop.
    # This may cause the read loop to get stuck which ultimately results in a
    # deadlock when the other side tries to send us any other message that must
    # be routed through mplex (it will be stuck on `pushTo`). Such messages
    # naturally arise in the process of dialing itself.
    #
    # See https://github.com/status-im/nim-libp2p/issues/337
    #
    # One possible long-term solution is to avoid "blocking" the mplex read
    # loop by making the gossip send non-blocking through the use of a queue.
    await p.dialLock.acquire()

    # Another concurrent dial may have populated p.sendConn
    if p.sendConn != nil:
      let current = p.sendConn
      if not (current.closed() or current.atEof):
        # The existing send connection looks like it might work - reuse it
        debug "Reusing existing connection", current
        return current
      else:
        p.sendConn = nil

    # Grab a new send connection
    let (newConn, handshake) = await p.getConn() # p.sendConn may change during this await too
    if newConn.isNil:
      return nil

    trace "Sending handshake", newConn, handshake = shortLog(handshake)
    await newConn.writeLp(encodeRpcMsg(handshake))

    trace "Caching new send connection", newConn
    p.sendConn = newConn
    # Start a read loop on the new connection.
    # All errors are handled inside the `handle()` procedure.
    asyncSpawn p.handle(newConn)
    return newConn
  finally:
    if p.dialLock.locked:
      p.dialLock.release()
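
# A hypothetical sketch of the queue-based approach mentioned above: rather
# than dialing from within a send, messages would be enqueued and a dedicated
# task would drain the queue (`sendQueue` is an imagined
# AsyncQueue[seq[byte]] field, not part of this module):
#
#   proc sendLoop(p: PubSubPeer) {.async.} =
#     while true:
#       let encoded = await p.sendQueue.popFirst()
#       let conn = await p.getSendConn()
#       if not conn.isNil:
#         await conn.writeLp(encoded)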

proc connectImpl*(p: PubSubPeer) {.async.} =
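  ## Eagerly establish the send connection, swallowing any errors - meant to
  ## run as a detached task via `connect`.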
  try:
    discard await getSendConn(p)
  except CatchableError as exc:
    debug "Could not connect to pubsub peer", err = exc.msg

proc connect*(p: PubSubPeer) =
  asyncCheck(connectImpl(p))

proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
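  ## Encode `msg` and write it to the send connection, establishing one if
  ## necessary - on failure, the broken connection is closed and forgotten so
  ## that the next send can dial anew.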
  doAssert(not isNil(p), "pubsubpeer nil!")

  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

  # trigger send hooks
  var mm = msg # hooks can modify the message
  p.sendObservers(mm)

  let encoded = encodeRpcMsg(mm)
  if encoded.len <= 0:
    info "empty message, skipping"
    return

  var conn: Connection
  try:
    conn = await p.getSendConn()
    if conn == nil:
      trace "Couldn't get send connection, dropping message", peer = p
      return

    trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
    await conn.writeLp(encoded)
    trace "sent pubsub message to remote", conn

    when defined(libp2p_expensive_metrics):
      for x in mm.messages:
        for t in x.topicIDs:
          # metrics
          libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

  except CatchableError as exc:
    # Because we detach the send call from the currently executing task using
    # asyncCheck, no exceptions may leak out of it
    trace "Unable to send to remote", conn, exc = exc.msg
    # Next time sendConn is used, it will have its close flag set and thus
    # will be recycled
    if not isNil(conn):
      await conn.close() # This will clean up the send connection

    if exc is CancelledError: # TODO not handled
      debug "Send cancelled", peer = p

    # We'll ask for a new send connection whenever possible
    if p.sendConn == conn:
      p.sendConn = nil

proc send*(p: PubSubPeer, msg: RPCMsg) =
  asyncCheck sendImpl(p, msg)
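
# `send` is fire-and-forget: the write happens in a detached `sendImpl` task
# and all errors are handled there. A hypothetical call site, assuming
# `peer: PubSubPeer` and the RPC types from rpc/messages:
#
#   peer.send(RPCMsg(
#     subscriptions: @[SubOpts(subscribe: true, topic: "my-topic")]))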

proc `$`*(p: PubSubPeer): string =
  $p.peerId

proc newPubSubPeer*(peerId: PeerID,
                    getConn: GetConn,
                    codec: string): PubSubPeer =
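  ## Create a PubSubPeer for `peerId` that uses `getConn` to establish new
  ## send connections speaking the given pubsub `codec`.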
  new result
  result.getConn = getConn
  result.codec = codec
  result.peerId = peerId
  result.dialLock = newAsyncLock()