diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 55794d873..0b7b41264 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -24,8 +24,8 @@ logScope: type PubSubObserver* = ref object - onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.} - onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe.} + onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} + onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} PubSubPeer* = ref object of RootObj proto: string # the protocol that this peer joined from @@ -51,6 +51,18 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = p.sendConn = conn p.onConnect.fire() +proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = + # trigger hooks + if not(isNil(p.observers)) and p.observers[].len > 0: + for obs in p.observers[]: + obs.onRecv(p, msg) + +proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = + # trigger hooks + if not(isNil(p.observers)) and p.observers[].len > 0: + for obs in p.observers[]: + obs.onSend(p, msg) + proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: @@ -65,9 +77,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = var msg = decodeRpcMsg(data) trace "decoded msg from peer", peer = p.id, msg = msg.shortLog - # trigger hooks - for obs in p.observers[]: - obs.onRecv(p, msg) + + p.recvObservers(msg) # hooks can modify the message + await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) except CatchableError as exc: @@ -80,13 +92,12 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = for m in msgs.items: trace "sending msgs to peer", toPeer = p.id, msgs = $msgs - let encoded = encodeRpcMsg(m) - # trigger hooks - if not(isNil(p.observers)) and p.observers[].len > 0: - var mm = m - for obs in p.observers[]: - obs.onSend(p, mm) + # trigger send hooks + var mm = m # hooks can modify the message + p.sendObservers(mm) + + let encoded = encodeRpcMsg(mm) if encoded.buffer.len <= 0: trace "empty message, skipping", peer = p.id return @@ -111,7 +122,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.onConnect.clear() # if no connection has been set, - # queue messages untill a connection + # queue messages until a connection # becomes available asyncCheck sendToRemote() diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 2ebe529f7..609d0a47d 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -60,11 +60,11 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = return withExceptions: - var writen = 0 - while not s.client.closed and writen < msg.len: - writen += await s.client.write(msg[writen..