wip: multistream select rework

This commit is contained in:
Dmitriy Ryajov 2020-04-16 23:26:44 -06:00
parent 6ff6833d26
commit 13f299fb0c

View File

@ -7,8 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import strutils import strutils, sequtils
import chronos, chronicles import chronos, chronicles, stew/byteutils
import vbuffer, import vbuffer,
protocols/protocol, protocols/protocol,
streams/[connection, streams/[connection,
@ -37,9 +37,10 @@ type
MultistreamSelect* = object MultistreamSelect* = object
handlers*: seq[HandlerHolder] handlers*: seq[HandlerHolder]
codec*: string codec*: seq[byte]
na: string na*: seq[byte]
ls: string ls*: seq[byte]
lp: LenPrefixed
MultistreamHandshakeException* = object of CatchableError MultistreamHandshakeException* = object of CatchableError
@ -47,27 +48,30 @@ proc newMultistreamHandshakeException*(): ref Exception {.inline.} =
result = newException(MultistreamHandshakeException, result = newException(MultistreamHandshakeException,
"could not perform multistream handshake") "could not perform multistream handshake")
var appendNl: Through[seq[byte]] = proc (i: Source[seq[byte]]): Source[seq[byte]] {.gcsafe.} =
proc append(item: Future[seq[byte]]): Future[seq[byte]] {.async.} = proc append(item: Future[seq[byte]]): Future[seq[byte]] {.async.} =
result = await item result = await item
result.add(byte('\n')) result.add(byte('\n'))
var appendNl: Through[seq[byte]] = proc (i: Source[seq[byte]]): Source[seq[byte]] {.gcsafe.} =
return iterator(): Future[seq[byte]] {.closure.} = return iterator(): Future[seq[byte]] {.closure.} =
for item in i: for item in i:
yield append(item) yield append(item)
var stripNl: Through[seq[byte]] = proc (i: Source[seq[byte]]): Source[seq[byte]] {.gcsafe.} =
proc strip(item: Future[seq[byte]]): Future[seq[byte]] {.async.} = proc strip(item: Future[seq[byte]]): Future[seq[byte]] {.async.} =
result = await item result = await item
if result[^1] == byte('\n'): if result.len > 0 and result[^1] == byte('\n'):
result.setLen(result.high) result.setLen(result.high)
var stripNl: Through[seq[byte]] = proc (i: Source[seq[byte]]): Source[seq[byte]] {.gcsafe.} =
return iterator(): Future[seq[byte]] {.closure.} = return iterator(): Future[seq[byte]] {.closure.} =
for item in i: for item in i:
yield strip(item) yield strip(item)
proc init*(M: type[MultistreamSelect]): MultistreamSelect = proc init*(M: type[MultistreamSelect]): MultistreamSelect =
M(codec: Codec, ls: Ls, na: Na) M(codec: toSeq(Codec).mapIt( it.byte ),
ls: Ls.toBytes(),
na: Na.toBytes(),
lp: LenPrefixed.init())
proc select*(m: MultistreamSelect, proc select*(m: MultistreamSelect,
conn: Connection, conn: Connection,
@ -75,29 +79,26 @@ proc select*(m: MultistreamSelect,
Future[string] {.async.} = Future[string] {.async.} =
trace "initiating handshake", codec = m.codec trace "initiating handshake", codec = m.codec
var pushable = Pushable[seq[byte]].init() # pushable source var pushable = Pushable[seq[byte]].init() # pushable source
var lp = LenPrefixed.init()
var sink = pipe(pushable, var source = pipe(pushable,
appendNl, appendNl,
lp.encoder, m.lp.encoder,
conn) conn.toThrough,
m.lp.decoder,
let source = pipe(conn,
lp.decoder,
stripNl) stripNl)
# handshake first # handshake first
await pushable.push(cast[seq[byte]](m.codec)) await pushable.push(m.codec)
# (common optimization) if we've got # (common optimization) if we've got
# protos send the first one out immediately # protos send the first one out immediately
# without waiting for the handshake response # without waiting for the handshake response
if protos.len > 0: if protos.len > 0:
await pushable.push(cast[seq[byte]](protos[0])) await pushable.push(protos[0].toBytes())
# check for handshake result # check for handshake result
result = cast[string](await source()) var res = await source()
if result != m.codec: if res != m.codec:
error "handshake failed", codec = result.toHex() error "handshake failed", codec = result.toHex()
raise newMultistreamHandshakeException() raise newMultistreamHandshakeException()
@ -106,18 +107,17 @@ proc select*(m: MultistreamSelect,
while i < protos.len: while i < protos.len:
# first read because we've the outstanding requirest above # first read because we've the outstanding requirest above
trace "reading requested proto" trace "reading requested proto"
for chunk in source: res = await source()
result = cast[string](await chunk)
if result == protos[i]: var protoBytes = protos[i].toBytes()
trace "succesfully selected ", proto = proto if res == protoBytes:
break trace "succesfully selected ", proto = protos[i]
return protos[i]
if i > 0: if i > 0:
trace "selecting proto", proto = proto trace "selecting proto", proto = protos[i]
await pushable.push(cast[seq[byte]](protos[i])) # select proto await pushable.push(protoBytes) # select proto
i.inc() i.inc()
await sink
proc select*(m: MultistreamSelect, proc select*(m: MultistreamSelect,
conn: Connection, conn: Connection,
@ -130,92 +130,98 @@ proc select*(m: MultistreamSelect,
proc select*(m: MultistreamSelect, conn: Connection): Future[bool] = proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
m.select(conn, "") m.select(conn, "")
# proc list*(m: MultistreamSelect, proc list*(m: MultistreamSelect,
# conn: Connection): Future[seq[string]] {.async.} = conn: Connection): Future[seq[string]] {.async.} =
# ## list remote protos requests on connection ## list remote protos requests on connection
# if not await m.select(conn): if not await m.select(conn):
# return return
# await conn.write(m.ls) # send ls var pushable = Pushable[seq[byte]].init()
var source = pipe(pushable,
appendNl,
m.lp.encoder,
conn.toThrough,
m.lp.decoder,
stripNl)
# var list = newSeq[string]() await pushable.push(m.ls) # send ls
# let ms = cast[string]((await conn.readLp()))
# for s in ms.split("\n"):
# if s.len() > 0:
# list.add(s)
# result = list var list = newSeq[string]()
for chunk in source:
var msg = string.fromBytes((await chunk))
for s in msg.split("\n"):
if s.len() > 0:
list.add(s)
# proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = result = list
# trace "handle: starting multistream handling"
# try:
# while not conn.closed:
# var ms = cast[string]((await conn.readLp()))
# ms.removeSuffix("\n")
# trace "handle: got request for ", ms proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
# if ms.len() <= 0: trace "handle: starting multistream handling"
# trace "handle: invalid proto" try:
# await conn.write(m.na) var pushable = Pushable[seq[byte]].init()
var source = pipe(pushable,
appendNl,
m.lp.encoder,
conn.toThrough,
m.lp.decoder,
stripNl)
# if m.handlers.len() == 0: for chunk in source:
# trace "handle: sending `na` for protocol ", protocol = ms var msg = string.fromBytes((await chunk))
# await conn.write(m.na) trace "got request for ", msg
# continue if msg.len <= 0:
trace "invalid proto"
await pushable.push(m.na)
# case ms: if m.handlers.len() == 0:
# of "ls": trace "sending `na` for protocol ", protocol = msg
# trace "handle: listing protos" await pushable.push(m.na)
# var protos = "" continue
# for h in m.handlers:
# protos &= (h.proto & "\n")
# await conn.writeLp(protos)
# of Codec:
# await conn.write(m.codec)
# else:
# for h in m.handlers:
# if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
# trace "found handler for", protocol = ms
# await conn.writeLp((h.proto & "\n"))
# try:
# await h.protocol.handler(conn, ms)
# return
# except CatchableError as exc:
# warn "exception while handling", msg = exc.msg
# return
# warn "no handlers for ", protocol = ms
# await conn.write(m.na)
# except CatchableError as exc:
# trace "Exception occurred", exc = exc.msg
# finally:
# trace "leaving multistream loop"
# proc addHandler*[T: LPProtocol](m: MultistreamSelect, case msg:
# codec: string, of Ls:
# protocol: T, trace "listing protos"
# matcher: Matcher = nil) = for h in m.handlers:
# ## register a protocol await pushable.push(h.proto.toBytes())
# # TODO: This is a bug in chronicles, of Codec:
# # it break if I uncoment this line. trace "handling handshake"
# # Which is almost the same as the await pushable.push(m.codec)
# # one on the next override of addHandler else:
# # for h in m.handlers:
# # trace "registering protocol", codec = codec if (not isNil(h.match) and h.match(msg)) or msg == h.proto:
# m.handlers.add(HandlerHolder(proto: codec, trace "found handler for", protocol = msg
# protocol: protocol, await pushable.push(h.proto.toBytes())
# match: matcher)) try:
await h.protocol.handler(conn, msg)
return
except CatchableError as exc:
warn "exception while handling", msg = exc.msg
return
warn "no handlers for ", protocol = msg
await pushable.push(m.na)
except CatchableError as exc:
trace "Exception occurred", exc = exc.msg
finally:
trace "leaving multistream loop"
# proc addHandler*[T: LPProtoHandler](m: MultistreamSelect, proc addHandler*(m: var MultistreamSelect,
# codec: string, codec: string,
# handler: T, protocol: LPProtocol,
# matcher: Matcher = nil) = matcher: Matcher = nil) =
# ## helper to allow registering pure handlers ## register a protocol
trace "registering protocol", codec = codec
m.handlers.add(HandlerHolder(proto: codec,
protocol: protocol,
match: matcher))
# trace "registering proto handler", codec = codec proc addHandler*(m: var MultistreamSelect,
# let protocol = new LPProtocol codec: string,
# protocol.codec = codec handler: LPProtoHandler,
# protocol.handler = handler matcher: Matcher = nil) =
## helper to allow registering pure handlers
# m.handlers.add(HandlerHolder(proto: codec, trace "registering proto handler", codec = codec
# protocol: protocol, let protocol = LPProtocol(codec: codec, handler: handler)
# match: matcher)) m.handlers.add(HandlerHolder(proto: codec,
protocol: protocol,
match: matcher))