wip: identify

This commit is contained in:
Dmitriy Ryajov 2019-08-29 23:16:30 -06:00
parent 682fecc395
commit aa2dfababd
2 changed files with 100 additions and 42 deletions

View File

@ -9,13 +9,13 @@
import chronos import chronos
import protobuf/minprotobuf, peerinfo, import protobuf/minprotobuf, peerinfo,
switchtypes, protocol, connection, protocol as proto, connection,
peer, crypto/crypto, multiaddress peer, crypto/crypto, multiaddress
const IdentifyCodec = "/ipfs/id/1.0.0" const IdentifyCodec* = "/ipfs/id/1.0.0"
const IdentifyPushCodec = "/ipfs/id/push/1.0.0" const IdentifyPushCodec* = "/ipfs/id/push/1.0.0"
const ProtoVersion = "ipfs/0.1.0" const ProtoVersion* = "ipfs/0.1.0"
const AgentVersion = "nim-libp2p/0.0.1" const AgentVersion* = "nim-libp2p/0.0.1"
type type
# TODO: we're doing protobuf manualy, this is only temporary # TODO: we're doing protobuf manualy, this is only temporary
@ -23,25 +23,23 @@ type
index: int index: int
field: T field: T
IdentifyInfo = object IdentifyInfo* = object
peerInfo: PeerInfo pubKey*: PublicKey
observedAddr: MultiAddress addrs*: seq[MultiAddress]
protoVersion: string observedAddr*: MultiAddress
agentVersion: string protoVersion*: string
agentVersion*: string
Identify = ref object of switchtypes.Protocol Identify* = ref object of LPProtocol
proc encodeIdentifyMsg(p: Identify, observedAddrs: Multiaddress): ProtoBuffer = proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer =
result = initProtoBuffer({WithVarintLength}) result = initProtoBuffer()
var pubKey: PublicKey
if p.peerInfo.peerId.extractPublicKey(pubkey) != true:
raise newException(CatchableError, "unable to extract public key")
result.write(initProtoField(1, pubKey)) result.write(initProtoField(1, peerInfo.peerId.publicKey.getBytes()))
for ma in p.peerInfo.addrs: for ma in peerInfo.addrs:
result.write(initProtoField(2, ma.data.buffer)) result.write(initProtoField(2, ma.data.buffer))
for item in p.peerInfo.protocols: for item in peerInfo.protocols:
result.write(initProtoField(3, item)) result.write(initProtoField(3, item))
result.write(initProtoField(4, observedAddrs.data.buffer)) result.write(initProtoField(4, observedAddrs.data.buffer))
@ -51,14 +49,16 @@ proc encodeIdentifyMsg(p: Identify, observedAddrs: Multiaddress): ProtoBuffer =
let agentVersion = AgentVersion let agentVersion = AgentVersion
result.write(initProtoField(6, agentVersion)) result.write(initProtoField(6, agentVersion))
result.finish() result.finish()
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo = proc decodeMsg*(buf: seq[byte]): IdentifyInfo =
## Get PeerInfo object from ``pb``. var pb = initProtoBuffer(buf)
var pubKey: PublicKey
if pb.getValue(1, pubKey) > -1:
result.pubKey = pubKey
result.addrs = newSeq[MultiAddress]() result.addrs = newSeq[MultiAddress]()
if pb.getValue(1, result.peerId) == -1:
raise newException(CatchableError, "Missing required field `publicKey`!")
var address = newSeq[byte]() var address = newSeq[byte]()
while pb.getBytes(2, address) != -1: while pb.getBytes(2, address) != -1:
if len(address) != 0: if len(address) != 0:
@ -66,10 +66,6 @@ proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
result.addrs.add(MultiAddress.init(copyaddr)) result.addrs.add(MultiAddress.init(copyaddr))
address.setLen(0) address.setLen(0)
proc decodeIdentifyMsg(p: Identify, buf: seq[byte]): IdentifyInfo =
var pb = initProtoBuffer(buf)
result.peerInfo = pb.getPeerInfo()
var proto = "" var proto = ""
var protos: seq[string] = newSeq[string]() var protos: seq[string] = newSeq[string]()
while pb.getString(3, proto) > 0: while pb.getString(3, proto) > 0:
@ -80,24 +76,26 @@ proc decodeIdentifyMsg(p: Identify, buf: seq[byte]): IdentifyInfo =
if pb.getBytes(4, observableAddr) > 0: # attempt to read the observed addr if pb.getBytes(4, observableAddr) > 0: # attempt to read the observed addr
result.observedAddr = MultiAddress.init(observableAddr) result.observedAddr = MultiAddress.init(observableAddr)
result.protoVersion = "" var protoVersion = ""
if pb.getString(5, result.protoVersion) <= 0: discard pb.getString(5, protoVersion)
raise newException(CatchableError, "Unable to read protocol version") result.protoVersion = protoVersion
var agentVersion = "" var agentVersion = ""
if pb.getString(5, agentVersion) <= 0: discard pb.getString(6, agentVersion)
raise newException(CatchableError, "Unable to read agent version") result.agentVersion = agentVersion
proc identify*(p: Identify, proc identify*(p: Identify, conn: Connection): Future[IdentifyInfo] {.async.} =
conn: Connection,
observedAddres: MultiAddress): Future[IdentifyInfo] {.async.} =
var pb = p.encodeIdentifyMsg(observedAddres)
let length = pb.getLen()
await conn.write(pb.getPtr(), length)
var message = await conn.readLp() var message = await conn.readLp()
if len(message) == 0: if len(message) == 0:
raise newException(CatchableError, "Incorrect or empty message received!") raise newException(CatchableError, "Incorrect or empty message received!")
result = p.decodeIdentifyMsg(message) result = decodeMsg(message)
proc push*(p: Identify, conn: Connection) {.async.} =
await conn.write(IdentifyPushCodec)
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
let length = pb.getLen()
await conn.writeLp(pb.buffer)
method handle*(p: Identify, peerInfo: PeerInfo, handler: ProtoHandler) {.async.} = discard proc handle*(p: Identify, conn: Connection, proto: string) {.async, gcsafe.} =
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
await conn.writeLp(pb.buffer)

60
tests/testidentify.nim Normal file
View File

@ -0,0 +1,60 @@
import unittest
import chronos, strutils, sequtils
import ../libp2p/identify, ../libp2p/multiaddress,
../libp2p/peerinfo, ../libp2p/peer, ../libp2p/connection,
../libp2p/identify, ../libp2p/multistream, ../libp2p/transport,
../libp2p/tcptransport, ../libp2p/protocol, ../libp2p/crypto/crypto
suite "Identify":
test "handle identify message6":
proc testHandle(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53350")
let remoteSeckey = PrivateKey.random(RSA)
proc receiver() {.async.} =
var peerInfo: PeerInfo
peerInfo.peerId = PeerID.init(remoteSeckey)
peerInfo.addrs.add(ma)
let identify = newProtocol(Identify, peerInfo)
let msListen = newMultistream()
proc handle(p: LPProtocol, conn: Connection, proto: string) {.async, gcsafe.} =
await identify.handle(conn, proto)
msListen.addHandler(IdentifyCodec, identify, handle)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
let transport: TcpTransport = newTransport(TcpTransport)
await transport.listen(ma, connHandler)
proc sender() {.async.} =
let msDial = newMultistream()
let transport: TcpTransport = newTransport(TcpTransport)
let conn = await transport.dial(ma)
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = PeerID.init(seckey)
peerInfo.addrs.add(ma)
let identify = newProtocol(Identify, peerInfo)
let res = await msDial.select(conn, IdentifyCodec)
let id = await identify.identify(conn)
await conn.close()
check id.pubKey == remoteSeckey.getKey()
check id.addrs[0] == ma
check id.protoVersion == ProtoVersion
check id.agentVersion == AgentVersion
await allFutures(receiver(), sender())
result = true
check:
waitFor(testHandle()) == true