From e13f42f9bbcc2d25303f44560affb3d30eda4922 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 23 Aug 2019 17:54:30 -0600 Subject: [PATCH] wip: initial multistream handler implementation --- libp2p/multistreamselect.nim | 79 ++++++++++++++++---------------- tests/testmultistreamselect.nim | 80 +++++++++++++++++++++++---------- 2 files changed, 97 insertions(+), 62 deletions(-) diff --git a/libp2p/multistreamselect.nim b/libp2p/multistreamselect.nim index c9769fa72..13dba5f79 100644 --- a/libp2p/multistreamselect.nim +++ b/libp2p/multistreamselect.nim @@ -13,9 +13,9 @@ import connection, varint, vbuffer const MsgSize* = 64*1024 const Codec* = "/multistream/1.0.0" -const MultiCodec* = Codec & "\n" -const Na = "na\n" -const Ls = "ls\n" +const MultiCodec* = "\x19" & Codec & "\n" +const Na = "\x3na\n" +const Ls = "\x3ls\n" type MultisteamSelectException = object of CatchableError @@ -30,56 +30,59 @@ type MultisteamSelect* = ref object of RootObj handlers*: seq[HandlerHolder] - codec*: seq[byte] - na*: seq[byte] - ls*: seq[byte] - -proc lp*(data: string, s: var seq[byte]): int = - var buf = initVBuffer(s) - buf.writeSeq(data) - buf.finish() - s = buf.buffer + codec*: string + na*: string + ls*: string proc newMultistream*(): MultisteamSelect = new result - result.codec = newSeq[byte]() - discard lp(MultiCodec, result.codec) + result.codec = MultiCodec + result.ls = Ls + result.na = Na - result.na = newSeq[byte]() - discard lp(Na, result.na) - - result.ls = newSeq[byte]() - discard lp(Ls, result.ls) - -proc select*(m: MultisteamSelect, conn: Connection, proto: string): Future[bool] {.async.} = +proc select*(m: MultisteamSelect, conn: Connection, proto: string = ""): Future[bool] {.async.} = ## select a remote protocol + ## TODO: select should support a list of protos to be selected + await conn.write(m.codec) # write handshake await conn.writeLp(proto) # select proto var ms = cast[string](await conn.readLp()) - echo MultiCodec - if ms != MultiCodec: + ms.removeSuffix("\n") + if ms != Codec: raise newException(MultisteamSelectException, "Error: invalid multistream codec " & "\"" & ms & "\"") - var msProto = cast[string](await conn.readLp()) - msProto.removeSuffix("\n") - result = msProto == proto + if proto.len() <= 0: + return true -proc ls*(m: MultisteamSelect): Future[seq[string]] {.async.} = - ## list all remote protocol strings - discard + ms = cast[string](await conn.readLp()) + ms.removeSuffix("\n") + result = ms == proto -# proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = -# ## handle requests on connection -# await conn.write(m.codec) - -# let ms = await conn.readLine(0, "\n") -# if ms != MultiCodec: -# raise newException(MultisteamSelectException, -# "Error: invalid multistream codec " & "\"" & $ms & "\"") +proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = + ## handle requests on connection + if not (await m.select(conn)): + return -# let ms = await conn.readLine(0, "\n") + block handleLoop: + var ms = cast[string](await conn.readLp()) + ms.removeSuffix("\n") + if ms.len() <= 0: + await conn.writeLp(Na) + case ms: + of "ls": + for h in m.handlers: + await conn.writeLp(h.proto) + break handleLoop + else: + for h in m.handlers: + if (not isNil(h.match) and h.match(ms)) or ms == h.proto: + await h.handler(conn, ms) + break + else: + await conn.write(Na) + break handleLoop proc addHandler*(m: MultisteamSelect, proto: string, diff --git a/tests/testmultistreamselect.nim b/tests/testmultistreamselect.nim index 17dce0475..49f027134 100644 --- a/tests/testmultistreamselect.nim +++ b/tests/testmultistreamselect.nim @@ -3,12 +3,12 @@ import chronos import ../libp2p/connection, ../libp2p/multistreamselect, ../libp2p/readerwriter, ../libp2p/connection -# custom select stream +## Stream for select test type - TestStream = ref object of ReadWrite + TestSelectStream = ref object of ReadWrite step*: int -method readExactly*(s: TestStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -16,7 +16,7 @@ method readExactly*(s: TestStream, pbytes: pointer, nbytes: int): Future[void] { copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) s.step = 2 of 2: - var buf = MultiCodec + var buf = "/multistream/1.0.0\n" copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) s.step = 3 of 3: @@ -30,30 +30,62 @@ method readExactly*(s: TestStream, pbytes: pointer, nbytes: int): Future[void] { else: copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) -proc newTestStream(): TestStream = +proc newTestSelectStream(): TestSelectStream = + new result + result.step = 1 + +## Stream for handles test +type + TestHandlesStream = ref object of ReadWrite + step*: int + +method readExactly*(s: TestHandlesStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = + case s.step: + of 1: + var buf = newSeq[byte](1) + buf[0] = 19 + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 2 + of 2: + var buf = "/multistream/1.0.0\n" + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 3 + of 3: + var buf = newSeq[byte](1) + buf[0] = 18 + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + s.step = 4 + of 4: + var buf = "/test/proto/1.0.0\n" + copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + +proc newTestHandlesStream(): TestHandlesStream = new result result.step = 1 suite "Multistream select": - test "test select": - proc testSelect(): Future[bool] {.async.} = - let ms = newMultistream() - let conn = newConnection(newTestStream()) - result = await ms.select(conn, "/test/proto/1.0.0") - - check: - waitFor(testSelect()) == true - - # test "test handle": - # proc testHandle(): Future[bool] {.async.} = + # test "test select custom proto": + # proc testSelect(): Future[bool] {.async.} = # let ms = newMultistream() - # let conn = newConnection(newTestStream()) - - # proc testHandler(conn: Connection, proto: string): Future[void] = - # check proto == "/test/proto/1.0.0" - - # ms.addHandler("/test/proto/1.0.0", testHandler) - # await ms.handle(conn) + # let conn = newConnection(newTestSelectStream()) + # result = await ms.select(conn, "/test/proto/1.0.0") # check: - # waitFor(testHandle()) == true \ No newline at end of file + # waitFor(testSelect()) == true + + test "test handle custom proto": + proc testHandle(): Future[bool] {.async.} = + let ms = newMultistream() + let conn = newConnection(newTestHandlesStream()) + + proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = + check proto == "/test/proto/1.0.0" + + ms.addHandler("/test/proto/1.0.0", testHandler) + await ms.handle(conn) + result = true + + check: + waitFor(testHandle()) == true \ No newline at end of file