* remove unused stream methods
* reimplement some of them with proc's
* remove broken tests
* Error->Defect for defect
* warning fixes
This commit is contained in:
Jacek Sieka 2020-05-06 18:31:47 +02:00 committed by GitHub
parent 6da4d2af48
commit 330da51819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 131 additions and 447 deletions

View File

@ -17,6 +17,8 @@ import peerinfo,
varint, varint,
vbuffer vbuffer
export lpstream
logScope: logScope:
topic = "Connection" topic = "Connection"
@ -103,46 +105,18 @@ proc newConnection*(stream: LPStream): Connection =
## create a new Connection for the specified async reader/writer ## create a new Connection for the specified async reader/writer
result.init(stream) result.init(stream)
method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} =
s.stream.read(n)
method readExactly*(s: Connection, method readExactly*(s: Connection,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[void] {.gcsafe.} = Future[void] {.gcsafe.} =
s.stream.readExactly(pbytes, nbytes) s.stream.readExactly(pbytes, nbytes)
method readLine*(s: Connection,
limit = 0,
sep = "\r\n"):
Future[string] {.gcsafe.} =
s.stream.readLine(limit, sep)
method readOnce*(s: Connection, method readOnce*(s: Connection,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[int] {.gcsafe.} = Future[int] {.gcsafe.} =
s.stream.readOnce(pbytes, nbytes) s.stream.readOnce(pbytes, nbytes)
method readUntil*(s: Connection,
pbytes: pointer,
nbytes: int,
sep: seq[byte]):
Future[int] {.gcsafe.} =
s.stream.readUntil(pbytes, nbytes, sep)
method write*(s: Connection,
pbytes: pointer,
nbytes: int):
Future[void] {.gcsafe.} =
s.stream.write(pbytes, nbytes)
method write*(s: Connection,
msg: string,
msglen = -1):
Future[void] {.gcsafe.} =
s.stream.write(msg, msglen)
method write*(s: Connection, method write*(s: Connection,
msg: seq[byte], msg: seq[byte],
msglen = -1): msglen = -1):

View File

@ -16,7 +16,6 @@
# RFC @ https://tools.ietf.org/html/rfc7748 # RFC @ https://tools.ietf.org/html/rfc7748
import bearssl import bearssl
import nimcrypto/sysrand
const const
Curve25519KeySize* = 32 Curve25519KeySize* = 32

View File

@ -9,7 +9,8 @@
# https://tools.ietf.org/html/rfc5869 # https://tools.ietf.org/html/rfc5869
import math {.push raises: [Defect].}
import nimcrypto import nimcrypto
import bearssl import bearssl
@ -17,16 +18,23 @@ type
BearHKDFContext {.importc: "br_hkdf_context", header: "bearssl_kdf.h".} = object BearHKDFContext {.importc: "br_hkdf_context", header: "bearssl_kdf.h".} = object
HKDFResult*[len: static int] = array[len, byte] HKDFResult*[len: static int] = array[len, byte]
proc br_hkdf_init(ctx: ptr BearHKDFContext; hashClass: ptr HashClass; salt: pointer; len: csize) {.importc: "br_hkdf_init", header: "bearssl_kdf.h".} proc br_hkdf_init(ctx: ptr BearHKDFContext; hashClass: ptr HashClass; salt: pointer; len: csize_t) {.importc: "br_hkdf_init", header: "bearssl_kdf.h", raises: [].}
proc br_hkdf_inject(ctx: ptr BearHKDFContext; ikm: pointer; len: csize) {.importc: "br_hkdf_inject", header: "bearssl_kdf.h".} proc br_hkdf_inject(ctx: ptr BearHKDFContext; ikm: pointer; len: csize_t) {.importc: "br_hkdf_inject", header: "bearssl_kdf.h", raises: [].}
proc br_hkdf_flip(ctx: ptr BearHKDFContext) {.importc: "br_hkdf_flip", header: "bearssl_kdf.h".} proc br_hkdf_flip(ctx: ptr BearHKDFContext) {.importc: "br_hkdf_flip", header: "bearssl_kdf.h", raises: [].}
proc br_hkdf_produce(ctx: ptr BearHKDFContext; info: pointer; infoLen: csize; output: pointer; outputLen: csize) {.importc: "br_hkdf_produce", header: "bearssl_kdf.h".} proc br_hkdf_produce(ctx: ptr BearHKDFContext; info: pointer; infoLen: csize_t; output: pointer; outputLen: csize_t) {.importc: "br_hkdf_produce", header: "bearssl_kdf.h", raises: [].}
proc hkdf*[T: sha256; len: static int](_: type[T]; salt, ikm, info: openarray[byte]; outputs: var openarray[HKDFResult[len]]) = proc hkdf*[T: sha256; len: static int](_: type[T]; salt, ikm, info: openarray[byte]; outputs: var openarray[HKDFResult[len]]) =
var var
ctx: BearHKDFContext ctx: BearHKDFContext
br_hkdf_init(addr ctx, addr sha256Vtable, if salt.len > 0: unsafeaddr salt[0] else: nil, salt.len) br_hkdf_init(
br_hkdf_inject(addr ctx, if ikm.len > 0: unsafeaddr ikm[0] else: nil, ikm.len) addr ctx, addr sha256Vtable,
if salt.len > 0: unsafeaddr salt[0] else: nil, csize_t(salt.len))
br_hkdf_inject(
addr ctx, if ikm.len > 0: unsafeaddr ikm[0] else: nil, csize_t(ikm.len))
br_hkdf_flip(addr ctx) br_hkdf_flip(addr ctx)
for i in 0..outputs.high: for i in 0..outputs.high:
br_hkdf_produce(addr ctx, if info.len > 0: unsafeaddr info[0] else: nil, info.len, addr outputs[i][0], outputs[i].len) br_hkdf_produce(
addr ctx,
if info.len > 0: unsafeaddr info[0]
else: nil, csize_t(info.len),
addr outputs[i][0], csize_t(outputs[i].len))

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import strutils import strutils, stew/byteutils
import chronos, chronicles import chronos, chronicles
import connection, import connection,
vbuffer, vbuffer,
@ -36,8 +36,6 @@ type
MultistreamSelect* = ref object of RootObj MultistreamSelect* = ref object of RootObj
handlers*: seq[HandlerHolder] handlers*: seq[HandlerHolder]
codec*: string codec*: string
na: string
ls: string
MultistreamHandshakeException* = object of CatchableError MultistreamHandshakeException* = object of CatchableError
@ -48,8 +46,6 @@ proc newMultistreamHandshakeException*(): ref Exception {.inline.} =
proc newMultistream*(): MultistreamSelect = proc newMultistream*(): MultistreamSelect =
new result new result
result.codec = MSCodec result.codec = MSCodec
result.ls = Ls
result.na = Na
proc select*(m: MultistreamSelect, proc select*(m: MultistreamSelect,
conn: Connection, conn: Connection,
@ -71,7 +67,7 @@ proc select*(m: MultistreamSelect,
if proto.len() == 0: # no protocols, must be a handshake call if proto.len() == 0: # no protocols, must be a handshake call
return return
result = cast[string]((await conn.readLp())) # read the first proto result = string.fromBytes(await conn.readLp()) # read the first proto
trace "reading first requested proto" trace "reading first requested proto"
result.removeSuffix("\n") result.removeSuffix("\n")
if result == proto[0]: if result == proto[0]:
@ -82,7 +78,7 @@ proc select*(m: MultistreamSelect,
trace "selecting one of several protos" trace "selecting one of several protos"
for p in proto[1..<proto.len()]: for p in proto[1..<proto.len()]:
await conn.writeLp((p & "\n")) # select proto await conn.writeLp((p & "\n")) # select proto
result = cast[string]((await conn.readLp())) # read the first proto result = string.fromBytes(await conn.readLp()) # read the first proto
result.removeSuffix("\n") result.removeSuffix("\n")
if result == p: if result == p:
trace "selected protocol", protocol = result trace "selected protocol", protocol = result
@ -105,10 +101,10 @@ proc list*(m: MultistreamSelect,
if not await m.select(conn): if not await m.select(conn):
return return
await conn.write(m.ls) # send ls await conn.write(Ls) # send ls
var list = newSeq[string]() var list = newSeq[string]()
let ms = cast[string]((await conn.readLp())) let ms = string.fromBytes(await conn.readLp())
for s in ms.split("\n"): for s in ms.split("\n"):
if s.len() > 0: if s.len() > 0:
list.add(s) list.add(s)
@ -119,17 +115,17 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: starting multistream handling" trace "handle: starting multistream handling"
tryAndWarn "multistream handle": tryAndWarn "multistream handle":
while not conn.closed: while not conn.closed:
var ms = cast[string]((await conn.readLp())) var ms = string.fromBytes(await conn.readLp())
ms.removeSuffix("\n") ms.removeSuffix("\n")
trace "handle: got request for ", ms trace "handle: got request for ", ms
if ms.len() <= 0: if ms.len() <= 0:
trace "handle: invalid proto" trace "handle: invalid proto"
await conn.write(m.na) await conn.write(Na)
if m.handlers.len() == 0: if m.handlers.len() == 0:
trace "handle: sending `na` for protocol ", protocol = ms trace "handle: sending `na` for protocol ", protocol = ms
await conn.write(m.na) await conn.write(Na)
continue continue
case ms: case ms:
@ -150,7 +146,7 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
await h.protocol.handler(conn, ms) await h.protocol.handler(conn, ms)
return return
warn "no handlers for ", protocol = ms warn "no handlers for ", protocol = ms
await conn.write(m.na) await conn.write(Na)
trace "leaving multistream loop" trace "leaving multistream loop"
# we might be tempted to close conn here but that would be a bad idea! # we might be tempted to close conn here but that would be a bad idea!
# we indeed will reuse it later on # we indeed will reuse it later on

View File

@ -17,6 +17,8 @@ import types,
../../utility, ../../utility,
../../errors ../../errors
export lpstream
logScope: logScope:
topic = "MplexChannel" topic = "MplexChannel"
@ -136,11 +138,6 @@ template raiseEOF(): untyped =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() raise newLPStreamEOFError()
method read*(s: LPChannel, n = -1): Future[seq[byte]] {.async.} =
raiseEOF()
result = (await procCall(read(BufferStream(s), n)))
await s.tryCleanup()
method readExactly*(s: LPChannel, method readExactly*(s: LPChannel,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
@ -149,14 +146,6 @@ method readExactly*(s: LPChannel,
await procCall readExactly(BufferStream(s), pbytes, nbytes) await procCall readExactly(BufferStream(s), pbytes, nbytes)
await s.tryCleanup() await s.tryCleanup()
method readLine*(s: LPChannel,
limit = 0,
sep = "\r\n"):
Future[string] {.async.} =
raiseEOF()
result = await procCall readLine(BufferStream(s), limit, sep)
await s.tryCleanup()
method readOnce*(s: LPChannel, method readOnce*(s: LPChannel,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
@ -165,14 +154,6 @@ method readOnce*(s: LPChannel,
result = await procCall readOnce(BufferStream(s), pbytes, nbytes) result = await procCall readOnce(BufferStream(s), pbytes, nbytes)
await s.tryCleanup() await s.tryCleanup()
method readUntil*(s: LPChannel,
pbytes: pointer, nbytes: int,
sep: seq[byte]):
Future[int] {.async.} =
raiseEOF()
result = await procCall readOnce(BufferStream(s), pbytes, nbytes)
await s.tryCleanup()
template writePrefix: untyped = template writePrefix: untyped =
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamEOFError() raise newLPStreamEOFError()
@ -180,14 +161,6 @@ template writePrefix: untyped =
if s.isLazy and not s.isOpen: if s.isLazy and not s.isOpen:
await s.open() await s.open()
method write*(s: LPChannel, pbytes: pointer, nbytes: int) {.async.} =
writePrefix()
await procCall write(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, msg: string, msglen = -1) {.async.} =
writePrefix()
await procCall write(BufferStream(s), msg, msglen)
method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} =
writePrefix() writePrefix()
await procCall write(BufferStream(s), msg, msglen) await procCall write(BufferStream(s), msg, msglen)

View File

@ -25,8 +25,6 @@ import ../muxer,
logScope: logScope:
topic = "Mplex" topic = "Mplex"
const DefaultRWTimeout = InfiniteDuration
type type
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
remote*: Table[uint64, LPChannel] remote*: Table[uint64, LPChannel]

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import sequtils, tables, options, sets, strutils import sequtils, tables, sets, strutils
import chronos, chronicles import chronos, chronicles
import pubsub, import pubsub,
pubsubpeer, pubsubpeer,
@ -15,8 +15,8 @@ import pubsub,
rpc/[messages, message], rpc/[messages, message],
../../crypto/crypto, ../../crypto/crypto,
../../connection, ../../connection,
../../peerinfo,
../../peer, ../../peer,
../../peerinfo,
../../utility, ../../utility,
../../errors ../../errors

View File

@ -13,8 +13,7 @@ import pubsubpeer,
rpc/messages, rpc/messages,
../protocol, ../protocol,
../../connection, ../../connection,
../../peerinfo, ../../peerinfo
../../peer
export PubSubPeer export PubSubPeer

View File

@ -10,10 +10,8 @@
import options import options
import chronicles import chronicles
import messages, import messages,
../../../protobuf/minprotobuf, ../../../utility,
../../../crypto/crypto, ../../../protobuf/minprotobuf
../../../peer,
../../../utility
proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} = proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, graft.topicID)) pb.write(initProtoField(1, graft.topicID))

View File

@ -520,7 +520,7 @@ method init*(p: Noise) {.gcsafe.} =
procCall Secure(p).init() procCall Secure(p).init()
p.codec = NoiseCodec p.codec = NoiseCodec
method secure*(p: Noise, conn: Connection): Future[Connection] {.async, gcsafe.} = proc secure*(p: Noise, conn: Connection): Future[Connection] {.async, gcsafe.} =
trace "Noise.secure called", initiator=p.outgoing trace "Noise.secure called", initiator=p.outgoing
try: try:
result = await p.handleConn(conn, p.outgoing) result = await p.handleConn(conn, p.outgoing)

View File

@ -14,7 +14,6 @@ import secure,
../../stream/lpstream, ../../stream/lpstream,
../../crypto/crypto, ../../crypto/crypto,
../../crypto/ecnist, ../../crypto/ecnist,
../../protobuf/minprotobuf,
../../peer, ../../peer,
../../utility ../../utility
export hmac, sha2, sha, hash, rijndael, bcmode export hmac, sha2, sha, hash, rijndael, bcmode

View File

@ -11,7 +11,6 @@ import options
import chronos, chronicles import chronos, chronicles
import ../protocol, import ../protocol,
../../stream/bufferstream, ../../stream/bufferstream,
../../crypto/crypto,
../../connection, ../../connection,
../../peerinfo, ../../peerinfo,
../../utility ../../utility

View File

@ -34,6 +34,8 @@ import deques, math, oids
import chronos, chronicles, metrics import chronos, chronicles, metrics
import ../stream/lpstream import ../stream/lpstream
export lpstream
const const
BufferStreamTrackerName* = "libp2p.bufferstream" BufferStreamTrackerName* = "libp2p.bufferstream"
DefaultBufferSize* = 1024 DefaultBufferSize* = 1024
@ -168,31 +170,6 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} =
finally: finally:
s.lock.release() s.lock.release()
method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async.} =
## Read all bytes (n <= 0) or exactly `n` bytes from buffer
##
## This procedure allocates buffer seq[byte] and return it as result.
##
when chronicles.enabledLogLevel == LogLevel.TRACE:
logScope:
stream_oid = $s.oid
trace "read()", requested_bytes = n
var size = if n > 0: n else: s.readBuf.len()
var index = 0
if s.readBuf.len() == 0:
await s.requestReadBytes()
while index < size:
while s.readBuf.len() > 0 and index < size:
result.add(s.popFirst())
inc(index)
trace "read()", read_bytes = index
if index < size:
await s.requestReadBytes()
method readExactly*(s: BufferStream, method readExactly*(s: BufferStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
@ -207,53 +184,21 @@ method readExactly*(s: BufferStream,
logScope: logScope:
stream_oid = $s.oid stream_oid = $s.oid
var buff: seq[byte] trace "read()", requested_bytes = nbytes
try:
buff = await s.read(nbytes)
except LPStreamEOFError as exc:
trace "Exception occurred", exc = exc.msg
if nbytes > buff.len():
raise newLPStreamIncompleteError()
copyMem(pbytes, addr buff[0], nbytes)
method readLine*(s: BufferStream,
limit = 0,
sep = "\r\n"):
Future[string] {.async.} =
## Read one line from read-only stream ``rstream``, where ``"line"`` is a
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
##
## If EOF is received, and ``sep`` was not found, the method will return the
## partial read bytes.
##
## If the EOF was received and the internal buffer is empty, return an
## empty string.
##
## If ``limit`` more then 0, then result string will be limited to ``limit``
## bytes.
##
result = ""
var lim = if limit <= 0: -1 else: limit
var state = 0
var index = 0 var index = 0
index = 0 if s.readBuf.len() == 0:
while index < s.readBuf.len: await s.requestReadBytes()
let ch = char(s.readBuf[index])
if sep[state] == ch: let output = cast[ptr UncheckedArray[byte]](pbytes)
inc(state) while index < nbytes:
if state == len(sep): while s.readBuf.len() > 0 and index < nbytes:
s.shrink(index + 1) output[index] = s.popFirst()
break inc(index)
else: trace "readExactly()", read_bytes = index
state = 0
result.add(ch) if index < nbytes:
if len(result) == lim: await s.requestReadBytes()
s.shrink(index + 1)
break
inc(index)
method readOnce*(s: BufferStream, method readOnce*(s: BufferStream,
pbytes: pointer, pbytes: pointer,
@ -271,93 +216,6 @@ method readOnce*(s: BufferStream,
await s.readExactly(pbytes, len) await s.readExactly(pbytes, len)
result = len result = len
method readUntil*(s: BufferStream,
pbytes: pointer,
nbytes: int,
sep: seq[byte]):
Future[int] {.async.} =
## Read data from the read-only stream ``rstream`` until separator ``sep`` is
## found.
##
## On success, the data and separator will be removed from the internal
## buffer (consumed). Returned data will include the separator at the end.
##
## If EOF is received, and `sep` was not found, procedure will raise
## ``LPStreamIncompleteError``.
##
## If ``nbytes`` bytes has been received and `sep` was not found, procedure
## will raise ``LPStreamLimitError``.
##
## Procedure returns actual number of bytes read.
##
var
dest = cast[ptr UncheckedArray[byte]](pbytes)
state = 0
k = 0
let datalen = s.readBuf.len()
if datalen == 0 and s.readBuf.len() == 0:
raise newLPStreamIncompleteError()
var index = 0
while index < datalen:
let ch = s.readBuf[index]
if sep[state] == ch:
inc(state)
else:
state = 0
if k < nbytes:
dest[k] = ch
inc(k)
else:
raise newLPStreamLimitError()
if state == len(sep):
break
inc(index)
if state == len(sep):
s.shrink(index + 1)
result = k
else:
s.shrink(datalen)
method write*(s: BufferStream,
pbytes: pointer,
nbytes: int): Future[void] =
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
## ``rstream``.
##
## Return number of bytes actually consumed (discarded).
##
if isNil(s.writeHandler):
var retFuture = newFuture[void]("BufferStream.write(pointer)")
retFuture.fail(newNotWritableError())
return retFuture
var buf: seq[byte] = newSeq[byte](nbytes)
copyMem(addr buf[0], pbytes, nbytes)
result = s.writeHandler(buf)
method write*(s: BufferStream,
msg: string,
msglen = -1): Future[void] =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
##
## String ``sbytes`` must not be zero-length.
##
## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream.
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
##
if isNil(s.writeHandler):
var retFuture = newFuture[void]("BufferStream.write(string)")
retFuture.fail(newNotWritableError())
return retFuture
var buf = ""
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
result = s.writeHandler(cast[seq[byte]](buf))
method write*(s: BufferStream, method write*(s: BufferStream,
msg: seq[byte], msg: seq[byte],
msglen = -1): Future[void] = msglen = -1): Future[void] =
@ -375,9 +233,7 @@ method write*(s: BufferStream,
retFuture.fail(newNotWritableError()) retFuture.fail(newNotWritableError())
return retFuture return retFuture
var buf: seq[byte] result = s.writeHandler(if msglen >= 0: msg[0..<msglen] else: msg)
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
result = s.writeHandler(buf)
proc pipe*(s: BufferStream, proc pipe*(s: BufferStream,
target: BufferStream): BufferStream = target: BufferStream): BufferStream =

View File

@ -36,17 +36,10 @@ template withExceptions(body: untyped) =
except TransportLimitError: except TransportLimitError:
raise newLPStreamLimitError() raise newLPStreamLimitError()
except TransportError as exc: except TransportError as exc:
raise newLPStreamIncorrectError(exc.msg) raise newLPStreamIncorrectDefect(exc.msg)
except AsyncStreamIncompleteError: except AsyncStreamIncompleteError:
raise newLPStreamIncompleteError() raise newLPStreamIncompleteError()
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
if s.reader.atEof:
raise newLPStreamEOFError()
withExceptions:
result = await s.reader.read(n)
method readExactly*(s: ChronosStream, method readExactly*(s: ChronosStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): Future[void] {.async.} = nbytes: int): Future[void] {.async.} =
@ -56,13 +49,6 @@ method readExactly*(s: ChronosStream,
withExceptions: withExceptions:
await s.reader.readExactly(pbytes, nbytes) await s.reader.readExactly(pbytes, nbytes)
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
if s.reader.atEof:
raise newLPStreamEOFError()
withExceptions:
result = await s.reader.readLine(limit, sep)
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamEOFError() raise newLPStreamEOFError()
@ -70,30 +56,6 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions: withExceptions:
result = await s.reader.readOnce(pbytes, nbytes) result = await s.reader.readOnce(pbytes, nbytes)
method readUntil*(s: ChronosStream,
pbytes: pointer,
nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
if s.reader.atEof:
raise newLPStreamEOFError()
withExceptions:
result = await s.reader.readUntil(pbytes, nbytes, sep)
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
if s.writer.atEof:
raise newLPStreamEOFError()
withExceptions:
await s.writer.write(pbytes, nbytes)
method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
if s.writer.atEof:
raise newLPStreamEOFError()
withExceptions:
await s.writer.write(msg, msglen)
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} = method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
if s.writer.atEof: if s.writer.atEof:
raise newLPStreamEOFError() raise newLPStreamEOFError()

View File

@ -19,7 +19,7 @@ type
LPStreamError* = object of CatchableError LPStreamError* = object of CatchableError
LPStreamIncompleteError* = object of LPStreamError LPStreamIncompleteError* = object of LPStreamError
LPStreamIncorrectError* = object of Defect LPStreamIncorrectDefect* = object of Defect
LPStreamLimitError* = object of LPStreamError LPStreamLimitError* = object of LPStreamError
LPStreamReadError* = object of LPStreamError LPStreamReadError* = object of LPStreamError
par*: ref Exception par*: ref Exception
@ -45,8 +45,8 @@ proc newLPStreamIncompleteError*(): ref Exception {.inline.} =
proc newLPStreamLimitError*(): ref Exception {.inline.} = proc newLPStreamLimitError*(): ref Exception {.inline.} =
result = newException(LPStreamLimitError, "Buffer limit reached") result = newException(LPStreamLimitError, "Buffer limit reached")
proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} = proc newLPStreamIncorrectDefect*(m: string): ref Exception {.inline.} =
result = newException(LPStreamIncorrectError, m) result = newException(LPStreamIncorrectDefect, m)
proc newLPStreamEOFError*(): ref Exception {.inline.} = proc newLPStreamEOFError*(): ref Exception {.inline.} =
result = newException(LPStreamEOFError, "Stream EOF!") result = newException(LPStreamEOFError, "Stream EOF!")
@ -54,24 +54,12 @@ proc newLPStreamEOFError*(): ref Exception {.inline.} =
method closed*(s: LPStream): bool {.base, inline.} = method closed*(s: LPStream): bool {.base, inline.} =
s.isClosed s.isClosed
method read*(s: LPStream,
n = -1):
Future[seq[byte]] {.base, async.} =
doAssert(false, "not implemented!")
method readExactly*(s: LPStream, method readExactly*(s: LPStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[void] {.base, async.} = Future[void] {.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
method readLine*(s: LPStream,
limit = 0,
sep = "\r\n"):
Future[string]
{.base, async.} =
doAssert(false, "not implemented!")
method readOnce*(s: LPStream, method readOnce*(s: LPStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
@ -79,26 +67,56 @@ method readOnce*(s: LPStream,
{.base, async.} = {.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
method readUntil*(s: LPStream, proc read*(s: LPStream, nbytes: int): Future[seq[byte]] {.async, deprecated: "readExactly".} =
pbytes: pointer, # This function is deprecated - it was broken and used inappropriately as
nbytes: int, # `readExacltly` in tests and code - tests still need refactoring to remove
sep: seq[byte]): # any calls
Future[int] # `read` without nbytes was also incorrectly implemented - it worked more
{.base, async.} = # like `readOnce` in that it would not wait for stream to close, in
doAssert(false, "not implemented!") # BufferStream in particular - both tests and implementation were broken
var ret = newSeq[byte](nbytes)
await readExactly(s, addr ret[0], ret.len)
return ret
method write*(s: LPStream, pbytes: pointer, nbytes: int) proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, deprecated: "todo".} =
{.base, async.} = # TODO replace with something that exploits buffering better
doAssert(false, "not implemented!") var lim = if limit <= 0: -1 else: limit
var state = 0
method write*(s: LPStream, msg: string, msglen = -1) try:
{.base, async.} = while true:
doAssert(false, "not implemented!") var ch: char
await readExactly(s, addr ch, 1)
if sep[state] == ch:
inc(state)
if state == len(sep):
break
else:
state = 0
if limit > 0:
let missing = min(state, lim - len(result) - 1)
result.add(sep[0 ..< missing])
else:
result.add(sep[0 ..< state])
result.add(ch)
if len(result) == lim:
break
except LPStreamIncompleteError, LPStreamReadError:
discard # EOF, in which case we should return whatever we read so far..
method write*(s: LPStream, msg: seq[byte], msglen = -1) method write*(s: LPStream, msg: seq[byte], msglen = -1)
{.base, async.} = {.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
proc write*(s: LPStream, msg: string, msglen = -1): Future[void] =
let nbytes = if msglen >= 0: msglen else: msg.len
s.write(@(toOpenArrayByte(msg, 0, nbytes - 1)))
method close*(s: LPStream) method close*(s: LPStream)
{.base, async.} = {.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, chronicles, sequtils, sets import chronos, chronicles, sequtils
import transport, import transport,
../errors, ../errors,
../wire, ../wire,

View File

@ -1,3 +1,5 @@
{.used.}
import testgossipinternal, import testgossipinternal,
testfloodsub, testfloodsub,
testgossipsub, testgossipsub,

View File

@ -45,22 +45,6 @@ suite "BufferStream":
check: check:
waitFor(testPushTo()) == true waitFor(testPushTo()) == true
test "read":
proc testRead(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let buff = newBufferStream(writeHandler, 10)
check buff.len == 0
await buff.pushTo(cast[seq[byte]](@"12345"))
check @"12345" == cast[string](await buff.read())
result = true
await buff.close()
check:
waitFor(testRead()) == true
test "read with size": test "read with size":
proc testRead(): Future[bool] {.async.} = proc testRead(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
@ -99,32 +83,6 @@ suite "BufferStream":
check: check:
waitFor(testRead()) == true waitFor(testRead()) == true
test "read all from small buffer":
proc testRead(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard
let buff = newBufferStream(writeHandler, 4)
check buff.len == 0
proc reader() {.async.} =
var size = 0
while size != 5:
var msg = await buff.read()
size += msg.len
check size == 5
var fut = reader()
await buff.pushTo(cast[seq[byte]](@"12345"))
await fut
result = true
await buff.close()
check:
waitFor(testRead()) == true
test "readExactly": test "readExactly":
proc testReadExactly(): Future[bool] {.async.} = proc testReadExactly(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
@ -144,23 +102,6 @@ suite "BufferStream":
check: check:
waitFor(testReadExactly()) == true waitFor(testReadExactly()) == true
test "readLine":
proc testReadLine(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let buff = newBufferStream(writeHandler, 16)
check buff.len == 0
await buff.pushTo(cast[seq[byte]](@"12345\n67890"))
check buff.len == 11
check "12345" == await buff.readLine(0, "\n")
result = true
await buff.close()
check:
waitFor(testReadLine()) == true
test "readOnce": test "readOnce":
proc testReadOnce(): Future[bool] {.async.} = proc testReadOnce(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
@ -182,27 +123,6 @@ suite "BufferStream":
check: check:
waitFor(testReadOnce()) == true waitFor(testReadOnce()) == true
test "readUntil":
proc testReadUntil(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let buff = newBufferStream(writeHandler, 10)
check buff.len == 0
var data: seq[byte] = newSeq[byte](3)
await buff.pushTo(cast[seq[byte]](@"123$45"))
check buff.len == 6
let readFut = buff.readUntil(addr data[0], 5, @[byte('$')])
check (await readFut) == 4
check cast[string](data) == @['1', '2', '3']
result = true
await buff.close()
check:
waitFor(testReadUntil()) == true
test "write ptr": test "write ptr":
proc testWritePtr(): Future[bool] {.async.} = proc testWritePtr(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =

View File

@ -523,7 +523,6 @@ suite "Key interface test suite":
private2 = fromHex("5dab087e624a8a4b79e17f8b83800ee66f3bb1292618b6fd1c2f8b27ff88e0eb").intoCurve25519Key private2 = fromHex("5dab087e624a8a4b79e17f8b83800ee66f3bb1292618b6fd1c2f8b27ff88e0eb").intoCurve25519Key
p1Pub = private1.public() p1Pub = private1.public()
p2Pub = private2.public() p2Pub = private2.public()
p2Gen: Curve25519Key
check p1Pub.toHex == "8520F0098930A754748B7DDCB43EF75A0DBF3A0D26381AF4EBA4A98EAA9B4E6A" check p1Pub.toHex == "8520F0098930A754748B7DDCB43EF75A0DBF3A0D26381AF4EBA4A98EAA9B4E6A"
check p2Pub.toHex == "DE9EDB7D7B7DC1B4D35B61C2ECE435373F8343C85B78674DADFC7E146F882B4F" check p2Pub.toHex == "DE9EDB7D7B7DC1B4D35B61C2ECE435373F8343C85B78674DADFC7E146F882B4F"

View File

@ -1,5 +1,5 @@
import unittest, options import unittest, options
import chronos, strutils, sequtils import chronos, strutils
import ../libp2p/[protocols/identify, import ../libp2p/[protocols/identify,
multiaddress, multiaddress,
peerinfo, peerinfo,
@ -8,7 +8,6 @@ import ../libp2p/[protocols/identify,
multistream, multistream,
transports/transport, transports/transport,
transports/tcptransport, transports/tcptransport,
protocols/protocol,
crypto/crypto] crypto/crypto]
when defined(nimHasUsed): {.used.} when defined(nimHasUsed): {.used.}

View File

@ -1,4 +1,4 @@
import unittest, sequtils, sugar, strformat, options, strformat, random import unittest, strformat, strformat, random
import chronos, nimcrypto/utils, chronicles import chronos, nimcrypto/utils, chronicles
import ../libp2p/[errors, import ../libp2p/[errors,
connection, connection,
@ -6,7 +6,6 @@ import ../libp2p/[errors,
stream/bufferstream, stream/bufferstream,
transports/tcptransport, transports/tcptransport,
transports/transport, transports/transport,
protocols/identify,
multiaddress, multiaddress,
muxers/mplex/mplex, muxers/mplex/mplex,
muxers/mplex/coder, muxers/mplex/coder,
@ -470,8 +469,8 @@ suite "Mplex":
try: try:
await chann.pushTo(cast[seq[byte]]("Hello!")) await chann.pushTo(cast[seq[byte]]("Hello!"))
await chann.closedByRemote() await chann.closedByRemote()
discard await chann.read() # this should work, since there is data in the buffer discard await chann.read(6) # this should work, since there is data in the buffer
discard await chann.read() # this should throw discard await chann.read(6) # this should throw
finally: finally:
await chann.cleanUp() await chann.cleanUp()
await conn.close() await conn.close()
@ -616,7 +615,7 @@ suite "Mplex":
try: try:
await chann.reset() await chann.reset()
var data = await chann.read() var data = await chann.read(1)
doAssert(len(data) == 1) doAssert(len(data) == 1)
finally: finally:
await chann.cleanUp() await chann.cleanUp()

View File

@ -1,4 +1,4 @@
import unittest, strutils, sequtils, strformat, options import unittest, strutils, sequtils, strformat, stew/byteutils
import chronos import chronos
import ../libp2p/errors, import ../libp2p/errors,
../libp2p/connection, ../libp2p/connection,
@ -9,10 +9,7 @@ import ../libp2p/errors,
../libp2p/multiaddress, ../libp2p/multiaddress,
../libp2p/transports/transport, ../libp2p/transports/transport,
../libp2p/transports/tcptransport, ../libp2p/transports/tcptransport,
../libp2p/protocols/protocol, ../libp2p/protocols/protocol
../libp2p/crypto/crypto,
../libp2p/peerinfo,
../libp2p/peer
when defined(nimHasUsed): {.used.} when defined(nimHasUsed): {.used.}
@ -54,9 +51,6 @@ method readExactly*(s: TestSelectStream,
method write*(s: TestSelectStream, msg: seq[byte], msglen = -1) method write*(s: TestSelectStream, msg: seq[byte], msglen = -1)
{.async, gcsafe.} = discard {.async, gcsafe.} = discard
method write*(s: TestSelectStream, msg: string, msglen = -1)
{.async, gcsafe.} = discard
method close(s: TestSelectStream) {.async, gcsafe.} = method close(s: TestSelectStream) {.async, gcsafe.} =
s.isClosed = true s.isClosed = true
@ -95,15 +89,13 @@ method readExactly*(s: TestLsStream,
var buf = "ls\n" var buf = "ls\n"
copyMem(pbytes, addr buf[0], buf.len()) copyMem(pbytes, addr buf[0], buf.len())
else: else:
copyMem(pbytes, cstring(Na), Na.len()) var buf = "na\n"
copyMem(pbytes, addr buf[0], buf.len())
method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} = method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
if s.step == 4: if s.step == 4:
await s.ls(msg) await s.ls(msg)
method write*(s: TestLsStream, msg: string, msglen = -1)
{.async, gcsafe.} = discard
method close(s: TestLsStream) {.async, gcsafe.} = method close(s: TestLsStream) {.async, gcsafe.} =
s.isClosed = true s.isClosed = true
@ -147,9 +139,9 @@ method readExactly*(s: TestNaStream,
cstring("\0x3na\n"), cstring("\0x3na\n"),
"\0x3na\n".len()) "\0x3na\n".len())
method write*(s: TestNaStream, msg: string, msglen = -1) {.async, gcsafe.} = method write*(s: TestNaStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
if s.step == 4: if s.step == 4:
await s.na(msg) await s.na(string.fromBytes(msg))
method close(s: TestNaStream) {.async, gcsafe.} = method close(s: TestNaStream) {.async, gcsafe.} =
s.isClosed = true s.isClosed = true
@ -240,7 +232,7 @@ suite "Multistream select":
let conn = newConnection(newTestNaStream(testNaHandler)) let conn = newConnection(newTestNaStream(testNaHandler))
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} = proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
check cast[string](msg) == Na check msg == Na
await conn.close() await conn.close()
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.used.}
import unittest, tables import unittest, tables
import chronos import chronos
import chronicles import chronicles
@ -23,7 +25,6 @@ import ../libp2p/[switch,
multiaddress, multiaddress,
peerinfo, peerinfo,
crypto/crypto, crypto/crypto,
peer,
protocols/protocol, protocols/protocol,
muxers/muxer, muxers/muxer,
muxers/mplex/mplex, muxers/mplex/mplex,

View File

@ -1,3 +1,4 @@
{.used.}
import unittest, options import unittest, options
import chronos import chronos

View File

@ -13,7 +13,6 @@ import ../libp2p/[errors,
multiaddress, multiaddress,
peerinfo, peerinfo,
crypto/crypto, crypto/crypto,
peer,
protocols/protocol, protocols/protocol,
muxers/muxer, muxers/muxer,
muxers/mplex/mplex, muxers/mplex/mplex,

View File

@ -1,7 +1,6 @@
import unittest import unittest
import chronos import chronos
import ../libp2p/[errors, import ../libp2p/[connection,
connection,
transports/transport, transports/transport,
transports/tcptransport, transports/tcptransport,
multiaddress, multiaddress,
@ -13,12 +12,6 @@ const
StreamTransportTrackerName = "stream.transport" StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server" StreamServerTrackerName = "stream.server"
template ignoreErrors(body: untyped): untyped =
try:
body
except:
echo getCurrentExceptionMsg()
suite "TCP transport": suite "TCP transport":
teardown: teardown:
check: check: