mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-01-13 18:27:10 +00:00
fix: multistream
This commit is contained in:
parent
5237fc8b12
commit
6f8de062bb
@ -43,9 +43,9 @@ proc select*(m: MultisteamSelect,
|
|||||||
proto: seq[string]):
|
proto: seq[string]):
|
||||||
Future[string] {.async.} =
|
Future[string] {.async.} =
|
||||||
## select a remote protocol
|
## select a remote protocol
|
||||||
await conn.write(m.codec) # write handshake
|
await conn.write(m.codec) # write handshake
|
||||||
if proto.len() > 0:
|
if proto.len() > 0:
|
||||||
await conn.writeLp(proto[0]) # select proto
|
await conn.writeLp((proto[0] & "\n")) # select proto
|
||||||
|
|
||||||
var ms = cast[string](await conn.readLp()) # read ms header
|
var ms = cast[string](await conn.readLp()) # read ms header
|
||||||
ms.removeSuffix("\n")
|
ms.removeSuffix("\n")
|
||||||
@ -53,7 +53,7 @@ proc select*(m: MultisteamSelect,
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
if proto.len() == 0: # no protocols, must be a handshake call
|
if proto.len() == 0: # no protocols, must be a handshake call
|
||||||
return ""
|
return ms
|
||||||
|
|
||||||
ms = cast[string](await conn.readLp()) # read the first proto
|
ms = cast[string](await conn.readLp()) # read the first proto
|
||||||
ms.removeSuffix("\n")
|
ms.removeSuffix("\n")
|
||||||
@ -62,7 +62,7 @@ proc select*(m: MultisteamSelect,
|
|||||||
|
|
||||||
if not result.len > 0:
|
if not result.len > 0:
|
||||||
for p in proto[1..<proto.len()]:
|
for p in proto[1..<proto.len()]:
|
||||||
await conn.writeLp(p) # select proto
|
await conn.writeLp((p & "\n")) # select proto
|
||||||
ms = cast[string](await conn.readLp()) # read the first proto
|
ms = cast[string](await conn.readLp()) # read the first proto
|
||||||
ms.removeSuffix("\n")
|
ms.removeSuffix("\n")
|
||||||
if ms == p:
|
if ms == p:
|
||||||
@ -81,7 +81,7 @@ proc select*(m: MultisteamSelect,
|
|||||||
proc list*(m: MultisteamSelect,
|
proc list*(m: MultisteamSelect,
|
||||||
conn: Connection): Future[seq[string]] {.async.} =
|
conn: Connection): Future[seq[string]] {.async.} =
|
||||||
## list remote protos requests on connection
|
## list remote protos requests on connection
|
||||||
if not (await m.select(conn)).len > 0:
|
if (await m.select(conn)).len == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
await conn.write(m.ls) # send ls
|
await conn.write(m.ls) # send ls
|
||||||
@ -96,7 +96,7 @@ proc list*(m: MultisteamSelect,
|
|||||||
|
|
||||||
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
|
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
|
||||||
## handle requests on connection
|
## handle requests on connection
|
||||||
if not (await m.select(conn)).len > 0:
|
if (await m.select(conn)).len == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
while not conn.closed:
|
while not conn.closed:
|
||||||
@ -118,7 +118,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
|
|||||||
else:
|
else:
|
||||||
for h in m.handlers:
|
for h in m.handlers:
|
||||||
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
|
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
|
||||||
await conn.writeLp(h.proto & "\n")
|
await conn.writeLp((h.proto & "\n"))
|
||||||
await h.protocol.handler(conn, ms)
|
await h.protocol.handler(conn, ms)
|
||||||
return
|
return
|
||||||
await conn.write(m.na)
|
await conn.write(m.na)
|
||||||
|
@ -189,7 +189,6 @@ suite "Multistream select":
|
|||||||
proc testHandler(conn: Connection,
|
proc testHandler(conn: Connection,
|
||||||
proto: string):
|
proto: string):
|
||||||
Future[void] {.async, gcsafe.} = discard
|
Future[void] {.async, gcsafe.} = discard
|
||||||
|
|
||||||
protocol.handler = testHandler
|
protocol.handler = testHandler
|
||||||
ms.addHandler("/test/proto1/1.0.0", protocol)
|
ms.addHandler("/test/proto1/1.0.0", protocol)
|
||||||
ms.addHandler("/test/proto2/1.0.0", protocol)
|
ms.addHandler("/test/proto2/1.0.0", protocol)
|
||||||
@ -255,7 +254,7 @@ suite "Multistream select":
|
|||||||
let transport2: TcpTransport = newTransport(TcpTransport)
|
let transport2: TcpTransport = newTransport(TcpTransport)
|
||||||
let conn = await transport2.dial(ma)
|
let conn = await transport2.dial(ma)
|
||||||
|
|
||||||
check (await msDial.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0"
|
check (await msDial.select(conn, "/test/proto/1.0.0")) == "/test/proto/1.0.0"
|
||||||
|
|
||||||
let hello = cast[string](await conn.readLp())
|
let hello = cast[string](await conn.readLp())
|
||||||
result = hello == "Hello!"
|
result = hello == "Hello!"
|
||||||
@ -273,6 +272,8 @@ suite "Multistream select":
|
|||||||
var peerInfo: PeerInfo
|
var peerInfo: PeerInfo
|
||||||
peerInfo.peerId = PeerID.init(seckey)
|
peerInfo.peerId = PeerID.init(seckey)
|
||||||
var protocol: LPProtocol = new LPProtocol
|
var protocol: LPProtocol = new LPProtocol
|
||||||
|
protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||||
|
await conn.close()
|
||||||
proc testHandler(conn: Connection,
|
proc testHandler(conn: Connection,
|
||||||
proto: string):
|
proto: string):
|
||||||
Future[void] {.async.} = discard
|
Future[void] {.async.} = discard
|
||||||
@ -283,7 +284,6 @@ suite "Multistream select":
|
|||||||
let transport1: TcpTransport = newTransport(TcpTransport)
|
let transport1: TcpTransport = newTransport(TcpTransport)
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
await msListen.handle(conn)
|
await msListen.handle(conn)
|
||||||
|
|
||||||
await transport1.listen(ma, connHandler)
|
await transport1.listen(ma, connHandler)
|
||||||
|
|
||||||
let msDial = newMultistream()
|
let msDial = newMultistream()
|
||||||
@ -298,7 +298,7 @@ suite "Multistream select":
|
|||||||
check:
|
check:
|
||||||
waitFor(endToEnd()) == true
|
waitFor(endToEnd()) == true
|
||||||
|
|
||||||
test "e2e - select one of one invalid":
|
test "e2e - select one from a list with unsupported protos":
|
||||||
proc endToEnd(): Future[bool] {.async.} =
|
proc endToEnd(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53352")
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53352")
|
||||||
|
|
||||||
@ -372,4 +372,4 @@ suite "Multistream select":
|
|||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
check:
|
check:
|
||||||
waitFor(endToEnd()) == true
|
waitFor(endToEnd()) == true
|
||||||
|
Loading…
x
Reference in New Issue
Block a user