From 86e1c8169ceba6b5e1138128e8c6c76d0b2c4fc6 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 29 May 2020 10:46:27 -0600 Subject: [PATCH] decorate observers hooks with {.raises: [Defect].} move hooks logic out into standalone procs License: MIT Signed-off-by: Dmitriy Ryajov --- libp2p/protocols/pubsub/pubsubpeer.nim | 35 +++++++++++++++++--------- libp2p/stream/chronosstream.nim | 8 +++--- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 55794d8..0b7b412 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -24,8 +24,8 @@ logScope: type PubSubObserver* = ref object - onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.} - onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.} + onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} + onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} PubSubPeer* = ref object of RootObj proto: string # the protocol that this peer joined from @@ -51,6 +51,18 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = p.sendConn = conn p.onConnect.fire() +proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = + # trigger hooks + if not(isNil(p.observers)) and p.observers[].len > 0: + for obs in p.observers[]: + obs.onRecv(p, msg) + +proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = + # trigger hooks + if not(isNil(p.observers)) and p.observers[].len > 0: + for obs in p.observers[]: + obs.onSend(p, msg) + proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: @@ -65,9 +77,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = var msg = decodeRpcMsg(data) trace "decoded msg from peer", peer = p.id, msg = msg.shortLog - # trigger hooks - for obs in p.observers[]: - obs.onRecv(p, msg) + + p.recvObservers(msg) # hooks can modify the message + await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) except CatchableError as exc: @@ -80,13 +92,12 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = for m in msgs.items: trace "sending msgs to peer", toPeer = p.id, msgs = $msgs - let encoded = encodeRpcMsg(m) - # trigger hooks - if not(isNil(p.observers)) and p.observers[].len > 0: - var mm = m - for obs in p.observers[]: - obs.onSend(p, mm) + # trigger send hooks + var mm = m # hooks can modify the message + p.sendObservers(mm) + + let encoded = encodeRpcMsg(mm) if encoded.buffer.len <= 0: trace "empty message, skipping", peer = p.id return @@ -111,7 +122,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.onConnect.clear() # if no connection has been set, - # queue messages untill a connection + # queue messages until a connection # becomes available asyncCheck sendToRemote() diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 2ebe529..609d0a4 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -60,11 +60,11 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = return withExceptions: - var writen = 0 - while not s.client.closed and writen < msg.len: - writen += await s.client.write(msg[writen..