send messages in batches

This commit is contained in:
Dmitriy Ryajov 2020-01-07 02:04:02 -06:00
parent cd8961cfb9
commit 667691f784
4 changed files with 44 additions and 27 deletions

View File

@ -83,9 +83,11 @@ method rpcHandler*(f: FloodSub,
await h(t, msg.data) # trigger user provided handler await h(t, msg.data) # trigger user provided handler
# forward the message to all peers interested in it # forward the message to all peers interested in it
var sent: seq[Future[void]]
for p in toSendPeers: for p in toSendPeers:
if p in f.peers and f.peers[p].id != peer.id: if p in f.peers and f.peers[p].id != peer.id:
await f.peers[p].send(@[RPCMsg(messages: m.messages)]) sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
await allFutures(sent)
method init(f: FloodSub) = method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
@ -114,9 +116,11 @@ method publish*(f: FloodSub,
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
let msg = newMessage(f.peerInfo, data, topic) let msg = newMessage(f.peerInfo, data, topic)
var sent: seq[Future[void]]
for p in f.floodsub[topic]: for p in f.floodsub[topic]:
trace "publishing message", name = topic, peer = p, data = data trace "publishing message", name = topic, peer = p, data = data
await f.peers[p].send(@[RPCMsg(messages: @[msg])]) sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent)
method unsubscribe*(f: FloodSub, method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =

View File

@ -40,7 +40,7 @@ const GossipSubHeartbeatInitialDelay* = 100.millis
const GossipSubHeartbeatInterval* = 1.seconds const GossipSubHeartbeatInterval* = 1.seconds
# fanout ttl # fanout ttl
const GossipSubFanoutTTL* = 60.seconds const GossipSubFanoutTTL* = 1.minutes
type type
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
@ -52,7 +52,7 @@ type
control*: Table[string, ControlMessage] # pending control messages control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache mcache*: MCache # messages cache
heartbeatCancel*: Future[void] # cancelation future for heartbeat interval heartbeatCancel*: Future[void] # cancelation future for heartbeat interval
heartbeatLock: AsyncLock heartbeatLock: AsyncLock # hearbeat lock to prevent two concecutive concurent hearbeats
# TODO: This belong in chronos, temporary left here until chronos is updated # TODO: This belong in chronos, temporary left here until chronos is updated
proc addInterval(every: Duration, cb: CallbackFunc, proc addInterval(every: Duration, cb: CallbackFunc,
@ -206,15 +206,21 @@ method rpcHandler(g: GossipSub,
# forward the message to all peers interested in it # forward the message to all peers interested in it
for p in toSendPeers: for p in toSendPeers:
if p in g.peers and if p in g.peers:
g.peers[p].peerInfo.peerId != peer.peerInfo.peerId:
let id = g.peers[p].peerInfo.peerId let id = g.peers[p].peerInfo.peerId
trace "about to forward message to peer", peerId = id
if id != peer.peerInfo.peerId:
let msgs = m.messages.filterIt( let msgs = m.messages.filterIt(
# don't forward to message originator # don't forward to message originator
id != it.fromPeerId() id != it.fromPeerId()
) )
var sent: seq[Future[void]]
if msgs.len > 0: if msgs.len > 0:
await g.peers[p].send(@[RPCMsg(messages: msgs)]) trace "forwarding message to", peerId = id
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
await allFutures(sent)
var respControl: ControlMessage var respControl: ControlMessage
if m.control.isSome: if m.control.isSome:
@ -336,9 +342,9 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[id].ihave.add(ihave) result[id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
await g.heartbeatLock.acquire()
trace "running heartbeat" trace "running heartbeat"
await g.heartbeatLock.acquire()
await sleepAsync(GossipSubHeartbeatInitialDelay) await sleepAsync(GossipSubHeartbeatInitialDelay)
for t in g.mesh.keys: for t in g.mesh.keys:
@ -391,13 +397,15 @@ method publish*(g: GossipSub,
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
let msg = newMessage(g.peerInfo, data, topic) let msg = newMessage(g.peerInfo, data, topic)
var sent: seq[Future[void]]
for p in peers: for p in peers:
if p == g.peerInfo.id: if p == g.peerInfo.id:
continue continue
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
g.mcache.put(msg) g.mcache.put(msg)
await g.peers[p].send(@[RPCMsg(messages: @[msg])]) sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent)
method start*(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} =
## start pubsub ## start pubsub

View File

@ -90,7 +90,9 @@ proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
peer.refs.dec() # decrement refcount peer.refs.dec() # decrement refcount
p.cleanupLock.release() p.cleanupLock.release()
proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer = proc getPeer(p: PubSub,
peerInfo: PeerInfo,
proto: string): PubSubPeer =
if peerInfo.id in p.peers: if peerInfo.id in p.peers:
result = p.peers[peerInfo.id] result = p.peers[peerInfo.id]
return return
@ -234,8 +236,8 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
# TODO: add timeout to validator # TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message))) pending.add(p.validators[topic].mapIt(it(topic, message)))
await allFutures(pending) # await all futures await allFutures(pending)
if pending.allIt(it.read()): # if there are failed if pending.allIt(it.read()): # only if all passed
result = true result = true
proc newPubSub*(p: typedesc[PubSub], proc newPubSub*(p: typedesc[PubSub],

View File

@ -28,8 +28,8 @@ type
peerInfo*: PeerInfo peerInfo*: PeerInfo
handler*: RPCHandler handler*: RPCHandler
topics*: seq[string] topics*: seq[string]
sentRpcCache: TimedCache[string] # a cache of already sent messages sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # a cache of already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages
refs*: int # refcount of the connections this peer is handling refs*: int # refcount of the connections this peer is handling
onConnect: AsyncEvent onConnect: AsyncEvent
@ -65,6 +65,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
error "exception occured", exc = exc.msg error "exception occured", exc = exc.msg
finally: finally:
trace "exiting pubsub peer read loop", peer = p.id trace "exiting pubsub peer read loop", peer = p.id
if not conn.closed():
await conn.close()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
try: try:
@ -94,7 +96,8 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
p.onConnect.wait().addCallback do (udata: pointer): p.onConnect.wait().addCallback do (udata: pointer):
asyncCheck sendToRemote() asyncCheck sendToRemote()
trace "enqueued message to send at a later time" trace "enqueued message to send at a later time", peer = p.id,
encoded = encodedHex
except CatchableError as exc: except CatchableError as exc:
trace "exception occured", exc = exc.msg trace "exception occured", exc = exc.msg