{.async: (raises).} for relay/utils.nim (#1058)

This commit is contained in:
Etan Kissling 2024-03-07 10:45:25 +01:00 committed by GitHub
parent ca01ee06a8
commit 61b299e411
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,5 +1,5 @@
# Nim-LibP2P # Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH # Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of # Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT)) # * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -21,36 +21,48 @@ const
RelayV2HopCodec* = "/libp2p/circuit/relay/0.2.0/hop" RelayV2HopCodec* = "/libp2p/circuit/relay/0.2.0/hop"
RelayV2StopCodec* = "/libp2p/circuit/relay/0.2.0/stop" RelayV2StopCodec* = "/libp2p/circuit/relay/0.2.0/stop"
proc sendStatus*(conn: Connection, code: StatusV1) {.async.} = proc sendStatus*(
conn: Connection,
code: StatusV1
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")" trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")"
let let
msg = RelayMessage(msgType: Opt.some(RelayType.Status), status: Opt.some(code)) msg = RelayMessage(
msgType: Opt.some(RelayType.Status), status: Opt.some(code))
pb = encode(msg) pb = encode(msg)
await conn.writeLp(pb.buffer) conn.writeLp(pb.buffer)
proc sendHopStatus*(conn: Connection, code: StatusV2) {.async.} = proc sendHopStatus*(
conn: Connection,
code: StatusV2
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")" trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")"
let let
msg = HopMessage(msgType: HopMessageType.Status, status: Opt.some(code)) msg = HopMessage(msgType: HopMessageType.Status, status: Opt.some(code))
pb = encode(msg) pb = encode(msg)
await conn.writeLp(pb.buffer) conn.writeLp(pb.buffer)
proc sendStopStatus*(conn: Connection, code: StatusV2) {.async.} = proc sendStopStatus*(
conn: Connection,
code: StatusV2
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")" trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")"
let let
msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code))
pb = encode(msg) pb = encode(msg)
await conn.writeLp(pb.buffer) conn.writeLp(pb.buffer)
proc bridge*(connSrc: Connection, connDst: Connection) {.async.} = proc bridge*(
connSrc: Connection,
connDst: Connection) {.async: (raises: [CancelledError]).} =
const bufferSize = 4096 const bufferSize = 4096
var var
bufSrcToDst: array[bufferSize, byte] bufSrcToDst: array[bufferSize, byte]
bufDstToSrc: array[bufferSize, byte] bufDstToSrc: array[bufferSize, byte]
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1) futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.len)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1) futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.len)
bytesSendFromSrcToDst = 0 bytesSentFromSrcToDst = 0
bytesSendFromDstToSrc = 0 bytesSentFromDstToSrc = 0
bufRead: int bufRead: int
try: try:
@ -61,25 +73,25 @@ proc bridge*(connSrc: Connection, connDst: Connection) {.async.} =
if futSrc.finished(): if futSrc.finished():
bufRead = await futSrc bufRead = await futSrc
if bufRead > 0: if bufRead > 0:
bytesSendFromSrcToDst.inc(bufRead) bytesSentFromSrcToDst.inc(bufRead)
await connDst.write(@bufSrcToDst[0 ..< bufRead]) await connDst.write(@bufSrcToDst[0 ..< bufRead])
zeroMem(addr(bufSrcToDst), bufSrcToDst.high + 1) zeroMem(addr bufSrcToDst[0], bufSrcToDst.len)
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1) futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.len)
if futDst.finished(): if futDst.finished():
bufRead = await futDst bufRead = await futDst
if bufRead > 0: if bufRead > 0:
bytesSendFromDstToSrc += bufRead bytesSentFromDstToSrc += bufRead
await connSrc.write(bufDstToSrc[0 ..< bufRead]) await connSrc.write(bufDstToSrc[0 ..< bufRead])
zeroMem(addr(bufDstToSrc), bufDstToSrc.high + 1) zeroMem(addr bufDstToSrc[0], bufDstToSrc.len)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1) futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.len)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except LPStreamError as exc:
if connSrc.closed() or connSrc.atEof(): if connSrc.closed() or connSrc.atEof():
trace "relay src closed connection", src = connSrc.peerId trace "relay src closed connection", src = connSrc.peerId
if connDst.closed() or connDst.atEof(): if connDst.closed() or connDst.atEof():
trace "relay dst closed connection", dst = connDst.peerId trace "relay dst closed connection", dst = connDst.peerId
trace "relay error", exc=exc.msg trace "relay error", exc=exc.msg
trace "end relaying", bytesSendFromSrcToDst, bytesSendFromDstToSrc trace "end relaying", bytesSentFromSrcToDst, bytesSentFromDstToSrc
await futSrc.cancelAndWait() await futSrc.cancelAndWait()
await futDst.cancelAndWait() await futDst.cancelAndWait()