feat: implemented select

This commit is contained in:
Dmitriy Ryajov 2019-08-23 16:17:08 -06:00
parent f23106029f
commit 307c76e139
2 changed files with 100 additions and 26 deletions

View File

@ -7,11 +7,13 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import sequtils import sequtils, strutils
import chronos import chronos
import connection, varint, vbuffer import connection, varint, vbuffer
const MsgSize* = 64*1024
const Codec* = "/multistream/1.0.0" const Codec* = "/multistream/1.0.0"
const MultiCodec* = Codec & "\n"
const Na = "na\n" const Na = "na\n"
const Ls = "ls\n" const Ls = "ls\n"
@ -32,44 +34,57 @@ type
na*: seq[byte] na*: seq[byte]
ls*: seq[byte] ls*: seq[byte]
proc lp*(data: string): seq[byte] = proc lp*(data: string, s: var seq[byte]): int =
var buf = initVBuffer(newSeq[byte](256)) var buf = initVBuffer(s)
buf.writeSeq(data) buf.writeSeq(data)
var lpData = newSeq[byte](buf.length) buf.finish()
if buf.readSeq(lpData) < 0: s = buf.buffer
raise newException(MultisteamSelectException, "Error: failed to lenght prefix")
result = lpData
proc newMultistream*(): MultisteamSelect = proc newMultistream*(): MultisteamSelect =
new result new result
result.codec = lp(Codec & "\n") result.codec = newSeq[byte]()
result.na = lp(Na) discard lp(MultiCodec, result.codec)
result.ls = lp(Ls)
result.na = newSeq[byte]()
discard lp(Na, result.na)
result.ls = newSeq[byte]()
discard lp(Ls, result.ls)
proc select*(m: MultisteamSelect, conn: Connection, proto: string): Future[bool] {.async.} = proc select*(m: MultisteamSelect, conn: Connection, proto: string): Future[bool] {.async.} =
## select a remote protocol ## select a remote protocol
await conn.write(m.codec) await conn.write(m.codec) # write handshake
await conn.write(lp(proto)) # select proto await conn.writeLp(proto) # select proto
var ms = cast[string](await conn.readLp())
echo MultiCodec
if ms != MultiCodec:
raise newException(MultisteamSelectException,
"Error: invalid multistream codec " & "\"" & ms & "\"")
let ms = await conn.readLine(0, "\n") var msProto = cast[string](await conn.readLp())
if ms != Codec: msProto.removeSuffix("\n")
raise newException(MultisteamSelectException, "Error: invalid multistream codec" & $ms) result = msProto == proto
let remote = await conn.readLine(0, "\n") proc ls*(m: MultisteamSelect): Future[seq[string]] {.async.} =
result = remote == proto
proc ls*(m: MultisteamSelect): seq[string] {.async.} =
## list all remote protocol strings ## list all remote protocol strings
discard discard
proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = # proc handle*(m: MultisteamSelect, conn: Connection) {.async.} =
## handle requests on connection # ## handle requests on connection
discard # await conn.write(m.codec)
proc addHandle*(m: MultisteamSelect, # let ms = await conn.readLine(0, "\n")
proto: string, # if ms != MultiCodec:
handler: Handler, # raise newException(MultisteamSelectException,
matcher: Matcher = nil) = # "Error: invalid multistream codec " & "\"" & $ms & "\"")
# let ms = await conn.readLine(0, "\n")
proc addHandler*(m: MultisteamSelect,
proto: string,
handler: Handler,
matcher: Matcher = nil) =
## register a handler for the protocol ## register a handler for the protocol
m.handlers.add(HandlerHolder(proto: proto, m.handlers.add(HandlerHolder(proto: proto,
handler: handler, handler: handler,

View File

@ -0,0 +1,59 @@
import unittest, strutils, sequtils
import chronos
import ../libp2p/connection, ../libp2p/multistreamselect,
../libp2p/readerwriter, ../libp2p/connection
# custom select stream
type
TestStream = ref object of ReadWrite
step*: int
method readExactly*(s: TestStream, pbytes: pointer, nbytes: int): Future[void] {.async.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
s.step = 2
of 2:
var buf = MultiCodec
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
s.step = 3
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
s.step = 4
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
else:
copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len())
proc newTestStream(): TestStream =
new result
result.step = 1
suite "Multistream select":
test "test select":
proc testSelect(): Future[bool] {.async.} =
let ms = newMultistream()
let conn = newConnection(newTestStream())
result = await ms.select(conn, "/test/proto/1.0.0")
check:
waitFor(testSelect()) == true
# test "test handle":
# proc testHandle(): Future[bool] {.async.} =
# let ms = newMultistream()
# let conn = newConnection(newTestStream())
# proc testHandler(conn: Connection, proto: string): Future[void] =
# check proto == "/test/proto/1.0.0"
# ms.addHandler("/test/proto/1.0.0", testHandler)
# await ms.handle(conn)
# check:
# waitFor(testHandle()) == true