decorate observers hooks with {.raises: [Defect].}

move hooks logic out into standalone procs

License: MIT
Signed-off-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
Dmitriy Ryajov 2020-05-29 10:46:27 -06:00
parent 4df151a3a3
commit 86e1c8169c
2 changed files with 27 additions and 16 deletions

View File

@ -24,8 +24,8 @@ logScope:
type
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.}
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj
proto: string # the protocol that this peer joined from
@ -51,6 +51,18 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
p.sendConn = conn
p.onConnect.fire()
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
obs.onRecv(p, msg)
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
obs.onSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "handling pubsub rpc", peer = p.id, closed = conn.closed
try:
@ -65,9 +77,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
var msg = decodeRpcMsg(data)
trace "decoded msg from peer", peer = p.id, msg = msg.shortLog
# trigger hooks
for obs in p.observers[]:
obs.onRecv(p, msg)
p.recvObservers(msg) # hooks can modify the message
await p.handler(p, @[msg])
p.recvdRpcCache.put(digest)
except CatchableError as exc:
@ -80,13 +92,12 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
for m in msgs.items:
trace "sending msgs to peer", toPeer = p.id, msgs = $msgs
let encoded = encodeRpcMsg(m)
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
var mm = m
for obs in p.observers[]:
obs.onSend(p, mm)
# trigger send hooks
var mm = m # hooks can modify the message
p.sendObservers(mm)
let encoded = encodeRpcMsg(mm)
if encoded.buffer.len <= 0:
trace "empty message, skipping", peer = p.id
return
@ -111,7 +122,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
p.onConnect.clear()
# if no connection has been set,
# queue messages untill a connection
# queue messages until a connection
# becomes available
asyncCheck sendToRemote()

View File

@ -60,11 +60,11 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
return
withExceptions:
var writen = 0
while not s.client.closed and writen < msg.len:
writen += await s.client.write(msg[writen..<msg.len])
var written = 0
while not s.client.closed and written < msg.len:
written += await s.client.write(msg[written..<msg.len])
if writen < msg.len:
if written < msg.len:
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
method closed*(s: ChronosStream): bool {.inline.} =