reworked protocol

This commit is contained in:
Dmitriy Ryajov 2019-08-30 17:45:57 -06:00
parent 22dd8c0f6b
commit 022a248ca3
5 changed files with 65 additions and 47 deletions

View File

@ -86,6 +86,13 @@ proc decodeMsg*(buf: seq[byte]): IdentifyInfo =
discard pb.getString(6, agentVersion) discard pb.getString(6, agentVersion)
result.agentVersion = agentVersion result.agentVersion = agentVersion
method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
await conn.writeLp(pb.buffer)
p.handler = handle
proc identify*(p: Identify, conn: Connection): Future[IdentifyInfo] {.async.} = proc identify*(p: Identify, conn: Connection): Future[IdentifyInfo] {.async.} =
var message = await conn.readLp() var message = await conn.readLp()
if len(message) == 0: if len(message) == 0:
@ -97,7 +104,3 @@ proc push*(p: Identify, conn: Connection) {.async.} =
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs()) var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
let length = pb.getLen() let length = pb.getLen()
await conn.writeLp(pb.buffer) await conn.writeLp(pb.buffer)
proc handle*(p: Identify, conn: Connection, proto: string) {.async, gcsafe.} =
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
await conn.writeLp(pb.buffer)

View File

@ -24,7 +24,6 @@ type
HandlerHolder* = object HandlerHolder* = object
proto: string proto: string
protocol: LPProtocol protocol: LPProtocol
handler: LPProtoHandler
match: Matcher match: Matcher
MultisteamSelect* = ref object of RootObj MultisteamSelect* = ref object of RootObj
@ -102,17 +101,15 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
for h in m.handlers: for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto: if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
await conn.writeLp(h.proto & "\n") await conn.writeLp(h.proto & "\n")
await h.handler(h.protocol, conn, ms) await h.protocol.handler(conn, ms)
return return
await conn.write(m.na) await conn.write(m.na)
proc addHandler*[T: LPProtocol](m: MultisteamSelect, proc addHandler*[T: LPProtocol](m: MultisteamSelect,
proto: string, proto: string,
protocol: T, protocol: T,
handler: LPProtoHandler,
matcher: Matcher = nil) = matcher: Matcher = nil) =
## register a handler for the protocol ## register a handler for the protocol
m.handlers.add(HandlerHolder(proto: proto, m.handlers.add(HandlerHolder(proto: proto,
handler: handler,
protocol: protocol, protocol: protocol,
match: matcher)) match: matcher))

View File

@ -12,12 +12,14 @@ import connection, transport, stream,
peerinfo, multiaddress peerinfo, multiaddress
type type
LPProtoHandler* = proc (protocol: LPProtocol, LPProtoHandler* = proc (conn: Connection,
conn: Connection, proto: string):
proto: string): Future[void] {.gcsafe.} Future[void] {.gcsafe.}
LPProtocol* = ref object of RootObj LPProtocol* = ref object of RootObj
peerInfo*: PeerInfo peerInfo*: PeerInfo
codec*: string codec*: string
handler*: LPProtoHandler
proc newProtocol*(p: typedesc[LPProtocol], proc newProtocol*(p: typedesc[LPProtocol],
peerInfo: PeerInfo): p = peerInfo: PeerInfo): p =

View File

@ -8,10 +8,9 @@ import ../libp2p/identify, ../libp2p/multiaddress,
../libp2p/crypto/crypto ../libp2p/crypto/crypto
suite "Identify": suite "Identify":
test "handle identify message6": test "handle identify message":
proc testHandle(): Future[bool] {.async.} = proc testHandle(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53350") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53360")
let remoteSeckey = PrivateKey.random(RSA) let remoteSeckey = PrivateKey.random(RSA)
proc receiver() {.async.} = proc receiver() {.async.} =
@ -19,14 +18,10 @@ suite "Identify":
peerInfo.peerId = PeerID.init(remoteSeckey) peerInfo.peerId = PeerID.init(remoteSeckey)
peerInfo.addrs.add(ma) peerInfo.addrs.add(ma)
let identify = newProtocol(Identify, peerInfo) let identifyProto = newProtocol(Identify, peerInfo)
let msListen = newMultistream() let msListen = newMultistream()
proc handle(p: LPProtocol, conn: Connection, proto: string) {.async, gcsafe.} = msListen.addHandler(IdentifyCodec, identifyProto)
await identify.handle(conn, proto)
msListen.addHandler(IdentifyCodec, identify, handle)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn) await msListen.handle(conn)
@ -43,10 +38,10 @@ suite "Identify":
peerInfo.peerId = PeerID.init(seckey) peerInfo.peerId = PeerID.init(seckey)
peerInfo.addrs.add(ma) peerInfo.addrs.add(ma)
let identify = newProtocol(Identify, peerInfo) let identifyProto = newProtocol(Identify, peerInfo)
let res = await msDial.select(conn, IdentifyCodec) let res = await msDial.select(conn, IdentifyCodec)
let id = await identify.identify(conn) let id = await identifyProto.identify(conn)
await conn.close() await conn.close()
check id.pubKey == remoteSeckey.getKey() check id.pubKey == remoteSeckey.getKey()

View File

@ -1,8 +1,11 @@
import unittest, strutils, sequtils, sugar import unittest, strutils, sequtils, sugar
import chronos import chronos
import ../libp2p/connection, ../libp2p/multistream, import ../libp2p/connection, ../libp2p/multistream,
../libp2p/stream, ../libp2p/connection, ../libp2p/multiaddress, ../libp2p/stream, ../libp2p/connection,
../libp2p/transport, ../libp2p/tcptransport, ../libp2p/protocol ../libp2p/multiaddress, ../libp2p/transport,
../libp2p/tcptransport, ../libp2p/protocol,
../libp2p/crypto/crypto, ../libp2p/peerinfo,
../libp2p/peer
## Mock stream for select test ## Mock stream for select test
type type
@ -140,13 +143,17 @@ suite "Multistream select":
let ms = newMultistream() let ms = newMultistream()
let conn = newConnection(newTestSelectStream()) let conn = newConnection(newTestSelectStream())
var protocol: LPProtocol let seckey = PrivateKey.random(RSA)
proc testHandler(protocol: LPProtocol, var peerInfo: PeerInfo
conn: Connection, peerInfo.peerId = PeerID.init(seckey)
proto: string): Future[void] {.async, gcsafe.} = var protocol: LPProtocol = newProtocol(LPProtocol, peerInfo)
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
check proto == "/test/proto/1.0.0" check proto == "/test/proto/1.0.0"
ms.addHandler("/test/proto/1.0.0", protocol, testHandler) protocol.handler = testHandler
ms.addHandler("/test/proto/1.0.0", protocol)
await ms.handle(conn) await ms.handle(conn)
result = true result = true
@ -165,12 +172,16 @@ suite "Multistream select":
check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
await conn.close() await conn.close()
var protocol: LPProtocol let seckey = PrivateKey.random(RSA)
proc testHandler(protocol: LPProtocol, var peerInfo: PeerInfo
conn: Connection, peerInfo.peerId = PeerID.init(seckey)
var protocol: LPProtocol = newProtocol(LPProtocol, peerInfo)
proc testHandler(conn: Connection,
proto: string): Future[void] {.async, gcsafe.} = discard proto: string): Future[void] {.async, gcsafe.} = discard
ms.addHandler("/test/proto1/1.0.0", protocol, testHandler)
ms.addHandler("/test/proto2/1.0.0", protocol, testHandler) protocol.handler = testHandler
ms.addHandler("/test/proto1/1.0.0", protocol)
ms.addHandler("/test/proto2/1.0.0", protocol)
await ms.handle(conn) await ms.handle(conn)
result = true result = true
@ -188,11 +199,14 @@ suite "Multistream select":
check cast[string](msg) == "\x3na\n" check cast[string](msg) == "\x3na\n"
await conn.close() await conn.close()
var protocol: LPProtocol let seckey = PrivateKey.random(RSA)
proc testHandler(protocol: LPProtocol, var peerInfo: PeerInfo
conn: Connection, peerInfo.peerId = PeerID.init(seckey)
var protocol: LPProtocol = newProtocol(LPProtocol, peerInfo)
proc testHandler(conn: Connection,
proto: string): Future[void] {.async, gcsafe.} = discard proto: string): Future[void] {.async, gcsafe.} = discard
ms.addHandler("/unabvailable/proto/1.0.0", protocol, testHandler) protocol.handler = testHandler
ms.addHandler("/unabvailable/proto/1.0.0", protocol)
await ms.handle(conn) await ms.handle(conn)
result = true result = true
@ -203,16 +217,20 @@ suite "Multistream select":
test "e2e - handle": test "e2e - handle":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53350") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53350")
var protocol: LPProtocol
proc testHandler(protocol: LPProtocol, let seckey = PrivateKey.random(RSA)
conn: Connection, var peerInfo: PeerInfo
peerInfo.peerId = PeerID.init(seckey)
var protocol: LPProtocol = newProtocol(LPProtocol, peerInfo)
proc testHandler(conn: Connection,
proto: string): Future[void] {.async, gcsafe.} = proto: string): Future[void] {.async, gcsafe.} =
check proto == "/test/proto/1.0.0" check proto == "/test/proto/1.0.0"
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
await conn.close() await conn.close()
protocol.handler = testHandler
let msListen = newMultistream() let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", protocol, testHandler) msListen.addHandler("/test/proto/1.0.0", protocol)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn) await msListen.handle(conn)
@ -239,12 +257,15 @@ suite "Multistream select":
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53351") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53351")
let msListen = newMultistream() let msListen = newMultistream()
var protocol: LPProtocol let seckey = PrivateKey.random(RSA)
proc testHandler(protocol: LPProtocol, var peerInfo: PeerInfo
conn: Connection, peerInfo.peerId = PeerID.init(seckey)
var protocol: LPProtocol = newProtocol(LPProtocol, peerInfo)
proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} = discard proto: string): Future[void] {.async.} = discard
msListen.addHandler("/test/proto1/1.0.0", protocol, testHandler) protocol.handler = testHandler
msListen.addHandler("/test/proto2/1.0.0", protocol, testHandler) msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =