Use semaphores to limit simultaneous transmissions; this can speed up message reception for large messages.
parent 84659af45b
commit 1e2d733c8d
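In outline, the change below routes every large transmission through a shared counting semaphore so that only a few writes are in flight at once. A minimal sketch of that pattern, assuming the package-level import path libp2p/utils/semaphore for the AsyncSemaphore helper this commit uses; txLimit, MaxSlots and boundedSend are illustrative names, not part of the commit:

import chronos
import libp2p/utils/semaphore

const MaxSlots = 2                  # counterpart of DefaultMaxSimultaneousTx below

let txLimit = newAsyncSemaphore(MaxSlots)

proc boundedSend(sendOne: proc(): Future[void] {.gcsafe.}) {.async.} =
  # Wait for a free transmission slot before starting the write;
  # at most MaxSlots writes run concurrently through this proc.
  await txLimit.acquire()
  try:
    await sendOne()
  finally:
    # Hand the slot back even if the write fails.
    txLimit.release()

The commit itself releases the slot after a bounded wait rather than in a finally block; see the sendMsg hunk further down.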
@@ -28,7 +28,8 @@ import ./errors as pubsub_errors,
   ../../peerid,
   ../../peerinfo,
   ../../errors,
-  ../../utility
+  ../../utility,
+  ../../utils/semaphore

 import metrics
 import stew/results
@@ -80,6 +81,11 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
 declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
 declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])

+# The number of simultaneous transmissions. A smaller number can speed up message reception and relaying.
+# Ideally this should be an adaptive number, increasing for low-bandwidth peers and decreasing for high-bandwidth peers.
+const
+  DefaultMaxSimultaneousTx* = 2
+
 type
   InitializationError* = object of LPError
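As a quick illustration of what a two-slot semaphore gives you (a hypothetical, standalone snippet, not part of the commit; it assumes standard counting-semaphore behaviour from the same AsyncSemaphore helper):

import chronos
import libp2p/utils/semaphore

proc demo() {.async.} =
  let sem = newAsyncSemaphore(2)   # same size as DefaultMaxSimultaneousTx
  await sem.acquire()              # slot 1: free, completes at once
  await sem.acquire()              # slot 2: free, completes at once
  let third = sem.acquire()        # no slot left: future stays pending
  sem.release()                    # give a slot back...
  await third                      # ...and the parked acquire completes

waitFor demo()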
@@ -128,6 +134,7 @@ type
     rng*: ref HmacDrbgContext

     knownTopics*: HashSet[string]
+    semTxLimit: AsyncSemaphore

 method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
   ## handle peer disconnects
@@ -311,7 +318,7 @@ method getOrCreatePeer*(
     p.onPubSubPeerEvent(peer, event)

   # create new pubsub peer
-  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
+  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit)
   debug "created new pubsub peer", peerId

   p.peers[peerId] = pubSubPeer
@@ -510,6 +517,7 @@ method initPubSub*(p: PubSub)
   p.observers = new(seq[PubSubObserver])
   if p.msgIdProvider == nil:
     p.msgIdProvider = defaultMsgIdProvider
+  p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx)

 method addValidator*(p: PubSub,
                      topic: varargs[string],
@@ -19,7 +19,8 @@ import rpc/[messages, message, protobuf],
   ../../stream/connection,
   ../../crypto/crypto,
   ../../protobuf/minprotobuf,
-  ../../utility
+  ../../utility,
+  ../../utils/semaphore

 export peerid, connection, deques
@@ -91,6 +92,7 @@ type
     behaviourPenalty*: float64 # the eventual penalty score
     overheadRateLimitOpt*: Opt[TokenBucket]

+    semTxLimit: ptr AsyncSemaphore # Caps the number of simultaneous transmissions to speed up individual receptions
     rpcmessagequeue: RpcMessageQueue
     maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
     disconnected: bool
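The field above is a raw pointer rather than a semaphore of its own: each PubSubPeer borrows the single AsyncSemaphore owned by its PubSub (passed in as addr p.semTxLimit in getOrCreatePeer), so one limit covers all peers of that router. A reduced sketch of that ownership arrangement, with illustrative type and proc names:

import libp2p/utils/semaphore

type
  Router = ref object
    semTxLimit: AsyncSemaphore       # owner: one semaphore per router

  Peer = ref object
    semTxLimit: ptr AsyncSemaphore   # borrower: points into the router

proc addPeer(r: Router): Peer =
  # The pointer stays valid while the Router is alive; PubSub keeps its
  # peers in p.peers, so the router normally outlives them.
  Peer(semTxLimit: addr r.semTxLimit)

let router = Router(semTxLimit: newAsyncSemaphore(2))
let peer = router.addPeer()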
@@ -311,23 +313,35 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
     debug "No send connection", p, msg = shortLog(msg)
     return

   trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
   await sendMsgContinue(conn, conn.writeLp(msg))

-proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
+proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] {.async.} =
+  if p.sendConn == nil or p.sendConn.closed():
+    await sendMsgSlow(p, msg)
+
   if p.sendConn != nil and not p.sendConn.closed():
-    # Fast path that avoids copying msg (which happens for {.async.})
     let conn = p.sendConn

     trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
-    let f = conn.writeLp(msg)
-    if not f.completed():
-      sendMsgContinue(conn, f)
-    else:
-      f
-  else:
-    sendMsgSlow(p, msg)
+
+    if msg.len < 2000: # ideally only IDONTWANT messages should be forwarded through this fast path
+      let f = conn.writeLp(msg)
+      await f or sleepAsync(5.milliseconds)
+      if not f.completed:
+        asyncSpawn sendMsgContinue(conn, f)
+    else:
+      await p.semTxLimit[].acquire()
+      try:
+        let f = conn.writeLp(msg)
+        # ideally the sleep time should be based on peer bandwidth and message size
+        await f or sleepAsync(450.milliseconds)
+
+        if not f.completed:
+          asyncSpawn sendMsgContinue(conn, f)
+        p.semTxLimit[].release()
+
+      except CatchableError as exc:
+        p.semTxLimit[].release()
+        await conn.close()

 proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
   ## Asynchronously sends an encoded message to a specified `PubSubPeer`.
   ##
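In the rewritten sendMsg above, messages under 2000 bytes bypass the semaphore and get a 5 ms grace period before the write is detached, while larger messages must claim one of the shared transmission slots and get up to 450 ms; the slot is released once that wait ends, even if the write is still in flight. A condensed sketch of that bounded-wait-then-detach pattern, written as it would sit inside the same module so it can reuse the sendMsgContinue helper and the imports already present there (the proc name and parameters are illustrative):

proc boundedWrite(sem: ptr AsyncSemaphore, conn: Connection,
                  msg: seq[byte], maxWait: Duration) {.async.} =
  ## Hold a transmission slot for at most `maxWait`; a slower write keeps
  ## running in the background after the slot is handed back.
  await sem[].acquire()
  try:
    let fut = conn.writeLp(msg)
    await fut or sleepAsync(maxWait)
    if not fut.completed():
      # Detach: let the write finish (or fail) on its own.
      asyncSpawn sendMsgContinue(conn, fut)
  finally:
    sem[].release()

One consequence of releasing on a timer rather than on completion is that a slow peer can occupy a slot for at most about 450 ms, at the cost that detached writes may briefly push the number of in-flight transmissions above DefaultMaxSimultaneousTx.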
@@ -499,6 +513,7 @@ proc new*(
     onEvent: OnEvent,
     codec: string,
     maxMessageSize: int,
+    sem: ptr AsyncSemaphore,
     maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
     overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =

@@ -516,3 +531,4 @@ proc new*(
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[SaltedId]))
   result.startSendNonPriorityTask()
+  result.semTxLimit = sem