fix varint issues

* fixes #111
This commit is contained in:
Jacek Sieka 2020-05-08 22:58:23 +02:00 committed by Dmitriy Ryajov
parent 87e1f3c61f
commit 3053f03814
20 changed files with 143 additions and 165 deletions

View File

@ -34,7 +34,7 @@ type
method init(p: TestProto) {.gcsafe.} =
# handle incoming connections in closure
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
echo "Got from remote - ", cast[string](await conn.readLp())
echo "Got from remote - ", cast[string](await conn.readLp(1024))
await conn.writeLp("Hello!")
await conn.close()
@ -90,7 +90,7 @@ proc main() {.async, gcsafe.} =
await conn.writeLp("Hello!") # writeLp send a length prefixed buffer over the wire
# readLp reads length prefixed bytes and returns a buffer without the prefix
echo "Remote responded with - ", cast[string](await conn.readLp())
echo "Remote responded with - ", cast[string](await conn.readLp(1024))
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown

View File

@ -60,7 +60,7 @@ proc dialPeer(p: ChatProto, address: string) {.async.} =
proc readAndPrint(p: ChatProto) {.async.} =
while true:
while p.connected:
echo cast[string](await p.conn.readLp())
echo cast[string](await p.conn.readLp(1024))
await sleepAsync(100.millis)
proc writeAndPrint(p: ChatProto) {.async.} =
@ -148,4 +148,3 @@ proc main() {.async.} =
when isMainModule: # isMainModule = true when the module is compiled as the main file
waitFor(main())

View File

@ -60,7 +60,7 @@ proc dialPeer(p: ChatProto, address: string) {.async.} =
proc readAndPrint(p: ChatProto) {.async.} =
while true:
while p.connected:
echo cast[string](await p.conn.readLp())
echo cast[string](await p.conn.readLp(1024))
await sleepAsync(100.millis)
proc writeAndPrint(p: ChatProto) {.async.} =
@ -148,4 +148,3 @@ proc main() {.async.} =
when isMainModule: # isMainModule = true when the module is compiled as the main file
waitFor(main())

View File

@ -63,7 +63,7 @@ proc readAndPrint(p: ChatProto) {.async.} =
while p.connected:
# TODO: echo &"{p.id} -> "
echo cast[string](await p.conn.readLp())
echo cast[string](await p.conn.readLp(1024))
await sleepAsync(100.millis)
proc writeAndPrint(p: ChatProto) {.async.} =

View File

@ -23,7 +23,6 @@ logScope:
topic = "Connection"
const
DefaultReadSize* = 1 shl 20
ConnectionTrackerName* = "libp2p.connection"
type
@ -35,9 +34,6 @@ type
# (we got many actually :-))
readLoops*: seq[Future[void]]
InvalidVarintException = object of LPStreamError
InvalidVarintSizeException = object of LPStreamError
ConnectionTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64
@ -68,12 +64,6 @@ proc setupConnectionTracker(): ConnectionTracker =
declareGauge libp2p_open_connection, "open Connection instances"
proc newInvalidVarintException*(): ref InvalidVarintException =
newException(InvalidVarintException, "Unable to parse varint")
proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException =
newException(InvalidVarintSizeException, "Wrong varint size")
proc bindStreamClose(conn: Connection) {.async.} =
# bind stream's close event to connection's close
# to ensure correct close propagation
@ -155,42 +145,6 @@ method close*(s: Connection) {.async, gcsafe.} =
s.peerInfo.id else: ""
libp2p_open_connection.dec()
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
## read lenght prefixed msg
var
size: uint
length: int
res: VarintStatus
buff = newSeq[byte](10)
try:
for i in 0..<len(buff):
await s.readExactly(addr buff[i], 1)
res = LP.getUVarint(buff.toOpenArray(0, i), length, size)
if res == VarintStatus.Success:
break
if res != VarintStatus.Success:
raise newInvalidVarintException()
if size.int > DefaultReadSize:
raise newInvalidVarintSizeException()
buff.setLen(size)
if size > 0.uint:
trace "reading exact bytes from stream", size = size
await s.readExactly(addr buff[0], int(size))
return buff
except LPStreamIncompleteError as exc:
trace "remote connection ended unexpectedly", exc = exc.msg
raise exc
except LPStreamReadError as exc:
trace "couldn't read from stream", exc = exc.msg
raise exc
proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} =
## write lenght prefixed
var buf = initVBuffer()
buf.writeSeq(msg)
buf.finish()
s.write(buf.buffer)
method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} =
## get resolved multiaddresses for the connection
result = c.observedAddrs

View File

@ -58,7 +58,7 @@ proc select*(m: MultistreamSelect,
trace "selecting proto", proto = proto
await conn.writeLp((proto[0] & "\n")) # select proto
result = cast[string]((await conn.readLp())) # read ms header
result = cast[string]((await conn.readLp(1024))) # read ms header
result.removeSuffix("\n")
if result != Codec:
error "handshake failed", codec = result.toHex()
@ -67,7 +67,7 @@ proc select*(m: MultistreamSelect,
if proto.len() == 0: # no protocols, must be a handshake call
return
result = string.fromBytes(await conn.readLp()) # read the first proto
result = string.fromBytes(await conn.readLp(1024)) # read the first proto
trace "reading first requested proto"
result.removeSuffix("\n")
if result == proto[0]:
@ -78,7 +78,7 @@ proc select*(m: MultistreamSelect,
trace "selecting one of several protos"
for p in proto[1..<proto.len()]:
await conn.writeLp((p & "\n")) # select proto
result = string.fromBytes(await conn.readLp()) # read the first proto
result = string.fromBytes(await conn.readLp(1024)) # read the first proto
result.removeSuffix("\n")
if result == p:
trace "selected protocol", protocol = result
@ -104,7 +104,7 @@ proc list*(m: MultistreamSelect,
await conn.write(Ls) # send ls
var list = newSeq[string]()
let ms = string.fromBytes(await conn.readLp())
let ms = string.fromBytes(await conn.readLp(1024))
for s in ms.split("\n"):
if s.len() > 0:
list.add(s)
@ -115,7 +115,7 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: starting multistream handling"
tryAndWarn "multistream handle":
while not conn.closed:
var ms = string.fromBytes(await conn.readLp())
var ms = string.fromBytes(await conn.readLp(1024))
ms.removeSuffix("\n")
trace "handle: got request for ", ms

View File

@ -11,6 +11,7 @@ import chronos
import nimcrypto/utils, chronicles
import types,
../../connection,
../../utility,
../../varint,
../../vbuffer,
../../stream/lpstream
@ -29,46 +30,18 @@ type
proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
newException(InvalidMplexMsgType, "invalid message type")
proc readMplexVarint(conn: Connection): Future[uint64] {.async, gcsafe.} =
var
varint: uint
length: int
res: VarintStatus
buffer = newSeq[byte](10)
try:
for i in 0..<len(buffer):
await conn.readExactly(addr buffer[i], 1)
res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
if res == VarintStatus.Success:
break
if res != VarintStatus.Success:
raise newInvalidVarintException()
return varint
except LPStreamIncompleteError as exc:
trace "unable to read varint", exc = exc.msg
raise exc
proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
let header = await conn.readMplexVarint()
let header = await conn.readVarint()
trace "read header varint", varint = header
let dataLenVarint = await conn.readMplexVarint()
trace "read data len varint", varint = dataLenVarint
if dataLenVarint.int > DefaultReadSize:
raise newInvalidVarintSizeException()
var data: seq[byte] = newSeq[byte](dataLenVarint.int)
if dataLenVarint.int > 0:
await conn.readExactly(addr data[0], dataLenVarint.int)
trace "read data", data = data.len
let data = await conn.readLp(MaxMsgSize)
trace "read data", dataLen = data.len, data = shortLog(data)
let msgType = header and 0x7
if msgType.int > ord(MessageType.ResetOut):
raise newInvalidMplexMsgType()
result = (uint64(header shr 3), MessageType(msgType), data)
result = (header shr 3, MessageType(msgType), data)
proc writeMsg*(conn: Connection,
id: uint64,

View File

@ -9,6 +9,7 @@
import chronos
# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream
const MaxMsgSize* = 1 shl 20 # 1mb
const MaxChannels* = 1000
const MplexCodec* = "/mplex/6.7.0"

View File

@ -124,7 +124,7 @@ proc identify*(p: Identify,
conn: Connection,
remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} =
trace "initiating identify"
var message = await conn.readLp()
var message = await conn.readLp(64*1024)
if len(message) == 0:
trace "identify: Invalid or empty message received!"
raise newException(IdentityInvalidMsgError,

View File

@ -56,7 +56,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
try:
while not conn.closed:
trace "waiting for data", peer = p.id, closed = conn.closed
let data = await conn.readLp()
let data = await conn.readLp(64 * 1024)
let hexData = data.toHex()
trace "read data from peer", peer = p.id, data = data.shortLog
if $hexData.hash in p.recvdRpcCache:

View File

@ -8,7 +8,9 @@
## those terms.
import oids
import chronicles, chronos
import chronicles, chronos, strformat
import ../varint,
../vbuffer
type
LPStream* = ref object of RootObj
@ -27,6 +29,9 @@ type
par*: ref Exception
LPStreamEOFError* = object of LPStreamError
InvalidVarintError* = object of LPStreamError
MaxSizeError* = object of LPStreamError
proc newLPStreamReadError*(p: ref Exception): ref Exception =
var w = newException(LPStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
@ -109,6 +114,45 @@ proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, de
except LPStreamIncompleteError, LPStreamReadError:
discard # EOF, in which case we should return whatever we read so far..
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
var
varint: uint64
length: int
buffer: array[10, byte]
for i in 0..<len(buffer):
await conn.readExactly(addr buffer[i], 1)
let res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
if res == VarintStatus.Success:
return varint
if res != VarintStatus.Incomplete:
break
if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint")
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
## read length prefixed msg, with the length encoded as a varint
let
length = await s.readVarint()
maxLen = uint64(if maxSize < 0: int.high else: maxSize)
if length > maxLen:
raise (ref MaxSizeError)(msg: "Message exceeds maximum length")
if length == 0:
return
var res = newSeq[byte](length)
await s.readExactly(addr res[0], res.len)
return res
proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} =
## write length prefixed
var buf = initVBuffer()
buf.writeSeq(msg)
buf.finish()
s.write(buf.buffer)
method write*(s: LPStream, msg: seq[byte]) {.base, async.} =
doAssert(false, "not implemented!")

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.used.}
import unittest, sequtils, options, tables, sets
import chronos
import utils,

View File

@ -1,5 +1,7 @@
include ../../libp2p/protocols/pubsub/gossipsub
{.used.}
import unittest
import ../../libp2p/errors
import ../../libp2p/stream/bufferstream

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.used.}
import unittest, sequtils, options, tables, sets
import chronos
import chronicles

View File

@ -1,3 +1,5 @@
{.used.}
import options, sets, sequtils
import unittest
import ../../libp2p/[peer,

View File

@ -55,7 +55,7 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
if res == VarintStatus.Success:
break
if res != VarintStatus.Success:
raise newInvalidVarintException()
raise (ref InvalidVarintError)()
result.setLen(size)
if size > 0.uint:
await s.readExactly(addr result[0], int(size))
@ -209,11 +209,11 @@ suite "Interop":
daemonPeer.addresses),
protos[0])
await conn.writeLp("test 1")
check "test 2" == cast[string]((await conn.readLp()))
check "test 2" == cast[string]((await conn.readLp(1024)))
await sleepAsync(10.millis)
await conn.writeLp("test 3")
check "test 4" == cast[string]((await conn.readLp()))
check "test 4" == cast[string]((await conn.readLp(1024)))
await wait(testFuture, 10.secs)
await nativeNode.stop()
@ -271,7 +271,7 @@ suite "Interop":
var testFuture = newFuture[string]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
var line = cast[string](await conn.readLp())
var line = cast[string](await conn.readLp(1024))
check line == test
testFuture.complete(line)
await conn.close()
@ -306,10 +306,10 @@ suite "Interop":
var testFuture = newFuture[void]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
check "test 1" == cast[string](await conn.readLp())
check "test 1" == cast[string](await conn.readLp(1024))
await conn.writeLp(cast[seq[byte]]("test 2"))
check "test 3" == cast[string](await conn.readLp())
check "test 3" == cast[string](await conn.readLp(1024))
await conn.writeLp(cast[seq[byte]]("test 4"))
testFuture.complete()
@ -355,7 +355,7 @@ suite "Interop":
var testFuture = newFuture[int]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
while count < 10:
var line = cast[string](await conn.readLp())
var line = cast[string](await conn.readLp(1024))
check line == test
await conn.writeLp(cast[seq[byte]](test))
count.inc()

View File

@ -153,7 +153,7 @@ suite "Mplex":
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(1024)
check cast[string](msg) == "Hello from stream!"
await stream.close()
done.complete()
@ -200,7 +200,7 @@ suite "Mplex":
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(1024)
check cast[string](msg) == "Hello from stream!"
await stream.close()
done.complete()
@ -251,7 +251,7 @@ suite "Mplex":
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
defer:
await stream.close()
let msg = await stream.readLp()
let msg = await stream.readLp(MaxMsgSize)
check msg == bigseq
trace "Bigseq check passed!"
listenJob.complete()
@ -312,7 +312,7 @@ suite "Mplex":
let mplexDial = newMplex(conn)
let dialFut = mplexDial.handle()
let stream = await mplexDial.newStream("DIALER")
let msg = cast[string](await stream.readLp())
let msg = cast[string](await stream.readLp(1024))
check msg == "Hello from stream!"
# await dialFut
@ -339,7 +339,7 @@ suite "Mplex":
var listenConn: Connection
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(1024)
check cast[string](msg) == &"stream {count}!"
count.inc
await stream.close()
@ -386,7 +386,7 @@ suite "Mplex":
proc connHandler(conn: Connection) {.async, gcsafe.} =
listenConn = conn
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(1024)
check cast[string](msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
count.inc
@ -411,7 +411,7 @@ suite "Mplex":
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
await stream.writeLp(&"stream {i} from dialer!")
let msg = await stream.readLp()
let msg = await stream.readLp(1024)
check cast[string](msg) == &"stream {i} from listener!"
await stream.close()
@ -473,7 +473,7 @@ suite "Mplex":
const MsgSize = 1024
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
await stream.close()
complete.complete()
@ -542,7 +542,7 @@ suite "Mplex":
const MsgSize = 512
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
await stream.close()
complete.complete()

View File

@ -269,7 +269,7 @@ suite "Multistream select":
check (await msDial.select(conn, "/test/proto/1.0.0")) == true
let hello = cast[string](await conn.readLp())
let hello = cast[string](await conn.readLp(1024))
result = hello == "Hello!"
await conn.close()
@ -358,7 +358,7 @@ suite "Multistream select":
check (await msDial.select(conn,
@["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
let hello = cast[string](await conn.readLp())
let hello = cast[string](await conn.readLp(1024))
result = hello == "Hello!"
await conn.close()
@ -397,7 +397,7 @@ suite "Multistream select":
check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0"
result = cast[string](await conn.readLp()) == "Hello from /test/proto2/1.0.0!"
result = cast[string](await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!"
await conn.close()
await transport2.close()

View File

@ -41,7 +41,7 @@ type
method init(p: TestProto) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
await conn.writeLp("Hello!")
await conn.close()
@ -166,7 +166,7 @@ suite "Noise":
let sconn = await serverNoise.secure(conn, false)
defer:
await sconn.close()
let msg = await sconn.readLp()
let msg = await sconn.readLp(1024*1024)
check msg == hugePayload
readTask.complete()
@ -214,7 +214,7 @@ suite "Noise":
awaiters.add(await switch2.start())
let conn = await switch2.dial(switch1.peerInfo, TestCodec)
await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
await allFuturesThrowing(switch1.stop(), switch2.stop())

View File

@ -68,7 +68,7 @@ suite "Switch":
let done = newFuture[void]()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
await conn.writeLp("Hello!")
await conn.close()
@ -87,7 +87,7 @@ suite "Switch":
try:
await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
result = true
except LPStreamError:
@ -118,7 +118,7 @@ suite "Switch":
(switch1, peerInfo1) = createSwitch(ma1)
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
await conn.writeLp("Hello!")
await conn.close()
@ -136,7 +136,7 @@ suite "Switch":
try:
await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp())
let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg
result = true
except LPStreamError: