diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 800565d44..3efa395e9 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -28,7 +28,8 @@ import ./errors as pubsub_errors, ../../peerid, ../../peerinfo, ../../errors, - ../../utility + ../../utility, + ../../utils/semaphore import metrics import stew/results @@ -80,6 +81,11 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"]) +#The number of simultaneous transmissions. A smaller number can speedup message reception and relaying +#ideally this should be an adaptive number, increasing for low bandwidth peers and decreasing for high bandwidth peers +const + DefaultMaxSimultaneousTx* = 2 + type InitializationError* = object of LPError @@ -128,6 +134,7 @@ type rng*: ref HmacDrbgContext knownTopics*: HashSet[string] + semTxLimit: AsyncSemaphore method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = ## handle peer disconnects @@ -311,7 +318,7 @@ method getOrCreatePeer*( p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize) + let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer @@ -510,6 +517,7 @@ method initPubSub*(p: PubSub) p.observers = new(seq[PubSubObserver]) if p.msgIdProvider == nil: p.msgIdProvider = defaultMsgIdProvider + p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx) method addValidator*(p: PubSub, topic: varargs[string], diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ce5bdc1af..771e87059 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -19,7 +19,8 @@ import rpc/[messages, message, protobuf], ../../stream/connection, ../../crypto/crypto, ../../protobuf/minprotobuf, - ../../utility + ../../utility, + ../../utils/semaphore export peerid, connection, deques @@ -91,6 +92,7 @@ type behaviourPenalty*: float64 # the eventual penalty score overheadRateLimitOpt*: Opt[TokenBucket] + semTxLimit: ptr AsyncSemaphore #Control Max simultaneous transmissions to speed up indivisual receptions rpcmessagequeue: RpcMessageQueue maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue. disconnected: bool @@ -311,23 +313,35 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} = debug "No send connection", p, msg = shortLog(msg) return - trace "sending encoded msg to peer", conn, encoded = shortLog(msg) - await sendMsgContinue(conn, conn.writeLp(msg)) - -proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] = +proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] {.async.}= + if p.sendConn == nil or p.sendConn.closed(): + await sendMsgSlow(p, msg) + if p.sendConn != nil and not p.sendConn.closed(): - # Fast path that avoids copying msg (which happens for {.async.}) let conn = p.sendConn - trace "sending encoded msg to peer", conn, encoded = shortLog(msg) - let f = conn.writeLp(msg) - if not f.completed(): - sendMsgContinue(conn, f) - else: - f - else: - sendMsgSlow(p, msg) + if msg.len < 2000: #ideally we should only forward idontwant messages through this fast path + let f = conn.writeLp(msg) + await f or sleepAsync(5.milliseconds) + if not f.completed: + asyncSpawn sendMsgContinue(conn, f) + else: + await p.semTxLimit[].acquire() + try: + let f = conn.writeLp(msg) + #ideally sleep time should be based on peer bandwidth and message size + await f or sleepAsync(450.milliseconds) + + if not f.completed: + asyncSpawn sendMsgContinue(conn, f) + p.semTxLimit[].release() + + except CatchableError as exc: + p.semTxLimit[].release() + await conn.close() + + proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] = ## Asynchronously sends an encoded message to a specified `PubSubPeer`. ## @@ -499,6 +513,7 @@ proc new*( onEvent: OnEvent, codec: string, maxMessageSize: int, + sem: ptr AsyncSemaphore, maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue, overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T = @@ -516,3 +531,4 @@ proc new*( result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[SaltedId])) result.startSendNonPriorityTask() + result.semTxLimit = sem