mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-03 17:40:38 +00:00
Merge remote-tracking branch 'origin/unstable' into publicaddressmapping
This commit is contained in:
commit
4946ee5fa9
15
.github/workflows/bumper.yml
vendored
15
.github/workflows/bumper.yml
vendored
@ -7,14 +7,21 @@ on:
|
|||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
bumpNimbus:
|
bumpProjects:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
target: [
|
||||||
|
{ repo: status-im/nimbus-eth2, branch: unstable },
|
||||||
|
{ repo: status-im/nwaku, branch: master },
|
||||||
|
{ repo: status-im/nim-codex, branch: main }
|
||||||
|
]
|
||||||
steps:
|
steps:
|
||||||
- name: Clone NBC
|
- name: Clone repo
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: status-im/nimbus-eth2
|
repository: ${{ matrix.target.repo }}
|
||||||
ref: unstable
|
ref: ${{ matrix.target.branch }}
|
||||||
path: nbc
|
path: nbc
|
||||||
submodules: true
|
submodules: true
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import chronos, stew/byteutils
|
import chronos, stew/byteutils
|
||||||
import ../libp2p,
|
import ../libp2p,
|
||||||
../libp2p/protocols/relay/[relay, client]
|
../libp2p/protocols/connectivity/relay/[relay, client]
|
||||||
|
|
||||||
# Helper to create a circuit relay node
|
# Helper to create a circuit relay node
|
||||||
proc createCircuitRelaySwitch(r: Relay): Switch =
|
proc createCircuitRelaySwitch(r: Relay): Switch =
|
||||||
|
@ -26,8 +26,8 @@ import
|
|||||||
switch, peerid, peerinfo, stream/connection, multiaddress,
|
switch, peerid, peerinfo, stream/connection, multiaddress,
|
||||||
crypto/crypto, transports/[transport, tcptransport],
|
crypto/crypto, transports/[transport, tcptransport],
|
||||||
muxers/[muxer, mplex/mplex, yamux/yamux],
|
muxers/[muxer, mplex/mplex, yamux/yamux],
|
||||||
protocols/[identify, secure/secure, secure/noise, autonat],
|
protocols/[identify, secure/secure, secure/noise],
|
||||||
protocols/relay/[relay, client, rtransport],
|
protocols/connectivity/[autonat, relay/relay, relay/client, relay/rtransport],
|
||||||
connmanager, upgrademngrs/muxedupgrade,
|
connmanager, upgrademngrs/muxedupgrade,
|
||||||
nameresolving/nameresolver,
|
nameresolving/nameresolver,
|
||||||
errors, utility
|
errors, utility
|
||||||
|
@ -222,6 +222,40 @@ proc onionVB(vb: var VBuffer): bool =
|
|||||||
if vb.readArray(buf) == 12:
|
if vb.readArray(buf) == 12:
|
||||||
result = true
|
result = true
|
||||||
|
|
||||||
|
proc onion3StB(s: string, vb: var VBuffer): bool =
|
||||||
|
try:
|
||||||
|
var parts = s.split(':')
|
||||||
|
if len(parts) != 2:
|
||||||
|
return false
|
||||||
|
if len(parts[0]) != 56:
|
||||||
|
return false
|
||||||
|
var address = Base32Lower.decode(parts[0].toLowerAscii())
|
||||||
|
var nport = parseInt(parts[1])
|
||||||
|
if (nport > 0 and nport < 65536) and len(address) == 35:
|
||||||
|
address.setLen(37)
|
||||||
|
address[35] = cast[byte]((nport shr 8) and 0xFF)
|
||||||
|
address[36] = cast[byte](nport and 0xFF)
|
||||||
|
vb.writeArray(address)
|
||||||
|
result = true
|
||||||
|
except:
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc onion3BtS(vb: var VBuffer, s: var string): bool =
|
||||||
|
## ONION address bufferToString() implementation.
|
||||||
|
var buf: array[37, byte]
|
||||||
|
if vb.readArray(buf) == 37:
|
||||||
|
var nport = (cast[uint16](buf[35]) shl 8) or cast[uint16](buf[36])
|
||||||
|
s = Base32Lower.encode(buf.toOpenArray(0, 34))
|
||||||
|
s.add(":")
|
||||||
|
s.add($nport)
|
||||||
|
result = true
|
||||||
|
|
||||||
|
proc onion3VB(vb: var VBuffer): bool =
|
||||||
|
## ONION address validateBuffer() implementation.
|
||||||
|
var buf: array[37, byte]
|
||||||
|
if vb.readArray(buf) == 37:
|
||||||
|
result = true
|
||||||
|
|
||||||
proc unixStB(s: string, vb: var VBuffer): bool =
|
proc unixStB(s: string, vb: var VBuffer): bool =
|
||||||
## Unix socket name stringToBuffer() implementation.
|
## Unix socket name stringToBuffer() implementation.
|
||||||
if len(s) > 0:
|
if len(s) > 0:
|
||||||
@ -310,6 +344,11 @@ const
|
|||||||
bufferToString: onionBtS,
|
bufferToString: onionBtS,
|
||||||
validateBuffer: onionVB
|
validateBuffer: onionVB
|
||||||
)
|
)
|
||||||
|
TranscoderOnion3* = Transcoder(
|
||||||
|
stringToBuffer: onion3StB,
|
||||||
|
bufferToString: onion3BtS,
|
||||||
|
validateBuffer: onion3VB
|
||||||
|
)
|
||||||
TranscoderDNS* = Transcoder(
|
TranscoderDNS* = Transcoder(
|
||||||
stringToBuffer: dnsStB,
|
stringToBuffer: dnsStB,
|
||||||
bufferToString: dnsBtS,
|
bufferToString: dnsBtS,
|
||||||
@ -363,6 +402,10 @@ const
|
|||||||
mcodec: multiCodec("onion"), kind: Fixed, size: 10,
|
mcodec: multiCodec("onion"), kind: Fixed, size: 10,
|
||||||
coder: TranscoderOnion
|
coder: TranscoderOnion
|
||||||
),
|
),
|
||||||
|
MAProtocol(
|
||||||
|
mcodec: multiCodec("onion3"), kind: Fixed, size: 37,
|
||||||
|
coder: TranscoderOnion3
|
||||||
|
),
|
||||||
MAProtocol(
|
MAProtocol(
|
||||||
mcodec: multiCodec("ws"), kind: Marker, size: 0
|
mcodec: multiCodec("ws"), kind: Marker, size: 0
|
||||||
),
|
),
|
||||||
|
@ -203,6 +203,7 @@ const MultiCodecList = [
|
|||||||
("p2p-webrtc-star", 0x0113), # not in multicodec list
|
("p2p-webrtc-star", 0x0113), # not in multicodec list
|
||||||
("p2p-webrtc-direct", 0x0114), # not in multicodec list
|
("p2p-webrtc-direct", 0x0114), # not in multicodec list
|
||||||
("onion", 0x01BC),
|
("onion", 0x01BC),
|
||||||
|
("onion3", 0x01BD),
|
||||||
("p2p-circuit", 0x0122),
|
("p2p-circuit", 0x0122),
|
||||||
("libp2p-peer-record", 0x0301),
|
("libp2p-peer-record", 0x0301),
|
||||||
("dns", 0x35),
|
("dns", 0x35),
|
||||||
|
@ -58,6 +58,8 @@ type
|
|||||||
initiator*: bool # initiated remotely or locally flag
|
initiator*: bool # initiated remotely or locally flag
|
||||||
isOpen*: bool # has channel been opened
|
isOpen*: bool # has channel been opened
|
||||||
closedLocal*: bool # has channel been closed locally
|
closedLocal*: bool # has channel been closed locally
|
||||||
|
remoteReset*: bool # has channel been remotely reset
|
||||||
|
localReset*: bool # has channel been reset locally
|
||||||
msgCode*: MessageType # cached in/out message code
|
msgCode*: MessageType # cached in/out message code
|
||||||
closeCode*: MessageType # cached in/out close code
|
closeCode*: MessageType # cached in/out close code
|
||||||
resetCode*: MessageType # cached in/out reset code
|
resetCode*: MessageType # cached in/out reset code
|
||||||
@ -103,6 +105,7 @@ proc reset*(s: LPChannel) {.async, gcsafe.} =
|
|||||||
|
|
||||||
s.isClosed = true
|
s.isClosed = true
|
||||||
s.closedLocal = true
|
s.closedLocal = true
|
||||||
|
s.localReset = not s.remoteReset
|
||||||
|
|
||||||
trace "Resetting channel", s, len = s.len
|
trace "Resetting channel", s, len = s.len
|
||||||
|
|
||||||
@ -168,6 +171,14 @@ method readOnce*(s: LPChannel,
|
|||||||
## channels are blocked - in particular, this means that reading from one
|
## channels are blocked - in particular, this means that reading from one
|
||||||
## channel must not be done from within a callback / read handler of another
|
## channel must not be done from within a callback / read handler of another
|
||||||
## or the reads will lock each other.
|
## or the reads will lock each other.
|
||||||
|
if s.remoteReset:
|
||||||
|
raise newLPStreamResetError()
|
||||||
|
if s.localReset:
|
||||||
|
raise newLPStreamClosedError()
|
||||||
|
if s.atEof():
|
||||||
|
raise newLPStreamRemoteClosedError()
|
||||||
|
if s.conn.closed:
|
||||||
|
raise newLPStreamConnDownError()
|
||||||
try:
|
try:
|
||||||
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
|
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
|
||||||
when defined(libp2p_network_protocols_metrics):
|
when defined(libp2p_network_protocols_metrics):
|
||||||
@ -184,13 +195,17 @@ method readOnce*(s: LPChannel,
|
|||||||
# data has been lost in s.readBuf and there's no way to gracefully recover /
|
# data has been lost in s.readBuf and there's no way to gracefully recover /
|
||||||
# use the channel any more
|
# use the channel any more
|
||||||
await s.reset()
|
await s.reset()
|
||||||
raise exc
|
raise newLPStreamConnDownError(exc)
|
||||||
|
|
||||||
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
||||||
# prepareWrite is the slow path of writing a message - see conditions in
|
# prepareWrite is the slow path of writing a message - see conditions in
|
||||||
# write
|
# write
|
||||||
if s.closedLocal or s.conn.closed:
|
if s.remoteReset:
|
||||||
|
raise newLPStreamResetError()
|
||||||
|
if s.closedLocal:
|
||||||
raise newLPStreamClosedError()
|
raise newLPStreamClosedError()
|
||||||
|
if s.conn.closed:
|
||||||
|
raise newLPStreamConnDownError()
|
||||||
|
|
||||||
if msg.len == 0:
|
if msg.len == 0:
|
||||||
return
|
return
|
||||||
@ -235,7 +250,7 @@ proc completeWrite(
|
|||||||
trace "exception in lpchannel write handler", s, msg = exc.msg
|
trace "exception in lpchannel write handler", s, msg = exc.msg
|
||||||
await s.reset()
|
await s.reset()
|
||||||
await s.conn.close()
|
await s.conn.close()
|
||||||
raise exc
|
raise newLPStreamConnDownError(exc)
|
||||||
finally:
|
finally:
|
||||||
s.writes -= 1
|
s.writes -= 1
|
||||||
|
|
||||||
|
@ -183,6 +183,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
|||||||
of MessageType.CloseIn, MessageType.CloseOut:
|
of MessageType.CloseIn, MessageType.CloseOut:
|
||||||
await channel.pushEof()
|
await channel.pushEof()
|
||||||
of MessageType.ResetIn, MessageType.ResetOut:
|
of MessageType.ResetIn, MessageType.ResetOut:
|
||||||
|
channel.remoteReset = true
|
||||||
await channel.reset()
|
await channel.reset()
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
debug "Unexpected cancellation in mplex handler", m
|
debug "Unexpected cancellation in mplex handler", m
|
||||||
|
@ -153,6 +153,7 @@ type
|
|||||||
sendQueue: seq[ToSend]
|
sendQueue: seq[ToSend]
|
||||||
recvQueue: seq[byte]
|
recvQueue: seq[byte]
|
||||||
isReset: bool
|
isReset: bool
|
||||||
|
remoteReset: bool
|
||||||
closedRemotely: Future[void]
|
closedRemotely: Future[void]
|
||||||
closedLocally: bool
|
closedLocally: bool
|
||||||
receivedData: AsyncEvent
|
receivedData: AsyncEvent
|
||||||
@ -194,9 +195,11 @@ method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} =
|
|||||||
await channel.actuallyClose()
|
await channel.actuallyClose()
|
||||||
|
|
||||||
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
|
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
|
||||||
if not channel.isReset:
|
if channel.isReset:
|
||||||
|
return
|
||||||
trace "Reset channel"
|
trace "Reset channel"
|
||||||
channel.isReset = true
|
channel.isReset = true
|
||||||
|
channel.remoteReset = not isLocal
|
||||||
for (d, s, fut) in channel.sendQueue:
|
for (d, s, fut) in channel.sendQueue:
|
||||||
fut.fail(newLPStreamEOFError())
|
fut.fail(newLPStreamEOFError())
|
||||||
channel.sendQueue = @[]
|
channel.sendQueue = @[]
|
||||||
@ -235,7 +238,15 @@ method readOnce*(
|
|||||||
nbytes: int):
|
nbytes: int):
|
||||||
Future[int] {.async.} =
|
Future[int] {.async.} =
|
||||||
|
|
||||||
if channel.returnedEof: raise newLPStreamEOFError()
|
if channel.isReset:
|
||||||
|
raise if channel.remoteReset:
|
||||||
|
newLPStreamResetError()
|
||||||
|
elif channel.closedLocally:
|
||||||
|
newLPStreamClosedError()
|
||||||
|
else:
|
||||||
|
newLPStreamConnDownError()
|
||||||
|
if channel.returnedEof:
|
||||||
|
raise newLPStreamRemoteClosedError()
|
||||||
if channel.recvQueue.len == 0:
|
if channel.recvQueue.len == 0:
|
||||||
channel.receivedData.clear()
|
channel.receivedData.clear()
|
||||||
await channel.closedRemotely or channel.receivedData.wait()
|
await channel.closedRemotely or channel.receivedData.wait()
|
||||||
@ -313,8 +324,9 @@ proc trySend(channel: YamuxChannel) {.async.} =
|
|||||||
channel.sendWindow.dec(toSend)
|
channel.sendWindow.dec(toSend)
|
||||||
try: await channel.conn.write(sendBuffer)
|
try: await channel.conn.write(sendBuffer)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
|
let connDown = newLPStreamConnDownError(exc)
|
||||||
for fut in futures.items():
|
for fut in futures.items():
|
||||||
fut.fail(exc)
|
fut.fail(connDown)
|
||||||
await channel.reset()
|
await channel.reset()
|
||||||
break
|
break
|
||||||
for fut in futures.items():
|
for fut in futures.items():
|
||||||
@ -323,8 +335,11 @@ proc trySend(channel: YamuxChannel) {.async.} =
|
|||||||
|
|
||||||
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
|
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
|
||||||
result = newFuture[void]("Yamux Send")
|
result = newFuture[void]("Yamux Send")
|
||||||
|
if channel.remoteReset:
|
||||||
|
result.fail(newLPStreamResetError())
|
||||||
|
return result
|
||||||
if channel.closedLocally or channel.isReset:
|
if channel.closedLocally or channel.isReset:
|
||||||
result.fail(newLPStreamEOFError())
|
result.fail(newLPStreamClosedError())
|
||||||
return result
|
return result
|
||||||
if msg.len == 0:
|
if msg.len == 0:
|
||||||
result.complete()
|
result.complete()
|
||||||
@ -396,8 +411,9 @@ method close*(m: Yamux) {.async.} =
|
|||||||
m.isClosed = true
|
m.isClosed = true
|
||||||
|
|
||||||
trace "Closing yamux"
|
trace "Closing yamux"
|
||||||
for channel in m.channels.values:
|
let channels = toSeq(m.channels.values())
|
||||||
await channel.reset()
|
for channel in channels:
|
||||||
|
await channel.reset(true)
|
||||||
await m.connection.write(YamuxHeader.goAway(NormalTermination))
|
await m.connection.write(YamuxHeader.goAway(NormalTermination))
|
||||||
await m.connection.close()
|
await m.connection.close()
|
||||||
trace "Closed yamux"
|
trace "Closed yamux"
|
||||||
@ -453,6 +469,7 @@ method handle*(m: Yamux) {.async, gcsafe.} =
|
|||||||
m.flushed[header.streamId].dec(int(header.length))
|
m.flushed[header.streamId].dec(int(header.length))
|
||||||
if m.flushed[header.streamId] < 0:
|
if m.flushed[header.streamId] < 0:
|
||||||
raise newException(YamuxError, "Peer exhausted the recvWindow after reset")
|
raise newException(YamuxError, "Peer exhausted the recvWindow after reset")
|
||||||
|
if header.length > 0:
|
||||||
var buffer = newSeqUninitialized[byte](header.length)
|
var buffer = newSeqUninitialized[byte](header.length)
|
||||||
await m.connection.readExactly(addr buffer[0], int(header.length))
|
await m.connection.readExactly(addr buffer[0], int(header.length))
|
||||||
continue
|
continue
|
||||||
|
@ -14,13 +14,13 @@ else:
|
|||||||
|
|
||||||
import std/[options, sets, sequtils]
|
import std/[options, sets, sequtils]
|
||||||
import chronos, chronicles, stew/objects
|
import chronos, chronicles, stew/objects
|
||||||
import ./protocol,
|
import ../protocol,
|
||||||
../switch,
|
../../switch,
|
||||||
../multiaddress,
|
../../multiaddress,
|
||||||
../multicodec,
|
../../multicodec,
|
||||||
../peerid,
|
../../peerid,
|
||||||
../utils/semaphore,
|
../../utils/semaphore,
|
||||||
../errors
|
../../errors
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p autonat"
|
topics = "libp2p autonat"
|
@ -20,10 +20,10 @@ import ./relay,
|
|||||||
./messages,
|
./messages,
|
||||||
./rconn,
|
./rconn,
|
||||||
./utils,
|
./utils,
|
||||||
../../peerinfo,
|
../../../peerinfo,
|
||||||
../../switch,
|
../../../switch,
|
||||||
../../multiaddress,
|
../../../multiaddress,
|
||||||
../../stream/connection
|
../../../stream/connection
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
@ -14,8 +14,8 @@ else:
|
|||||||
|
|
||||||
import options, macros, sequtils
|
import options, macros, sequtils
|
||||||
import stew/objects
|
import stew/objects
|
||||||
import ../../peerinfo,
|
import ../../../peerinfo,
|
||||||
../../signed_envelope
|
../../../signed_envelope
|
||||||
|
|
||||||
# Circuit Relay V1 Message
|
# Circuit Relay V1 Message
|
||||||
|
|
@ -14,7 +14,7 @@ else:
|
|||||||
|
|
||||||
import chronos
|
import chronos
|
||||||
|
|
||||||
import ../../stream/connection
|
import ../../../stream/connection
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayConnection* = ref object of Connection
|
RelayConnection* = ref object of Connection
|
@ -19,16 +19,16 @@ import chronos, chronicles
|
|||||||
import ./messages,
|
import ./messages,
|
||||||
./rconn,
|
./rconn,
|
||||||
./utils,
|
./utils,
|
||||||
../../peerinfo,
|
../../../peerinfo,
|
||||||
../../switch,
|
../../../switch,
|
||||||
../../multiaddress,
|
../../../multiaddress,
|
||||||
../../multicodec,
|
../../../multicodec,
|
||||||
../../stream/connection,
|
../../../stream/connection,
|
||||||
../../protocols/protocol,
|
../../../protocols/protocol,
|
||||||
../../transports/transport,
|
../../../transports/transport,
|
||||||
../../errors,
|
../../../errors,
|
||||||
../../utils/heartbeat,
|
../../../utils/heartbeat,
|
||||||
../../signed_envelope
|
../../../signed_envelope
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# * Eventually replace std/times by chronos/timer. Currently chronos/timer
|
# * Eventually replace std/times by chronos/timer. Currently chronos/timer
|
@ -19,9 +19,9 @@ import chronos, chronicles
|
|||||||
import ./client,
|
import ./client,
|
||||||
./rconn,
|
./rconn,
|
||||||
./utils,
|
./utils,
|
||||||
../../switch,
|
../../../switch,
|
||||||
../../stream/connection,
|
../../../stream/connection,
|
||||||
../../transports/transport
|
../../../transports/transport
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p relay relay-transport"
|
topics = "libp2p relay relay-transport"
|
@ -17,7 +17,7 @@ import options
|
|||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
|
|
||||||
import ./messages,
|
import ./messages,
|
||||||
../../stream/connection
|
../../../stream/connection
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p relay relay-utils"
|
topics = "libp2p relay relay-utils"
|
@ -79,7 +79,7 @@ method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
|
|||||||
&"Only one concurrent push allowed for stream {s.shortLog()}")
|
&"Only one concurrent push allowed for stream {s.shortLog()}")
|
||||||
|
|
||||||
if s.isClosed or s.pushedEof:
|
if s.isClosed or s.pushedEof:
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamClosedError()
|
||||||
|
|
||||||
if data.len == 0:
|
if data.len == 0:
|
||||||
return # Don't push 0-length buffers, these signal EOF
|
return # Don't push 0-length buffers, these signal EOF
|
||||||
|
@ -59,7 +59,18 @@ type
|
|||||||
LPStreamWriteError* = object of LPStreamError
|
LPStreamWriteError* = object of LPStreamError
|
||||||
par*: ref CatchableError
|
par*: ref CatchableError
|
||||||
LPStreamEOFError* = object of LPStreamError
|
LPStreamEOFError* = object of LPStreamError
|
||||||
LPStreamClosedError* = object of LPStreamError
|
|
||||||
|
# X | Read | Write
|
||||||
|
# Local close | Works | LPStreamClosedError
|
||||||
|
# Remote close | LPStreamRemoteClosedError | Works
|
||||||
|
# Local reset | LPStreamClosedError | LPStreamClosedError
|
||||||
|
# Remote reset | LPStreamResetError | LPStreamResetError
|
||||||
|
# Connection down | LPStreamConnDown | LPStreamConnDownError
|
||||||
|
|
||||||
|
LPStreamResetError* = object of LPStreamEOFError
|
||||||
|
LPStreamClosedError* = object of LPStreamEOFError
|
||||||
|
LPStreamRemoteClosedError* = object of LPStreamEOFError
|
||||||
|
LPStreamConnDownError* = object of LPStreamEOFError
|
||||||
|
|
||||||
InvalidVarintError* = object of LPStreamError
|
InvalidVarintError* = object of LPStreamError
|
||||||
MaxSizeError* = object of LPStreamError
|
MaxSizeError* = object of LPStreamError
|
||||||
@ -119,9 +130,22 @@ proc newLPStreamIncorrectDefect*(m: string): ref LPStreamIncorrectDefect =
|
|||||||
proc newLPStreamEOFError*(): ref LPStreamEOFError =
|
proc newLPStreamEOFError*(): ref LPStreamEOFError =
|
||||||
result = newException(LPStreamEOFError, "Stream EOF!")
|
result = newException(LPStreamEOFError, "Stream EOF!")
|
||||||
|
|
||||||
|
proc newLPStreamResetError*(): ref LPStreamResetError =
|
||||||
|
result = newException(LPStreamResetError, "Stream Reset!")
|
||||||
|
|
||||||
proc newLPStreamClosedError*(): ref LPStreamClosedError =
|
proc newLPStreamClosedError*(): ref LPStreamClosedError =
|
||||||
result = newException(LPStreamClosedError, "Stream Closed!")
|
result = newException(LPStreamClosedError, "Stream Closed!")
|
||||||
|
|
||||||
|
proc newLPStreamRemoteClosedError*(): ref LPStreamRemoteClosedError =
|
||||||
|
result = newException(LPStreamRemoteClosedError, "Stream Remotely Closed!")
|
||||||
|
|
||||||
|
proc newLPStreamConnDownError*(
|
||||||
|
parentException: ref Exception = nil): ref LPStreamConnDownError =
|
||||||
|
result = newException(
|
||||||
|
LPStreamConnDownError,
|
||||||
|
"Stream Underlying Connection Closed!",
|
||||||
|
parentException)
|
||||||
|
|
||||||
func shortLog*(s: LPStream): auto =
|
func shortLog*(s: LPStream): auto =
|
||||||
if s.isNil: "LPStream(nil)"
|
if s.isNil: "LPStream(nil)"
|
||||||
else: $s.oid
|
else: $s.oid
|
||||||
@ -165,6 +189,8 @@ proc readExactly*(s: LPStream,
|
|||||||
## Waits for `nbytes` to be available, then read
|
## Waits for `nbytes` to be available, then read
|
||||||
## them and return them
|
## them and return them
|
||||||
if s.atEof:
|
if s.atEof:
|
||||||
|
var ch: char
|
||||||
|
discard await s.readOnce(addr ch, 1)
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
if nbytes == 0:
|
if nbytes == 0:
|
||||||
@ -183,6 +209,10 @@ proc readExactly*(s: LPStream,
|
|||||||
if read == 0:
|
if read == 0:
|
||||||
doAssert s.atEof()
|
doAssert s.atEof()
|
||||||
trace "couldn't read all bytes, stream EOF", s, nbytes, read
|
trace "couldn't read all bytes, stream EOF", s, nbytes, read
|
||||||
|
# Re-readOnce to raise a more specific error than EOF
|
||||||
|
# Raise EOF if it doesn't raise anything(shouldn't happen)
|
||||||
|
discard await s.readOnce(addr pbuffer[read], nbytes - read)
|
||||||
|
warn "Read twice while at EOF"
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
if read < nbytes:
|
if read < nbytes:
|
||||||
@ -200,8 +230,7 @@ proc readLine*(s: LPStream,
|
|||||||
|
|
||||||
while true:
|
while true:
|
||||||
var ch: char
|
var ch: char
|
||||||
if (await readOnce(s, addr ch, 1)) == 0:
|
await readExactly(s, addr ch, 1)
|
||||||
raise newLPStreamEOFError()
|
|
||||||
|
|
||||||
if sep[state] == ch:
|
if sep[state] == ch:
|
||||||
inc(state)
|
inc(state)
|
||||||
@ -224,8 +253,7 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe, public.} =
|
|||||||
buffer: array[10, byte]
|
buffer: array[10, byte]
|
||||||
|
|
||||||
for i in 0..<len(buffer):
|
for i in 0..<len(buffer):
|
||||||
if (await conn.readOnce(addr buffer[i], 1)) == 0:
|
await conn.readExactly(addr buffer[i], 1)
|
||||||
raise newLPStreamEOFError()
|
|
||||||
|
|
||||||
var
|
var
|
||||||
varint: uint64
|
varint: uint64
|
||||||
|
@ -312,12 +312,10 @@ proc start*(s: Switch) {.async, gcsafe, public.} =
|
|||||||
|
|
||||||
await allFutures(startFuts)
|
await allFutures(startFuts)
|
||||||
|
|
||||||
for s in startFuts:
|
for fut in startFuts:
|
||||||
if s.failed:
|
if fut.failed:
|
||||||
# TODO: replace this exception with a `listenError` callback. See
|
await s.stop()
|
||||||
# https://github.com/status-im/nim-libp2p/pull/662 for more info.
|
raise fut.error
|
||||||
raise newException(transport.TransportError,
|
|
||||||
"Failed to start one transport", s.error)
|
|
||||||
|
|
||||||
for t in s.transports: # for each transport
|
for t in s.transports: # for each transport
|
||||||
if t.addrs.len > 0 or t.running:
|
if t.addrs.len > 0 or t.running:
|
||||||
|
@ -25,6 +25,14 @@ template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
|
|||||||
nextHeartbeat += interval
|
nextHeartbeat += interval
|
||||||
let now = Moment.now()
|
let now = Moment.now()
|
||||||
if nextHeartbeat < now:
|
if nextHeartbeat < now:
|
||||||
info "Missed heartbeat", heartbeat = name, delay = now - nextHeartbeat
|
let
|
||||||
nextHeartbeat = now + interval
|
delay = now - nextHeartbeat
|
||||||
|
itv = interval
|
||||||
|
if delay > itv:
|
||||||
|
info "Missed multiple heartbeats", heartbeat = name,
|
||||||
|
delay = delay, hinterval = itv
|
||||||
|
else:
|
||||||
|
debug "Missed heartbeat", heartbeat = name,
|
||||||
|
delay = delay, hinterval = itv
|
||||||
|
nextHeartbeat = now + itv
|
||||||
await sleepAsync(nextHeartbeat - now)
|
await sleepAsync(nextHeartbeat - now)
|
||||||
|
@ -3,7 +3,7 @@ import chronos, chronicles, stew/byteutils
|
|||||||
import helpers
|
import helpers
|
||||||
import ../libp2p
|
import ../libp2p
|
||||||
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
|
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
|
||||||
import ../libp2p/protocols/relay/[relay, client, utils]
|
import ../libp2p/protocols/connectivity/relay/[relay, client, utils]
|
||||||
|
|
||||||
type
|
type
|
||||||
SwitchCreator = proc(
|
SwitchCreator = proc(
|
||||||
|
@ -3,7 +3,7 @@ import chronos
|
|||||||
import
|
import
|
||||||
../libp2p/[
|
../libp2p/[
|
||||||
builders,
|
builders,
|
||||||
protocols/autonat
|
protocols/connectivity/autonat
|
||||||
],
|
],
|
||||||
./helpers
|
./helpers
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ import stublogger
|
|||||||
|
|
||||||
import helpers, commoninterop
|
import helpers, commoninterop
|
||||||
import ../libp2p
|
import ../libp2p
|
||||||
import ../libp2p/crypto/crypto, ../libp2p/protocols/relay/[relay, client]
|
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/[relay, client]
|
||||||
|
|
||||||
proc switchMplexCreator(
|
proc switchMplexCreator(
|
||||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||||
|
@ -119,7 +119,7 @@ suite "Mplex":
|
|||||||
# should still allow reading until buffer EOF
|
# should still allow reading until buffer EOF
|
||||||
await chann.readExactly(addr data[3], 3)
|
await chann.readExactly(addr data[3], 3)
|
||||||
|
|
||||||
expect LPStreamEOFError:
|
expect LPStreamRemoteClosedError:
|
||||||
# this should fail now
|
# this should fail now
|
||||||
await chann.readExactly(addr data[0], 3)
|
await chann.readExactly(addr data[0], 3)
|
||||||
|
|
||||||
@ -143,7 +143,7 @@ suite "Mplex":
|
|||||||
let readFut = chann.readExactly(addr data[3], 3)
|
let readFut = chann.readExactly(addr data[3], 3)
|
||||||
await allFutures(closeFut, readFut)
|
await allFutures(closeFut, readFut)
|
||||||
|
|
||||||
expect LPStreamEOFError:
|
expect LPStreamRemoteClosedError:
|
||||||
await chann.readExactly(addr data[0], 6) # this should fail now
|
await chann.readExactly(addr data[0], 6) # this should fail now
|
||||||
|
|
||||||
await chann.close()
|
await chann.close()
|
||||||
@ -174,7 +174,7 @@ suite "Mplex":
|
|||||||
var buf: array[1, byte]
|
var buf: array[1, byte]
|
||||||
check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read
|
check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read
|
||||||
|
|
||||||
expect LPStreamEOFError:
|
expect LPStreamClosedError:
|
||||||
await chann.pushData(@[byte(1)])
|
await chann.pushData(@[byte(1)])
|
||||||
|
|
||||||
await chann.close()
|
await chann.close()
|
||||||
@ -190,7 +190,7 @@ suite "Mplex":
|
|||||||
|
|
||||||
await chann.reset()
|
await chann.reset()
|
||||||
var data = newSeq[byte](1)
|
var data = newSeq[byte](1)
|
||||||
expect LPStreamEOFError:
|
expect LPStreamClosedError:
|
||||||
await chann.readExactly(addr data[0], 1)
|
await chann.readExactly(addr data[0], 1)
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
@ -205,7 +205,7 @@ suite "Mplex":
|
|||||||
let fut = chann.readExactly(addr data[0], 1)
|
let fut = chann.readExactly(addr data[0], 1)
|
||||||
|
|
||||||
await chann.reset()
|
await chann.reset()
|
||||||
expect LPStreamEOFError:
|
expect LPStreamClosedError:
|
||||||
await fut
|
await fut
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
@ -22,6 +22,8 @@ const
|
|||||||
"/ip6zone/x/ip6/fe80::1/udp/1234/quic",
|
"/ip6zone/x/ip6/fe80::1/udp/1234/quic",
|
||||||
"/onion/timaq4ygg2iegci7:1234",
|
"/onion/timaq4ygg2iegci7:1234",
|
||||||
"/onion/timaq4ygg2iegci7:80/http",
|
"/onion/timaq4ygg2iegci7:80/http",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80/http",
|
||||||
"/udp/0",
|
"/udp/0",
|
||||||
"/tcp/0",
|
"/tcp/0",
|
||||||
"/sctp/0",
|
"/sctp/0",
|
||||||
@ -79,6 +81,12 @@ const
|
|||||||
"/onion/timaq4ygg2iegci7:-1",
|
"/onion/timaq4ygg2iegci7:-1",
|
||||||
"/onion/timaq4ygg2iegci7",
|
"/onion/timaq4ygg2iegci7",
|
||||||
"/onion/timaq4ygg2iegci@:666",
|
"/onion/timaq4ygg2iegci@:666",
|
||||||
|
"/onion3/9ww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd7:80",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:0",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:-1",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyy@:666",
|
||||||
"/udp/1234/sctp",
|
"/udp/1234/sctp",
|
||||||
"/udp/1234/udt/1234",
|
"/udp/1234/udt/1234",
|
||||||
"/udp/1234/utp/1234",
|
"/udp/1234/utp/1234",
|
||||||
@ -170,6 +178,12 @@ const
|
|||||||
"/onion/timaq4ygg2iegci7:-1",
|
"/onion/timaq4ygg2iegci7:-1",
|
||||||
"/onion/timaq4ygg2iegci7",
|
"/onion/timaq4ygg2iegci7",
|
||||||
"/onion/timaq4ygg2iegci@:666",
|
"/onion/timaq4ygg2iegci@:666",
|
||||||
|
"/onion3/9ww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd7:80",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:0",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:-1",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd",
|
||||||
|
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyy@:666",
|
||||||
"/udp/1234/sctp",
|
"/udp/1234/sctp",
|
||||||
"/udp/1234/udt/1234",
|
"/udp/1234/udt/1234",
|
||||||
"/udp/1234/utp/1234",
|
"/udp/1234/utp/1234",
|
||||||
@ -376,3 +390,15 @@ suite "MultiAddress test suite":
|
|||||||
$ma[1..2].get() == "/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
|
$ma[1..2].get() == "/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
|
||||||
$ma[^3..^1].get() == "/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio"
|
$ma[^3..^1].get() == "/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio"
|
||||||
ma[5..7].isErr()
|
ma[5..7].isErr()
|
||||||
|
|
||||||
|
test "[](MultiCodec) test":
|
||||||
|
let onionMAStr = "/onion3/torchdeedp3i2jigzjdmfpn5ttjhthh5wbmda2rr3jvqjg5p77c54dqd:80"
|
||||||
|
let ma = MultiAddress.init(onionMAStr).get()
|
||||||
|
check $(ma[multiCodec("onion3")].tryGet()) == onionMAStr
|
||||||
|
|
||||||
|
let onionMAWithTcpStr = "/onion3/torchdeedp3i2jigzjdmfpn5ttjhthh5wbmda2rr3jvqjg5p77c54dqd:80/tcp/80"
|
||||||
|
let maWithTcp = MultiAddress.init(onionMAWithTcpStr).get()
|
||||||
|
check $(maWithTcp[multiCodec("onion3")].tryGet()) == onionMAStr
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -2,11 +2,11 @@
|
|||||||
|
|
||||||
import options, bearssl, chronos
|
import options, bearssl, chronos
|
||||||
import stew/byteutils
|
import stew/byteutils
|
||||||
import ../libp2p/[protocols/relay/relay,
|
import ../libp2p/[protocols/connectivity/relay/relay,
|
||||||
protocols/relay/client,
|
protocols/connectivity/relay/client,
|
||||||
protocols/relay/messages,
|
protocols/connectivity/relay/messages,
|
||||||
protocols/relay/utils,
|
protocols/connectivity/relay/utils,
|
||||||
protocols/relay/rtransport,
|
protocols/connectivity/relay/rtransport,
|
||||||
multiaddress,
|
multiaddress,
|
||||||
peerinfo,
|
peerinfo,
|
||||||
peerid,
|
peerid,
|
||||||
|
@ -2,10 +2,10 @@
|
|||||||
|
|
||||||
import bearssl, chronos, options
|
import bearssl, chronos, options
|
||||||
import ../libp2p
|
import ../libp2p
|
||||||
import ../libp2p/[protocols/relay/relay,
|
import ../libp2p/[protocols/connectivity/relay/relay,
|
||||||
protocols/relay/messages,
|
protocols/connectivity/relay/messages,
|
||||||
protocols/relay/utils,
|
protocols/connectivity/relay/utils,
|
||||||
protocols/relay/client]
|
protocols/connectivity/relay/client]
|
||||||
import ./helpers
|
import ./helpers
|
||||||
import std/times
|
import std/times
|
||||||
import stew/byteutils
|
import stew/byteutils
|
||||||
|
@ -1035,3 +1035,12 @@ suite "Switch":
|
|||||||
await conn.close()
|
await conn.close()
|
||||||
await src.stop()
|
await src.stop()
|
||||||
await dst.stop()
|
await dst.stop()
|
||||||
|
|
||||||
|
asyncTest "switch failing to start stops properly":
|
||||||
|
let switch = newStandardSwitch(
|
||||||
|
addrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet(), MultiAddress.init("/ip4/1.1.1.1/tcp/0").tryGet()]
|
||||||
|
)
|
||||||
|
|
||||||
|
expect LPError:
|
||||||
|
await switch.start()
|
||||||
|
# test is that this doesn't leak
|
||||||
|
@ -152,3 +152,36 @@ suite "Yamux":
|
|||||||
expect(LPStreamEOFError): await wrFut[i]
|
expect(LPStreamEOFError): await wrFut[i]
|
||||||
writerBlocker.complete()
|
writerBlocker.complete()
|
||||||
await streamA.close()
|
await streamA.close()
|
||||||
|
|
||||||
|
suite "Exception testing":
|
||||||
|
asyncTest "Local & Remote close":
|
||||||
|
mSetup()
|
||||||
|
|
||||||
|
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
|
||||||
|
check (await conn.readLp(100)) == fromHex("1234")
|
||||||
|
await conn.close()
|
||||||
|
expect LPStreamClosedError: await conn.writeLp(fromHex("102030"))
|
||||||
|
check (await conn.readLp(100)) == fromHex("5678")
|
||||||
|
|
||||||
|
let streamA = await yamuxa.newStream()
|
||||||
|
await streamA.writeLp(fromHex("1234"))
|
||||||
|
expect LPStreamRemoteClosedError: discard await streamA.readLp(100)
|
||||||
|
await streamA.writeLp(fromHex("5678"))
|
||||||
|
await streamA.close()
|
||||||
|
|
||||||
|
asyncTest "Local & Remote reset":
|
||||||
|
mSetup()
|
||||||
|
let blocker = newFuture[void]()
|
||||||
|
|
||||||
|
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
|
||||||
|
await blocker
|
||||||
|
expect LPStreamResetError: discard await conn.readLp(100)
|
||||||
|
expect LPStreamResetError: await conn.writeLp(fromHex("1234"))
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
let streamA = await yamuxa.newStream()
|
||||||
|
await yamuxa.close()
|
||||||
|
expect LPStreamClosedError: await streamA.writeLp(fromHex("1234"))
|
||||||
|
expect LPStreamClosedError: discard await streamA.readLp(100)
|
||||||
|
blocker.complete()
|
||||||
|
await streamA.close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user