From 667691f784046c707f740fc8f8e39cc65ff9a628 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 7 Jan 2020 02:04:02 -0600 Subject: [PATCH] send messages in batches --- libp2p/protocols/pubsub/floodsub.nim | 10 ++++-- libp2p/protocols/pubsub/gossipsub.nim | 44 +++++++++++++++----------- libp2p/protocols/pubsub/pubsub.nim | 8 +++-- libp2p/protocols/pubsub/pubsubpeer.nim | 9 ++++-- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index bb99af097..786ecb090 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -56,7 +56,7 @@ method rpcHandler*(f: FloodSub, rpcMsgs: seq[RPCMsg]) {.async.} = await procCall PubSub(f).rpcHandler(peer, rpcMsgs) - for m in rpcMsgs: # for all RPC messages + for m in rpcMsgs: # for all RPC messages if m.messages.len > 0: # if there are any messages var toSendPeers: HashSet[string] = initHashSet[string]() for msg in m.messages: # for every message @@ -83,9 +83,11 @@ method rpcHandler*(f: FloodSub, await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it + var sent: seq[Future[void]] for p in toSendPeers: if p in f.peers and f.peers[p].id != peer.id: - await f.peers[p].send(@[RPCMsg(messages: m.messages)]) + sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) + await allFutures(sent) method init(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -114,9 +116,11 @@ method publish*(f: FloodSub, trace "publishing on topic", name = topic let msg = newMessage(f.peerInfo, data, topic) + var sent: seq[Future[void]] for p in f.floodsub[topic]: trace "publishing message", name = topic, peer = p, data = data - await f.peers[p].send(@[RPCMsg(messages: @[msg])]) + sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) + await allFutures(sent) method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index dd64a6d5b..ab8c71657 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -40,19 +40,19 @@ const GossipSubHeartbeatInitialDelay* = 100.millis const GossipSubHeartbeatInterval* = 1.seconds # fanout ttl -const GossipSubFanoutTTL* = 60.seconds +const GossipSubFanoutTTL* = 1.minutes type GossipSub* = ref object of FloodSub - mesh*: Table[string, HashSet[string]] # meshes - topic to peer - fanout*: Table[string, HashSet[string]] # fanout - topic to peer + mesh*: Table[string, HashSet[string]] # meshes - topic to peer + fanout*: Table[string, HashSet[string]] # fanout - topic to peer gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers - lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics - gossip*: Table[string, seq[ControlIHave]] # pending gossip - control*: Table[string, ControlMessage] # pending control messages - mcache*: MCache # messages cache - heartbeatCancel*: Future[void] # cancelation future for heartbeat interval - heartbeatLock: AsyncLock + lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics + gossip*: Table[string, seq[ControlIHave]] # pending gossip + control*: Table[string, ControlMessage] # pending control messages + mcache*: MCache # messages cache + heartbeatCancel*: Future[void] # cancelation future for heartbeat interval + heartbeatLock: AsyncLock # hearbeat lock to prevent two concecutive concurent hearbeats # TODO: This belong in chronos, temporary left here until chronos is updated proc addInterval(every: Duration, cb: CallbackFunc, @@ -206,15 +206,21 @@ method rpcHandler(g: GossipSub, # forward the message to all peers interested in it for p in toSendPeers: - if p in g.peers and - g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: + if p in g.peers: let id = g.peers[p].peerInfo.peerId - let msgs = m.messages.filterIt( + trace "about to forward message to peer", peerId = id + + if id != peer.peerInfo.peerId: + let msgs = m.messages.filterIt( # don't forward to message originator - id != it.fromPeerId() - ) - if msgs.len > 0: - await g.peers[p].send(@[RPCMsg(messages: msgs)]) + id != it.fromPeerId() + ) + + var sent: seq[Future[void]] + if msgs.len > 0: + trace "forwarding message to", peerId = id + sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) + await allFutures(sent) var respControl: ControlMessage if m.control.isSome: @@ -336,9 +342,9 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = result[id].ihave.add(ihave) proc heartbeat(g: GossipSub) {.async.} = + await g.heartbeatLock.acquire() trace "running heartbeat" - await g.heartbeatLock.acquire() await sleepAsync(GossipSubHeartbeatInitialDelay) for t in g.mesh.keys: @@ -391,13 +397,15 @@ method publish*(g: GossipSub, g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) let msg = newMessage(g.peerInfo, data, topic) + var sent: seq[Future[void]] for p in peers: if p == g.peerInfo.id: continue trace "publishing on topic", name = topic g.mcache.put(msg) - await g.peers[p].send(@[RPCMsg(messages: @[msg])]) + sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) + await allFutures(sent) method start*(g: GossipSub) {.async.} = ## start pubsub diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0db63b005..e49946876 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -90,7 +90,9 @@ proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = peer.refs.dec() # decrement refcount p.cleanupLock.release() -proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer = +proc getPeer(p: PubSub, + peerInfo: PeerInfo, + proto: string): PubSubPeer = if peerInfo.id in p.peers: result = p.peers[peerInfo.id] return @@ -234,8 +236,8 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = # TODO: add timeout to validator pending.add(p.validators[topic].mapIt(it(topic, message))) - await allFutures(pending) # await all futures - if pending.allIt(it.read()): # if there are failed + await allFutures(pending) + if pending.allIt(it.read()): # only if all passed result = true proc newPubSub*(p: typedesc[PubSub], diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index eb8c091d4..d88c88d0a 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -28,8 +28,8 @@ type peerInfo*: PeerInfo handler*: RPCHandler topics*: seq[string] - sentRpcCache: TimedCache[string] # a cache of already sent messages - recvdRpcCache: TimedCache[string] # a cache of already sent messages + sentRpcCache: TimedCache[string] # cache for already sent messages + recvdRpcCache: TimedCache[string] # cache for already received messages refs*: int # refcount of the connections this peer is handling onConnect: AsyncEvent @@ -65,6 +65,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = error "exception occured", exc = exc.msg finally: trace "exiting pubsub peer read loop", peer = p.id + if not conn.closed(): + await conn.close() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = try: @@ -94,7 +96,8 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.onConnect.wait().addCallback do (udata: pointer): asyncCheck sendToRemote() - trace "enqueued message to send at a later time" + trace "enqueued message to send at a later time", peer = p.id, + encoded = encodedHex except CatchableError as exc: trace "exception occured", exc = exc.msg