better exception handling

This commit is contained in:
Dmitriy Ryajov 2020-05-23 10:50:29 -06:00
parent d83ce4c932
commit 93e5805c01
3 changed files with 32 additions and 43 deletions

View File

@ -118,7 +118,7 @@ proc internalCleanup(p: PubSub, conn: Connection) {.async.} =
var peer = p.getPeer(conn.peerInfo, p.codec) var peer = p.getPeer(conn.peerInfo, p.codec)
await conn.closeEvent.wait() await conn.closeEvent.wait()
trace "connection closed, cleaning up peer", peer = conn.peerInfo.id trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id
await p.cleanUpHelper(peer) await p.cleanUpHelper(peer)
method handleConn*(p: PubSub, method handleConn*(p: PubSub,

View File

@ -78,52 +78,42 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
await conn.close() await conn.close()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
try: for m in msgs.items:
for m in msgs.items: trace "sending msgs to peer", toPeer = p.id, msgs = msgs
trace "sending msgs to peer", toPeer = p.id, msgs = msgs let encoded = encodeRpcMsg(m)
let encoded = encodeRpcMsg(m) # trigger hooks
# trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0:
if not(isNil(p.observers)) and p.observers[].len > 0: var mm = m
var mm = m for obs in p.observers[]:
for obs in p.observers[]: obs.onSend(p, mm)
obs.onSend(p, mm)
if encoded.buffer.len <= 0: if encoded.buffer.len <= 0:
trace "empty message, skipping", peer = p.id trace "empty message, skipping", peer = p.id
return return
let digest = $(sha256.digest(encoded.buffer)) let digest = $(sha256.digest(encoded.buffer))
if digest in p.sentRpcCache: if digest in p.sentRpcCache:
trace "message already sent to peer, skipping", peer = p.id trace "message already sent to peer, skipping", peer = p.id
continue continue
proc sendToRemote() {.async.} = proc sendToRemote() {.async.} =
trace "about send message", peer = p.id, try:
encoded = digest trace "about to send message", peer = p.id,
encoded = digest
await p.onConnect.wait() await p.onConnect.wait()
try: trace "sending encoded msgs to peer", peer = p.id,
trace "sending encoded msgs to peer", peer = p.id, encoded = encoded.buffer.shortLog
encoded = encoded.buffer.shortLog await p.sendConn.writeLp(encoded.buffer)
await p.sendConn.writeLp(encoded.buffer) p.sentRpcCache.put(digest)
p.sentRpcCache.put(digest) except CatchableError as exc:
except CatchableError as exc: trace "unable to send to remote", exc = exc.msg
trace "unable to send to remote", exc = exc.msg p.sendConn = nil
if not(isNil(p.sendConn)): p.onConnect.clear()
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
# if no connection has been set, # if no connection has been set,
# queue messages untill a connection # queue messages untill a connection
# becomes available # becomes available
asyncCheck sendToRemote() asyncCheck sendToRemote()
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.send", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
proc sendMsg*(p: PubSubPeer, proc sendMsg*(p: PubSubPeer,
peerId: PeerID, peerId: PeerID,

View File

@ -244,7 +244,6 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
var pb = initProtoBuffer(msg) var pb = initProtoBuffer(msg)
result.subscriptions = newSeq[SubOpts]()
while true: while true:
# decode SubOpts array # decode SubOpts array
var field = pb.enterSubMessage() var field = pb.enterSubMessage()