Identify Push (#587)
* start of identifypush * better pushidentify * push identify test * fix: make peerid optional
This commit is contained in:
parent
bd2e9a0462
commit
bed00ec43c
|
@ -13,6 +13,7 @@ import options
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import ../protobuf/minprotobuf,
|
import ../protobuf/minprotobuf,
|
||||||
../peerinfo,
|
../peerinfo,
|
||||||
|
../connmanager,
|
||||||
../stream/connection,
|
../stream/connection,
|
||||||
../peerid,
|
../peerid,
|
||||||
../crypto/crypto,
|
../crypto/crypto,
|
||||||
|
@ -30,8 +31,6 @@ const
|
||||||
ProtoVersion* = "ipfs/0.1.0"
|
ProtoVersion* = "ipfs/0.1.0"
|
||||||
AgentVersion* = "nim-libp2p/0.0.1"
|
AgentVersion* = "nim-libp2p/0.0.1"
|
||||||
|
|
||||||
#TODO: implement push identify, leaving out for now as it is not essential
|
|
||||||
|
|
||||||
type
|
type
|
||||||
IdentifyError* = object of LPError
|
IdentifyError* = object of LPError
|
||||||
IdentityNoMatchError* = object of IdentifyError
|
IdentityNoMatchError* = object of IdentifyError
|
||||||
|
@ -49,6 +48,9 @@ type
|
||||||
Identify* = ref object of LPProtocol
|
Identify* = ref object of LPProtocol
|
||||||
peerInfo*: PeerInfo
|
peerInfo*: PeerInfo
|
||||||
|
|
||||||
|
IdentifyPush* = ref object of LPProtocol
|
||||||
|
connManager: ConnManager
|
||||||
|
|
||||||
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: Multiaddress): ProtoBuffer
|
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: Multiaddress): ProtoBuffer
|
||||||
{.raises: [Defect, IdentifyNoPubKeyError].} =
|
{.raises: [Defect, IdentifyNoPubKeyError].} =
|
||||||
result = initProtoBuffer()
|
result = initProtoBuffer()
|
||||||
|
@ -160,7 +162,57 @@ proc identify*(p: Identify,
|
||||||
|
|
||||||
raise newException(IdentityNoMatchError, "Peer ids don't match")
|
raise newException(IdentityNoMatchError, "Peer ids don't match")
|
||||||
|
|
||||||
proc push*(p: Identify, conn: Connection) {.async.} =
|
proc new*(T: typedesc[IdentifyPush], connManager: ConnManager): T =
|
||||||
await conn.write(IdentifyPushCodec)
|
let identifypush = T(connManager: connManager)
|
||||||
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
identifypush.init()
|
||||||
|
identifypush
|
||||||
|
|
||||||
|
|
||||||
|
proc init*(p: IdentifyPush) =
|
||||||
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
|
trace "handling identify push", conn
|
||||||
|
try:
|
||||||
|
var message = await conn.readLp(64*1024)
|
||||||
|
|
||||||
|
let infoOpt = decodeMsg(message)
|
||||||
|
if infoOpt.isNone():
|
||||||
|
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
|
||||||
|
|
||||||
|
let indentInfo = infoOpt.get()
|
||||||
|
|
||||||
|
if isNil(conn.peerInfo):
|
||||||
|
raise newException(IdentityInvalidMsgError, "Connection got no peerInfo")
|
||||||
|
|
||||||
|
if indentInfo.pubKey.isSome:
|
||||||
|
let receivedPeerId = PeerID.init(indentInfo.pubKey.get()).tryGet()
|
||||||
|
if receivedPeerId != conn.peerInfo.peerId:
|
||||||
|
raise newException(IdentityNoMatchError, "Peer ids don't match")
|
||||||
|
|
||||||
|
if indentInfo.addrs.len > 0:
|
||||||
|
conn.peerInfo.addrs = indentInfo.addrs
|
||||||
|
|
||||||
|
if indentInfo.agentVersion.isSome:
|
||||||
|
conn.peerInfo.agentVersion = indentInfo.agentVersion.get()
|
||||||
|
|
||||||
|
if indentInfo.protoVersion.isSome:
|
||||||
|
conn.peerInfo.protoVersion = indentInfo.protoVersion.get()
|
||||||
|
|
||||||
|
if indentInfo.protos.len > 0:
|
||||||
|
conn.peerInfo.protocols = indentInfo.protos
|
||||||
|
|
||||||
|
trace "triggering peer event", peerInfo = conn.peerInfo
|
||||||
|
await p.connManager.triggerPeerEvents(conn.peerInfo, PeerEvent(kind: PeerEventKind.Identified))
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
info "exception in identify push handler", exc = exc.msg, conn
|
||||||
|
finally:
|
||||||
|
trace "exiting identify push handler", conn
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
|
p.handler = handle
|
||||||
|
p.codec = IdentifyPushCodec
|
||||||
|
|
||||||
|
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async.} =
|
||||||
|
var pb = encodeMsg(peerInfo, conn.observedAddr)
|
||||||
await conn.writeLp(pb.buffer)
|
await conn.writeLp(pb.buffer)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import options, bearssl
|
import options, bearssl
|
||||||
import chronos, strutils
|
import chronos, strutils, sequtils, sets, algorithm
|
||||||
import ../libp2p/[protocols/identify,
|
import ../libp2p/[protocols/identify,
|
||||||
multiaddress,
|
multiaddress,
|
||||||
peerinfo,
|
peerinfo,
|
||||||
|
@ -7,6 +7,8 @@ import ../libp2p/[protocols/identify,
|
||||||
stream/connection,
|
stream/connection,
|
||||||
multistream,
|
multistream,
|
||||||
transports/transport,
|
transports/transport,
|
||||||
|
switch,
|
||||||
|
builders,
|
||||||
transports/tcptransport,
|
transports/tcptransport,
|
||||||
crypto/crypto,
|
crypto/crypto,
|
||||||
upgrademngrs/upgrade]
|
upgrademngrs/upgrade]
|
||||||
|
@ -118,3 +120,94 @@ suite "Identify":
|
||||||
let pi2 = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
let pi2 = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
||||||
discard await msDial.select(conn, IdentifyCodec)
|
discard await msDial.select(conn, IdentifyCodec)
|
||||||
discard await identifyProto2.identify(conn, pi2)
|
discard await identifyProto2.identify(conn, pi2)
|
||||||
|
|
||||||
|
suite "handle push identify message":
|
||||||
|
var
|
||||||
|
switch1 {.threadvar.}: Switch
|
||||||
|
switch2 {.threadvar.}: Switch
|
||||||
|
identifyPush1 {.threadvar.}: IdentifyPush
|
||||||
|
identifyPush2 {.threadvar.}: IdentifyPush
|
||||||
|
awaiters {.threadvar.}: seq[Future[void]]
|
||||||
|
conn {.threadvar.}: Connection
|
||||||
|
asyncSetup:
|
||||||
|
switch1 = newStandardSwitch()
|
||||||
|
switch2 = newStandardSwitch()
|
||||||
|
|
||||||
|
identifyPush1 = IdentifyPush.new(switch1.connManager)
|
||||||
|
identifyPush2 = IdentifyPush.new(switch2.connManager)
|
||||||
|
|
||||||
|
switch1.mount(identifyPush1)
|
||||||
|
switch2.mount(identifyPush2)
|
||||||
|
|
||||||
|
awaiters.add(await switch1.start())
|
||||||
|
awaiters.add(await switch2.start())
|
||||||
|
|
||||||
|
conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, IdentifyPushCodec)
|
||||||
|
|
||||||
|
let storedInfo1 = switch1.peerStore.get(switch2.peerInfo.peerId)
|
||||||
|
let storedInfo2 = switch2.peerStore.get(switch1.peerInfo.peerId)
|
||||||
|
|
||||||
|
check:
|
||||||
|
storedInfo1.peerId == switch2.peerInfo.peerId
|
||||||
|
storedInfo2.peerId == switch1.peerInfo.peerId
|
||||||
|
|
||||||
|
storedInfo1.addrs.toSeq() == switch2.peerInfo.addrs
|
||||||
|
storedInfo2.addrs.toSeq() == switch1.peerInfo.addrs
|
||||||
|
|
||||||
|
storedInfo1.protos.toSeq() == switch2.peerInfo.protocols
|
||||||
|
storedInfo2.protos.toSeq() == switch1.peerInfo.protocols
|
||||||
|
|
||||||
|
proc closeAll() {.async.} =
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
await switch1.stop()
|
||||||
|
await switch2.stop()
|
||||||
|
|
||||||
|
# this needs to go at end
|
||||||
|
await allFuturesThrowing(awaiters)
|
||||||
|
|
||||||
|
asyncTest "simple push identify":
|
||||||
|
switch2.peerInfo.protocols.add("/newprotocol/")
|
||||||
|
switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet())
|
||||||
|
|
||||||
|
check:
|
||||||
|
switch1.peerStore.get(switch2.peerInfo.peerId).addrs.toSeq() != switch2.peerInfo.addrs
|
||||||
|
switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toSet()
|
||||||
|
|
||||||
|
await identifyPush2.push(switch2.peerInfo, conn)
|
||||||
|
|
||||||
|
await closeAll()
|
||||||
|
|
||||||
|
# Wait the very end to be sure that the push has been processed
|
||||||
|
var aprotos = switch1.peerStore.get(switch2.peerInfo.peerId).protos.toSeq()
|
||||||
|
var bprotos = switch2.peerInfo.protocols
|
||||||
|
aprotos.sort()
|
||||||
|
bprotos.sort()
|
||||||
|
check:
|
||||||
|
aprotos == bprotos
|
||||||
|
switch1.peerStore.get(switch2.peerInfo.peerId).addrs == switch2.peerInfo.addrs.toSet()
|
||||||
|
|
||||||
|
|
||||||
|
asyncTest "wrong peer id push identify":
|
||||||
|
switch2.peerInfo.protocols.add("/newprotocol/")
|
||||||
|
switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet())
|
||||||
|
|
||||||
|
check:
|
||||||
|
switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toSet()
|
||||||
|
switch1.peerStore.get(switch2.peerInfo.peerId).protos.toSeq() != switch2.peerInfo.protocols
|
||||||
|
|
||||||
|
let oldPeerId = switch2.peerInfo.peerId
|
||||||
|
switch2.peerInfo = PeerInfo.init(PrivateKey.random(newRng()[]).get())
|
||||||
|
|
||||||
|
await identifyPush2.push(switch2.peerInfo, conn)
|
||||||
|
|
||||||
|
await closeAll()
|
||||||
|
|
||||||
|
# Wait the very end to be sure that the push has been processed
|
||||||
|
var aprotos = switch1.peerStore.get(oldPeerId).protos.toSeq()
|
||||||
|
var bprotos = switch2.peerInfo.protocols
|
||||||
|
aprotos.sort()
|
||||||
|
bprotos.sort()
|
||||||
|
check:
|
||||||
|
aprotos != bprotos
|
||||||
|
switch1.peerStore.get(oldPeerId).addrs.toSeq() != switch2.peerInfo.addrs
|
||||||
|
|
Loading…
Reference in New Issue