From 307c76e1399b66f856520a6ef701d885cce931c0 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 23 Aug 2019 16:17:08 -0600 Subject: [PATCH] feat: implemented select --- libp2p/multistreamselect.nim | 67 ++++++++++++++++++++------------- tests/testmultistreamselect.nim | 59 +++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 26 deletions(-) create mode 100644 tests/testmultistreamselect.nim diff --git a/libp2p/multistreamselect.nim b/libp2p/multistreamselect.nim index be5f66102..c9769fa72 100644 --- a/libp2p/multistreamselect.nim +++ b/libp2p/multistreamselect.nim @@ -7,11 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils +import sequtils, strutils import chronos import connection, varint, vbuffer +const MsgSize* = 64*1024 const Codec* = "/multistream/1.0.0" +const MultiCodec* = Codec & "\n" const Na = "na\n" const Ls = "ls\n" @@ -32,44 +34,57 @@ type na*: seq[byte] ls*: seq[byte] -proc lp*(data: string): seq[byte] = - var buf = initVBuffer(newSeq[byte](256)) +proc lp*(data: string, s: var seq[byte]): int = + var buf = initVBuffer(s) buf.writeSeq(data) - var lpData = newSeq[byte](buf.length) - if buf.readSeq(lpData) < 0: - raise newException(MultisteamSelectException, "Error: failed to lenght prefix") - result = lpData + buf.finish() + s = buf.buffer proc newMultistream*(): MultisteamSelect = new result - result.codec = lp(Codec & "\n") - result.na = lp(Na) - result.ls = lp(Ls) + result.codec = newSeq[byte]() + discard lp(MultiCodec, result.codec) + + result.na = newSeq[byte]() + discard lp(Na, result.na) + + result.ls = newSeq[byte]() + discard lp(Ls, result.ls) proc select*(m: MultisteamSelect, conn: Connection, proto: string): Future[bool] {.async.} = ## select a remote protocol - await conn.write(m.codec) - await conn.write(lp(proto)) # select proto + await conn.write(m.codec) # write handshake + await conn.writeLp(proto) # select proto + var ms = cast[string](await conn.readLp()) + echo MultiCodec + if ms != MultiCodec: + raise newException(MultisteamSelectException, + "Error: invalid multistream codec " & "\"" & ms & "\"") - let ms = await conn.readLine(0, "\n") - if ms != Codec: - raise newException(MultisteamSelectException, "Error: invalid multistream codec" & $ms) + var msProto = cast[string](await conn.readLp()) + msProto.removeSuffix("\n") + result = msProto == proto - let remote = await conn.readLine(0, "\n") - result = remote == proto - -proc ls*(m: MultisteamSelect): seq[string] {.async.} = +proc ls*(m: MultisteamSelect): Future[seq[string]] {.async.} = ## list all remote protocol strings discard -proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = - ## handle requests on connection - discard +# proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = +# ## handle requests on connection +# await conn.write(m.codec) -proc addHandle*(m: MultisteamSelect, - proto: string, - handler: Handler, - matcher: Matcher = nil) = +# let ms = await conn.readLine(0, "\n") +# if ms != MultiCodec: +# raise newException(MultisteamSelectException, +# "Error: invalid multistream codec " & "\"" & $ms & "\"") + +# let ms = await conn.readLine(0, "\n") + + +proc addHandler*(m: MultisteamSelect, + proto: string, + handler: Handler, + matcher: Matcher = nil) = ## register a handler for the protocol m.handlers.add(HandlerHolder(proto: proto, handler: handler, diff --git a/tests/testmultistreamselect.nim b/tests/testmultistreamselect.nim new file mode 100644 index 000000000..17dce0475 --- /dev/null +++ b/tests/testmultistreamselect.nim @@ -0,0 +1,59 @@ +import unittest, strutils, sequtils +import chronos +import ../libp2p/connection, ../libp2p/multistreamselect, + ../libp2p/readerwriter, ../libp2p/connection + +# custom select stream +type + TestStream = ref object of ReadWrite + step*: int + +method readExactly*(s: TestStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = + case s.step: + of 1: + var buf = newSeq[byte](1) + buf[0] = 19 + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 2 + of 2: + var buf = MultiCodec + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 3 + of 3: + var buf = newSeq[byte](1) + buf[0] = 18 + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 4 + of 4: + var buf = "/test/proto/1.0.0\n" + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + +proc newTestStream(): TestStream = + new result + result.step = 1 + +suite "Multistream select": + test "test select": + proc testSelect(): Future[bool] {.async.} = + let ms = newMultistream() + let conn = newConnection(newTestStream()) + result = await ms.select(conn, "/test/proto/1.0.0") + + check: + waitFor(testSelect()) == true + + # test "test handle": + # proc testHandle(): Future[bool] {.async.} = + # let ms = newMultistream() + # let conn = newConnection(newTestStream()) + + # proc testHandler(conn: Connection, proto: string): Future[void] = + # check proto == "/test/proto/1.0.0" + + # ms.addHandler("/test/proto/1.0.0", testHandler) + # await ms.handle(conn) + + # check: + # waitFor(testHandle()) == true \ No newline at end of file