From d02735dc4664a5f9b3c24155b0db1da381bdf474 Mon Sep 17 00:00:00 2001 From: Menduist Date: Wed, 8 Sep 2021 11:07:46 +0200 Subject: [PATCH] Remove peer info (#610) Peer Info is now for local peer data only. For other peers info, use the peer store. Previous reference to peer info are replaced with the peerid --- libp2p/connmanager.nim | 60 +++--- libp2p/debugutils.nim | 5 +- libp2p/dialer.nim | 12 +- libp2p/muxers/mplex/lpchannel.nim | 5 +- libp2p/muxers/mplex/mplex.nim | 6 +- libp2p/muxers/muxer.nim | 4 +- libp2p/peerinfo.nim | 114 ++--------- libp2p/peerstore.nim | 65 +++---- libp2p/protocols/identify.nim | 57 +++--- libp2p/protocols/ping.nim | 4 +- libp2p/protocols/pubsub/gossipsub/scoring.nim | 101 ++++------ libp2p/protocols/pubsub/pubsub.nim | 13 +- libp2p/protocols/pubsub/rpc/message.nim | 17 +- libp2p/protocols/secure/noise.nim | 28 ++- libp2p/protocols/secure/secio.nim | 13 +- libp2p/protocols/secure/secure.nim | 7 +- libp2p/stream/bufferstream.nim | 3 +- libp2p/stream/chronosstream.nim | 17 +- libp2p/stream/connection.nim | 11 +- libp2p/switch.nim | 27 +-- libp2p/upgrademngrs/muxedupgrade.nim | 18 +- libp2p/upgrademngrs/upgrade.nim | 34 ++-- tests/pubsub/testgossipinternal.nim | 180 +++++++++--------- tests/testconnmngr.nim | 122 ++++++------ tests/testidentify.nim | 45 +++-- tests/testinterop.nim | 24 +-- tests/testnoise.nim | 64 ++++--- tests/testpeerinfo.nim | 45 +---- tests/testpeerstore.nim | 29 +-- tests/testping.nim | 2 +- tests/testswitch.nim | 56 +++--- 31 files changed, 466 insertions(+), 722 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index a8e293eec..56c673660 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -12,6 +12,7 @@ import std/[options, tables, sequtils, sets] import pkg/[chronos, chronicles, metrics] import peerinfo, + peerstore, stream/connection, muxers/muxer, utils/semaphore, @@ -48,7 +49,7 @@ type discard ConnEventHandler* = - proc(peerInfo: PeerInfo, event: ConnEvent): Future[void] + proc(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe, raises: [Defect].} PeerEventKind* {.pure.} = enum @@ -64,7 +65,7 @@ type discard PeerEventHandler* = - proc(peerInfo: PeerInfo, event: PeerEvent): Future[void] {.gcsafe.} + proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} MuxerHolder = object muxer: Muxer @@ -78,6 +79,7 @@ type muxed: Table[Connection, MuxerHolder] connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]] peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]] + peerStore*: PeerStore proc newTooManyConnectionsError(): ref TooManyConnectionsError {.inline.} = result = newException(TooManyConnectionsError, "Too many connections") @@ -133,22 +135,22 @@ proc removeConnEventHandler*(c: ConnManager, raiseAssert exc.msg proc triggerConnEvent*(c: ConnManager, - peerInfo: PeerInfo, + peerId: PeerId, event: ConnEvent) {.async, gcsafe.} = try: - trace "About to trigger connection events", peer = peerInfo.peerId + trace "About to trigger connection events", peer = peerId if c.connEvents[event.kind].len() > 0: - trace "triggering connection events", peer = peerInfo.peerId, event = $event.kind + trace "triggering connection events", peer = peerId, event = $event.kind var connEvents: seq[Future[void]] for h in c.connEvents[event.kind]: - connEvents.add(h(peerInfo, event)) + connEvents.add(h(peerId, event)) checkFutures(await allFinished(connEvents)) except CancelledError as exc: raise exc except CatchableError as exc: warn "Exception in triggerConnEvents", - msg = exc.msg, peer = peerInfo.peerId, event = $event + msg = exc.msg, peer = peerId, event = $event proc addPeerEventHandler*(c: ConnManager, handler: PeerEventHandler, @@ -179,33 +181,33 @@ proc removePeerEventHandler*(c: ConnManager, raiseAssert exc.msg proc triggerPeerEvents*(c: ConnManager, - peerInfo: PeerInfo, + peerId: PeerId, event: PeerEvent) {.async, gcsafe.} = - trace "About to trigger peer events", peer = peerInfo.peerId + trace "About to trigger peer events", peer = peerId if c.peerEvents[event.kind].len == 0: return try: - let count = c.connCount(peerInfo.peerId) + let count = c.connCount(peerId) if event.kind == PeerEventKind.Joined and count != 1: - trace "peer already joined", peer = peerInfo.peerId, event = $event + trace "peer already joined", peer = peerId, event = $event return elif event.kind == PeerEventKind.Left and count != 0: - trace "peer still connected or already left", peer = peerInfo.peerId, event = $event + trace "peer still connected or already left", peer = peerId, event = $event return - trace "triggering peer events", peer = peerInfo.peerId, event = $event + trace "triggering peer events", peer = peerId, event = $event var peerEvents: seq[Future[void]] for h in c.peerEvents[event.kind]: - peerEvents.add(h(peerInfo, event)) + peerEvents.add(h(peerId, event)) checkFutures(await allFinished(peerEvents)) except CancelledError as exc: raise exc except CatchableError as exc: # handlers should not raise! - warn "Exception in triggerPeerEvents", exc = exc.msg, peer = peerInfo.peerId + warn "Exception in triggerPeerEvents", exc = exc.msg, peer = peerId proc contains*(c: ConnManager, conn: Connection): bool = ## checks if a connection is being tracked by the @@ -215,10 +217,7 @@ proc contains*(c: ConnManager, conn: Connection): bool = if isNil(conn): return - if isNil(conn.peerInfo): - return - - return conn in c.conns.getOrDefault(conn.peerInfo.peerId) + return conn in c.conns.getOrDefault(conn.peerId) proc contains*(c: ConnManager, peerId: PeerID): bool = peerId in c.conns @@ -252,7 +251,7 @@ proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} = trace "Cleaned up muxer", m = muxerHolder.muxer proc delConn(c: ConnManager, conn: Connection) = - let peerId = conn.peerInfo.peerId + let peerId = conn.peerId c.conns.withValue(peerId, peerConns): peerConns[].excl(conn) @@ -269,10 +268,6 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = trace "Wont cleanup a nil connection" return - if isNil(conn.peerInfo): - trace "No peer info for connection" - return - # Remove connection from all tables without async breaks var muxer = some(MuxerHolder()) if not c.muxed.pop(conn, muxer.get()): @@ -293,12 +288,12 @@ proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} = trace "Triggering connect events", conn conn.upgrade() - let peerInfo = conn.peerInfo + let peerId = conn.peerId await c.triggerPeerEvents( - peerInfo, PeerEvent(kind: PeerEventKind.Joined, initiator: conn.dir == Direction.Out)) + peerId, PeerEvent(kind: PeerEventKind.Joined, initiator: conn.dir == Direction.Out)) await c.triggerConnEvent( - peerInfo, ConnEvent(kind: ConnEventKind.Connected, incoming: conn.dir == Direction.In)) + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: conn.dir == Direction.In)) except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propagate CancelledError and should handle other errors @@ -308,10 +303,10 @@ proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} = proc peerCleanup(c: ConnManager, conn: Connection) {.async.} = try: trace "Triggering disconnect events", conn - let peerInfo = conn.peerInfo + let peerId = conn.peerId await c.triggerConnEvent( - peerInfo, ConnEvent(kind: ConnEventKind.Disconnected)) - await c.triggerPeerEvents(peerInfo, PeerEvent(kind: PeerEventKind.Left)) + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + await c.triggerPeerEvents(peerId, PeerEvent(kind: PeerEventKind.Left)) except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propagate CancelledError and should handle other errors @@ -386,10 +381,7 @@ proc storeConn*(c: ConnManager, conn: Connection) if conn.closed or conn.atEof: raise newException(LPError, "Connection closed or EOF") - if isNil(conn.peerInfo): - raise newException(LPError, "Empty peer info") - - let peerId = conn.peerInfo.peerId + let peerId = conn.peerId if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer: debug "Too many connections for peer", conn, conns = c.conns.getOrDefault(peerId).len diff --git a/libp2p/debugutils.nim b/libp2p/debugutils.nim index e7dbcb947..1b8520ba4 100644 --- a/libp2p/debugutils.nim +++ b/libp2p/debugutils.nim @@ -72,8 +72,7 @@ proc dumpMessage*(conn: SecureConn, direction: FlowDirection, var pb = initProtoBuffer(options = {WithVarintLength}) pb.write(2, getTimestamp()) pb.write(4, uint64(direction)) - if len(conn.peerInfo.addrs) > 0: - pb.write(6, conn.peerInfo.addrs[0]) + pb.write(6, conn.observedAddr) pb.write(7, data) pb.finish() @@ -83,7 +82,7 @@ proc dumpMessage*(conn: SecureConn, direction: FlowDirection, else: getHomeDir() / libp2p_dump_dir - let fileName = $(conn.peerInfo.peerId) & ".pbcap" + let fileName = $(conn.peerId) & ".pbcap" # This is debugging procedure so it should not generate any exceptions, # and we going to return at every possible OS error. diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 961368466..b952b7548 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -36,7 +36,7 @@ type DialFailedError* = object of LPError Dialer* = ref object of Dial - peerInfo*: PeerInfo + localPeerId*: PeerId ms: MultistreamSelect connManager: ConnManager dialLock: Table[PeerID, AsyncLock] @@ -79,7 +79,7 @@ proc dialAndUpgrade( continue # Try the next address # make sure to assign the peer to the connection - dialed.peerInfo = PeerInfo.init(peerId, addrs) + dialed.peerId = peerId # also keep track of the connection's bottom unsafe transport direction # required by gossipsub scoring @@ -99,7 +99,7 @@ proc dialAndUpgrade( raise exc doAssert not isNil(conn), "connection died after upgradeOutgoing" - debug "Dial successful", conn, peerInfo = conn.peerInfo + debug "Dial successful", conn, peerId = conn.peerId return conn proc internalConnect( @@ -107,7 +107,7 @@ proc internalConnect( peerId: PeerID, addrs: seq[MultiAddress]): Future[Connection] {.async.} = - if self.peerInfo.peerId == peerId: + if self.localPeerId == peerId: raise newException(CatchableError, "can't dial self!") # Ensure there's only one in-flight attempt per peer @@ -231,12 +231,12 @@ method dial*( proc new*( T: type Dialer, - peerInfo: PeerInfo, + localPeerId: PeerId, connManager: ConnManager, transports: seq[Transport], ms: MultistreamSelect): Dialer = - T(peerInfo: peerInfo, + T(localPeerId: localPeerId, connManager: connManager, transports: transports, ms: ms) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index b0418bb7b..a211f6a38 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -57,10 +57,9 @@ type func shortLog*(s: LPChannel): auto = try: if s.isNil: "LPChannel(nil)" - elif s.conn.peerInfo.isNil: $s.oid elif s.name != $s.oid and s.name.len > 0: - &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}" - else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}" + &"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}" + else: &"{shortLog(s.conn.peerId)}:{s.oid}" except ValueError as exc: raise newException(Defect, exc.msg) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index d56912d13..d84cf5e00 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -68,7 +68,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[chann.initiator].len.int64, - labelValues = [$chann.initiator, $m.connection.peerInfo.peerId]) + labelValues = [$chann.initiator, $m.connection.peerId]) except CatchableError as exc: warn "Error cleaning up mplex channel", m, chann, msg = exc.msg @@ -94,7 +94,7 @@ proc newStreamInternal*(m: Mplex, name, timeout = timeout) - result.peerInfo = m.connection.peerInfo + result.peerId = m.connection.peerId result.observedAddr = m.connection.observedAddr result.transportDir = m.connection.transportDir @@ -108,7 +108,7 @@ proc newStreamInternal*(m: Mplex, when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[initiator].len.int64, - labelValues = [$initiator, $m.connection.peerInfo.peerId]) + labelValues = [$initiator, $m.connection.peerId]) proc handleStream(m: Mplex, chann: LPChannel) {.async.} = ## call the muxer stream handler for this channel diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 81f254b10..9ecdba95b 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -76,7 +76,9 @@ method init(c: MuxerProvider) = # finally await both the futures if not isNil(c.muxerHandler): - futs &= c.muxerHandler(muxer) + await c.muxerHandler(muxer) + when defined(libp2p_agents_metrics): + conn.shortAgent = muxer.connection.shortAgent checkFutures(await allFinished(futs)) except CancelledError as exc: diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index 14454bceb..ff75a7198 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -15,33 +15,19 @@ import peerid, multiaddress, crypto/crypto, errors export peerid, multiaddress, crypto, errors, results -## A peer can be constructed in one of tree ways: -## 1) A local peer with a private key -## 2) A remote peer with a PeerID and it's public key stored -## in the ``id`` itself -## 3) A remote peer with a standalone public key, that isn't -## encoded in the ``id`` -## +## Our local peer info type - KeyType* = enum - HasPrivate, - HasPublic - PeerInfoError* = LPError - PeerInfo* = ref object of RootObj + PeerInfo* = ref object peerId*: PeerID addrs*: seq[MultiAddress] protocols*: seq[string] protoVersion*: string agentVersion*: string - secure*: string - case keyType*: KeyType: - of HasPrivate: - privateKey*: PrivateKey - of HasPublic: - key: Option[PublicKey] + privateKey*: PrivateKey + publicKey*: PublicKey func shortLog*(p: PeerInfo): auto = ( @@ -53,14 +39,6 @@ func shortLog*(p: PeerInfo): auto = ) chronicles.formatIt(PeerInfo): shortLog(it) -template postInit(peerinfo: PeerInfo, - addrs: openarray[MultiAddress], - protocols: openarray[string]) = - if len(addrs) > 0: - peerinfo.addrs = @addrs - if len(protocols) > 0: - peerinfo.protocols = @protocols - proc init*( p: typedesc[PeerInfo], key: PrivateKey, @@ -70,84 +48,18 @@ proc init*( agentVersion: string = ""): PeerInfo {.raises: [Defect, PeerInfoError].} = + let pubkey = try: + key.getKey().tryGet() + except CatchableError: + raise newException(PeerInfoError, "invalid private key") + let peerInfo = PeerInfo( - keyType: HasPrivate, peerId: PeerID.init(key).tryGet(), + publicKey: pubkey, privateKey: key, protoVersion: protoVersion, - agentVersion: agentVersion) + agentVersion: agentVersion, + addrs: @addrs, + protocols: @protocols) - peerInfo.postInit(addrs, protocols) return peerInfo - -proc init*( - p: typedesc[PeerInfo], - peerId: PeerID, - addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = [], - protoVersion: string = "", - agentVersion: string = ""): PeerInfo = - let peerInfo = PeerInfo( - keyType: HasPublic, - peerId: peerId, - protoVersion: protoVersion, - agentVersion: agentVersion) - - peerInfo.postInit(addrs, protocols) - return peerInfo - -proc init*( - p: typedesc[PeerInfo], - peerId: string, - addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = [], - protoVersion: string = "", - agentVersion: string = ""): PeerInfo - {.raises: [Defect, PeerInfoError].} = - - let peerInfo = PeerInfo( - keyType: HasPublic, - peerId: PeerID.init(peerId).tryGet(), - protoVersion: protoVersion, - agentVersion: agentVersion) - - peerInfo.postInit(addrs, protocols) - return peerInfo - -proc init*( - p: typedesc[PeerInfo], - key: PublicKey, - addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = [], - protoVersion: string = "", - agentVersion: string = ""): PeerInfo - {.raises: [Defect, PeerInfoError].} = - - let peerInfo = PeerInfo( - keyType: HasPublic, - peerId: PeerID.init(key).tryGet(), - key: some(key), - protoVersion: protoVersion, - agentVersion: agentVersion) - - peerInfo.postInit(addrs, protocols) - return peerInfo - -proc publicKey*(p: PeerInfo): Option[PublicKey] = - var res = none(PublicKey) - if p.keyType == HasPublic: - if p.peerId.hasPublicKey(): - var pubKey: PublicKey - if p.peerId.extractPublicKey(pubKey): - res = some(pubKey) - elif p.key.isSome: - res = p.key - else: - let pkeyRes = p.privateKey.getPublicKey() - if pkeyRes.isOk: - res = some(pkeyRes.get()) - - return res - -func hash*(p: PeerInfo): Hash = - cast[pointer](p).hash diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index c4fc34b06..7887253a7 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -12,6 +12,7 @@ import std/[tables, sets, sequtils, options], ./crypto/crypto, + ./protocols/identify, ./peerid, ./peerinfo, ./multiaddress @@ -49,14 +50,10 @@ type addressBook*: AddressBook protoBook*: ProtoBook keyBook*: KeyBook - - StoredInfo* = object - # Collates stored info about a peer - peerId*: PeerID - addrs*: HashSet[MultiAddress] - protos*: HashSet[string] - publicKey*: PublicKey + agentBook*: PeerBook[string] + protoVersionBook*: PeerBook[string] + ## Constructs a new PeerStore with metadata of type M proc new*(T: type PeerStore): PeerStore = var p: PeerStore @@ -95,6 +92,9 @@ proc delete*[T](peerBook: var PeerBook[T], peerBook.book.del(peerId) return true +proc contains*[T](peerBook: PeerBook[T], peerId: PeerID): bool = + peerId in peerBook.book + ################ # Set Book API # ################ @@ -113,6 +113,16 @@ proc add*[T]( for handler in peerBook.changeHandlers: handler(peerId, peerBook.get(peerId)) +# Helper for seq +proc set*[T]( + peerBook: var SetPeerBook[T], + peerId: PeerID, + entry: seq[T]) = + ## Add entry to a given peer. If the peer is not known, + ## it will be set with the provided entry. + peerBook.set(peerId, entry.toHashSet()) + + ################## # Peer Store API # ################## @@ -135,37 +145,18 @@ proc delete*(peerStore: PeerStore, peerStore.protoBook.delete(peerId) and peerStore.keyBook.delete(peerId) -proc get*(peerStore: PeerStore, - peerId: PeerID): StoredInfo = - ## Get the stored information of a given peer. - - StoredInfo( - peerId: peerId, - addrs: peerStore.addressBook.get(peerId), - protos: peerStore.protoBook.get(peerId), - publicKey: peerStore.keyBook.get(peerId) - ) +proc updatePeerInfo*( + peerStore: PeerStore, + info: IdentifyInfo) = -proc update*(peerStore: PeerStore, peerInfo: PeerInfo) = - for address in peerInfo.addrs: - peerStore.addressBook.add(peerInfo.peerId, address) - for proto in peerInfo.protocols: - peerStore.protoBook.add(peerInfo.peerId, proto) - let pKey = peerInfo.publicKey() - if pKey.isSome: - peerStore.keyBook.set(peerInfo.peerId, pKey.get()) + if info.addrs.len > 0: + peerStore.addressBook.set(info.peerId, info.addrs) -proc replace*(peerStore: PeerStore, peerInfo: PeerInfo) = - discard peerStore.addressBook.delete(peerInfo.peerId) - discard peerStore.protoBook.delete(peerInfo.peerId) - discard peerStore.keyBook.delete(peerInfo.peerId) - peerStore.update(peerInfo) + if info.agentVersion.isSome: + peerStore.agentBook.set(info.peerId, info.agentVersion.get().string) -proc peers*(peerStore: PeerStore): seq[StoredInfo] = - ## Get all the stored information of every peer. - - let allKeys = concat(toSeq(keys(peerStore.addressBook.book)), - toSeq(keys(peerStore.protoBook.book)), - toSeq(keys(peerStore.keyBook.book))).toHashSet() + if info.protoVersion.isSome: + peerStore.protoVersionBook.set(info.peerId, info.protoVersion.get().string) - return allKeys.mapIt(peerStore.get(it)) + if info.protos.len > 0: + peerStore.protoBook.set(info.peerId, info.protos) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 5744797c5..e19c5a78e 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -13,7 +13,6 @@ import options import chronos, chronicles import ../protobuf/minprotobuf, ../peerinfo, - ../connmanager, ../stream/connection, ../peerid, ../crypto/crypto, @@ -39,6 +38,7 @@ type IdentifyInfo* = object pubKey*: Option[PublicKey] + peerId*: PeerId addrs*: seq[MultiAddress] observedAddr*: Option[MultiAddress] protoVersion*: Option[string] @@ -48,16 +48,22 @@ type Identify* = ref object of LPProtocol peerInfo*: PeerInfo + IdentifyPushHandler* = proc ( + peer: PeerId, + newInfo: IdentifyInfo): + Future[void] + {.gcsafe, raises: [Defect].} + IdentifyPush* = ref object of LPProtocol - connManager: ConnManager + identifyHandler: IdentifyPushHandler proc encodeMsg*(peerInfo: PeerInfo, observedAddr: Multiaddress): ProtoBuffer {.raises: [Defect, IdentifyNoPubKeyError].} = result = initProtoBuffer() - if peerInfo.publicKey.isNone: - raise newException(IdentifyNoPubKeyError, "No public key found for peer!") - result.write(1, peerInfo.publicKey.get().getBytes().get()) + let pkey = peerInfo.publicKey + + result.write(1, pkey.getBytes().get()) for ma in peerInfo.addrs: result.write(2, ma.data.buffer) for proto in peerInfo.protocols: @@ -138,7 +144,7 @@ method init*(p: Identify) = proc identify*(p: Identify, conn: Connection, - remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = + remotePeerId: PeerId): Future[IdentifyInfo] {.async, gcsafe.} = trace "initiating identify", conn var message = await conn.readLp(64*1024) if len(message) == 0: @@ -150,24 +156,26 @@ proc identify*(p: Identify, raise newException(IdentityInvalidMsgError, "Incorrect message received!") result = infoOpt.get() - if not isNil(remotePeerInfo) and result.pubKey.isSome: + if result.pubKey.isSome: let peer = PeerID.init(result.pubKey.get()) if peer.isErr: raise newException(IdentityInvalidMsgError, $peer.error) else: - if peer.get() != remotePeerInfo.peerId: + result.peerId = peer.get() + if peer.get() != remotePeerId: trace "Peer ids don't match", remote = peer, - local = remotePeerInfo.peerId + local = remotePeerId raise newException(IdentityNoMatchError, "Peer ids don't match") + else: + raise newException(IdentityInvalidMsgError, "No pubkey in identify") -proc new*(T: typedesc[IdentifyPush], connManager: ConnManager): T = - let identifypush = T(connManager: connManager) +proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T = + let identifypush = T(identifyHandler: handler) identifypush.init() identifypush - proc init*(p: IdentifyPush) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = trace "handling identify push", conn @@ -178,30 +186,17 @@ proc init*(p: IdentifyPush) = if infoOpt.isNone(): raise newException(IdentityInvalidMsgError, "Incorrect message received!") - let indentInfo = infoOpt.get() - - if isNil(conn.peerInfo): - raise newException(IdentityInvalidMsgError, "Connection got no peerInfo") + var indentInfo = infoOpt.get() if indentInfo.pubKey.isSome: let receivedPeerId = PeerID.init(indentInfo.pubKey.get()).tryGet() - if receivedPeerId != conn.peerInfo.peerId: + if receivedPeerId != conn.peerId: raise newException(IdentityNoMatchError, "Peer ids don't match") + indentInfo.peerId = receivedPeerId - if indentInfo.addrs.len > 0: - conn.peerInfo.addrs = indentInfo.addrs - - if indentInfo.agentVersion.isSome: - conn.peerInfo.agentVersion = indentInfo.agentVersion.get() - - if indentInfo.protoVersion.isSome: - conn.peerInfo.protoVersion = indentInfo.protoVersion.get() - - if indentInfo.protos.len > 0: - conn.peerInfo.protocols = indentInfo.protos - - trace "triggering peer event", peerInfo = conn.peerInfo - await p.connManager.triggerPeerEvents(conn.peerInfo, PeerEvent(kind: PeerEventKind.Identified)) + trace "triggering peer event", peerInfo = conn.peerId + if not isNil(p.identifyHandler): + await p.identifyHandler(conn.peerId, indentInfo) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index fb61bc8eb..23565f897 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -29,7 +29,7 @@ type WrongPingAckError* = object of LPError PingHandler* = proc ( - peer: PeerInfo): + peer: PeerId): Future[void] {.gcsafe, raises: [Defect].} @@ -51,7 +51,7 @@ method init*(p: Ping) = trace "echoing ping", conn await conn.write(addr buf[0], PingSize) if not isNil(p.pingHandler): - await p.pingHandler(conn.peerInfo) + await p.pingHandler(conn.peerId) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 389376ddd..8af9729b5 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -86,24 +86,15 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = {.pop.} proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = - when defined(libp2p_agents_metrics): - let agent = - block: - if peer.shortAgent.len > 0: - peer.shortAgent - else: - if peer.sendConn != nil: - let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].safeToLowerAscii() - if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): - peer.shortAgent = shortAgent.get() - else: - peer.shortAgent = "unknown" - peer.shortAgent - else: - "unknown" - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) - else: - libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"]) + let agent = + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) try: await g.switch.disconnect(peer.peerId) @@ -180,30 +171,18 @@ proc updateScores*(g: GossipSub) = # avoid async score += topicScore * topicParams.topicWeight # Score metrics - when defined(libp2p_agents_metrics): - let agent = - block: - if peer.shortAgent.len > 0: - peer.shortAgent - else: - if peer.sendConn != nil: - let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].safeToLowerAscii() - if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): - peer.shortAgent = shortAgent.get() - else: - peer.shortAgent = "unknown" - peer.shortAgent - else: - "unknown" - libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) - libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) - libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) - libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent]) - else: - libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"]) - libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"]) - libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"]) - libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"]) + let agent = + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) + libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) + libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) + libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent]) # Score decay info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay @@ -236,28 +215,17 @@ proc updateScores*(g: GossipSub) = # avoid async score += colocationFactor * g.parameters.ipColocationFactorWeight # Score metrics - when defined(libp2p_agents_metrics): - let agent = - block: - if peer.shortAgent.len > 0: - peer.shortAgent - else: - if peer.sendConn != nil: - let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].safeToLowerAscii() - if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): - peer.shortAgent = shortAgent.get() - else: - peer.shortAgent = "unknown" - peer.shortAgent - else: - "unknown" - libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) - libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) - libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) - else: - libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"]) - libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"]) - libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"]) + let agent = + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) + libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) + libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) # decay behaviourPenalty peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay @@ -278,10 +246,7 @@ proc updateScores*(g: GossipSub) = # avoid async debug "disconnecting bad score peer", peer, score = peer.score asyncSpawn(try: g.disconnectPeer(peer) except Exception as exc: raiseAssert exc.msg) - when defined(libp2p_agents_metrics): - libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) - else: - libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"]) + libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) for peer in evicting: g.peerStats.del(peer) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index d7771822e..e10802c1f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -347,16 +347,11 @@ method handleConn*(p: PubSub, ## that we're interested in ## - if isNil(conn.peerInfo): - trace "no valid PeerId for peer" - await conn.close() - return - proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] = # call pubsub rpc handler p.rpcHandler(peer, msg) - let peer = p.getOrCreatePeer(conn.peerInfo.peerId, @[proto]) + let peer = p.getOrCreatePeer(conn.peerId, @[proto]) try: peer.handler = handler @@ -568,11 +563,11 @@ proc init*[PubParams: object | bool]( parameters: parameters, topicsHigh: int.high) - proc peerEventHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} = + proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: - pubsub.subscribePeer(peerInfo.peerId) + pubsub.subscribePeer(peerId) else: - pubsub.unsubscribePeer(peerInfo.peerId) + pubsub.unsubscribePeer(peerId) switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 9ae5063e5..51cb3ebfb 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -77,9 +77,6 @@ proc init*( let peer = peer.get() msg.fromPeer = peer.peerId if sign: - if peer.keyType != KeyType.HasPrivate: - raise (ref LPError)(msg: "Cannot sign message without private key") - msg.signature = sign(msg, peer.privateKey).expect("Couldn't sign message!") msg.key = peer.privateKey.getPublicKey().expect("Invalid private key!") .getBytes().expect("Couldn't get public key bytes!") @@ -87,3 +84,17 @@ proc init*( raise (ref LPError)(msg: "Cannot sign message without peer info") msg + +proc init*( + T: type Message, + peerId: PeerId, + data: seq[byte], + topic: string, + seqno: Option[uint64]): Message + {.gcsafe, raises: [Defect, LPError].} = + var msg = Message(data: data, topicIDs: @[topic]) + msg.fromPeer = peerId + + if seqno.isSome: + msg.seqno = @(seqno.get().toBytesBE()) + msg diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 72b7b07c9..dc110cafb 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -100,8 +100,7 @@ type func shortLog*(conn: NoiseConnection): auto = try: if conn.isNil: "NoiseConnection(nil)" - elif conn.peerInfo.isNil: $conn.oid - else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" + else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raise newException(Defect, exc.msg) @@ -547,27 +546,26 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon else: trace "Remote signature verified", conn - if initiator and not isNil(conn.peerInfo): + if initiator: let pid = PeerID.init(remotePubKey) - if not conn.peerInfo.peerId.validate(): + if not conn.peerId.validate(): raise newException(NoiseHandshakeError, "Failed to validate peerId.") - if pid.isErr or pid.get() != conn.peerInfo.peerId: + if pid.isErr or pid.get() != conn.peerId: var failedKey: PublicKey - discard extractPublicKey(conn.peerInfo.peerId, failedKey) + discard extractPublicKey(conn.peerId, failedKey) debug "Noise handshake, peer infos don't match!", initiator, dealt_peer = conn, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey - raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) + raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerId) + else: + let pid = PeerID.init(remotePubKey) + if pid.isErr: + raise newException(NoiseHandshakeError, "Invalid remote peer id") + conn.peerId = pid.get() - conn.peerInfo = - if conn.peerInfo != nil: - conn.peerInfo - else: - PeerInfo.init(PeerID.init(remotePubKey).tryGet()) - - var tmp = NoiseConnection.init(conn, conn.peerInfo, conn.observedAddr) + var tmp = NoiseConnection.init(conn, conn.peerId, conn.observedAddr) if initiator: tmp.readCs = handshakeRes.cs2 @@ -579,7 +577,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon finally: burnMem(handshakeRes) - trace "Noise handshake completed!", initiator, peer = shortLog(secure.peerInfo) + trace "Noise handshake completed!", initiator, peer = shortLog(secure.peerId) conn.timeout = timeout diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 2ea77b6c8..329675d96 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -77,8 +77,7 @@ type func shortLog*(conn: SecioConn): auto = try: if conn.isNil: "SecioConn(nil)" - elif conn.peerInfo.isNil: $conn.oid - else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" + else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raise newException(Defect, exc.msg) @@ -264,13 +263,7 @@ proc newSecioConn(conn: Connection, ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## ``order``. - conn.peerInfo = - if conn.peerInfo != nil: - conn.peerInfo - else: - PeerInfo.init(remotePubKey) - - result = SecioConn.init(conn, conn.peerInfo, conn.observedAddr) + result = SecioConn.init(conn, conn.peerId, conn.observedAddr) let i0 = if order < 0: 1 else: 0 let i1 = if order < 0: 0 else: 1 @@ -346,6 +339,8 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S remotePeerId = PeerID.init(remotePubkey).tryGet() # TODO: PeerID check against supplied PeerID + if not initiator: + conn.peerId = remotePeerId let order = getOrder(remoteBytesPubkey, localNonce, localBytesPubkey, remoteNonce).tryGet() trace "Remote proposal", schemes = remoteExchanges, ciphers = remoteCiphers, diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 903752ce6..e045eba06 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -36,8 +36,7 @@ type func shortLog*(conn: SecureConn): auto = try: if conn.isNil: "SecureConn(nil)" - elif conn.peerInfo.isNil: $conn.oid - else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" + else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raise newException(Defect, exc.msg) @@ -45,11 +44,11 @@ chronicles.formatIt(SecureConn): shortLog(it) proc init*(T: type SecureConn, conn: Connection, - peerInfo: PeerInfo, + peerId: PeerId, observedAddr: Multiaddress, timeout: Duration = DefaultConnectionTimeout): T = result = T(stream: conn, - peerInfo: peerInfo, + peerId: peerId, observedAddr: observedAddr, closeEvent: conn.closeEvent, upgraded: conn.upgraded, diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 8065a9d20..9151029b1 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -38,8 +38,7 @@ type func shortLog*(s: BufferStream): auto = try: if s.isNil: "BufferStream(nil)" - elif s.peerInfo.isNil: $s.oid - else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}" + else: &"{shortLog(s.peerId)}:{s.oid}" except ValueError as exc: raise newException(Defect, exc.msg) diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index a4281bb08..10490c95a 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -26,7 +26,6 @@ type client: StreamTransport when defined(libp2p_agents_metrics): tracked: bool - shortAgent: string when defined(libp2p_agents_metrics): declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"]) @@ -38,8 +37,7 @@ declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"]) func shortLog*(conn: ChronosStream): auto = try: if conn.isNil: "ChronosStream(nil)" - elif conn.peerInfo.isNil: $conn.oid - else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" + else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raise newException(Defect, exc.msg) @@ -84,16 +82,9 @@ template withExceptions(body: untyped) = when defined(libp2p_agents_metrics): proc trackPeerIdentity(s: ChronosStream) = - if not s.tracked: - if not isNil(s.peerInfo) and s.peerInfo.agentVersion.len > 0: - # / seems a weak "standard" so for now it's reliable - let shortAgent = s.peerInfo.agentVersion.split("/")[0].safeToLowerAscii() - if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): - s.shortAgent = shortAgent.get() - else: - s.shortAgent = "unknown" - libp2p_peers_identity.inc(labelValues = [s.shortAgent]) - s.tracked = true + if not s.tracked and s.shortAgent.len > 0: + libp2p_peers_identity.inc(labelValues = [s.shortAgent]) + s.tracked = true proc untrackPeerIdentity(s: ChronosStream) = if s.tracked: diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 8e05a9947..7b974c386 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -33,11 +33,13 @@ type timeout*: Duration # channel timeout if no activity timerTaskFut: Future[void] # the current timer instance timeoutHandler*: TimeoutHandler # timeout handler - peerInfo*: PeerInfo + peerId*: PeerId observedAddr*: Multiaddress upgraded*: Future[void] tag*: string # debug tag for metrics (generally ms protocol) transportDir*: Direction # The bottom level transport (generally the socket) direction + when defined(libp2p_agents_metrics): + shortAgent*: string proc timeoutMonitor(s: Connection) {.async, gcsafe.} @@ -60,8 +62,7 @@ proc onUpgrade*(s: Connection) {.async.} = func shortLog*(conn: Connection): string = try: if conn.isNil: "Connection(nil)" - elif conn.peerInfo.isNil: $conn.oid - else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" + else: &"{shortLog(conn.peerId)}:{conn.oid}" except ValueError as exc: raiseAssert(exc.msg) @@ -151,12 +152,12 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = return proc init*(C: type Connection, - peerInfo: PeerInfo, + peerId: PeerId, dir: Direction, timeout: Duration = DefaultConnectionTimeout, timeoutHandler: TimeoutHandler = nil, observedAddr: MultiAddress = MultiAddress()): Connection = - result = C(peerInfo: peerInfo, + result = C(peerId: peerId, dir: dir, timeout: timeout, timeoutHandler: timeoutHandler, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index a6233e921..1f887e37c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -216,11 +216,6 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = s.acceptFuts.add(s.accept(t)) startFuts.add(server) - proc peerIdentifiedHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} = - s.peerStore.replace(peerInfo) - - s.connManager.addPeerEventHandler(peerIdentifiedHandler, PeerEventKind.Identified) - debug "Started libp2p node", peer = s.peerInfo return startFuts # listen for incoming connections @@ -270,27 +265,9 @@ proc newSwitch*(peerInfo: PeerInfo, transports: transports, connManager: connManager, peerStore: PeerStore.new(), - dialer: Dialer.new(peerInfo, connManager, transports, ms), + dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms), nameResolver: nameResolver) + switch.connManager.peerStore = switch.peerStore switch.mount(identity) return switch - -proc isConnected*(s: Switch, peerInfo: PeerInfo): bool - {.deprecated: "Use PeerID version".} = - not isNil(peerInfo) and isConnected(s, peerInfo.peerId) - -proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] - {.deprecated: "Use PeerID version", gcsafe.} = - disconnect(s, peerInfo.peerId) - -proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] - {.deprecated: "Use PeerID version".} = - connect(s, peerInfo.peerId, peerInfo.addrs) - -proc dial*(s: Switch, - peerInfo: PeerInfo, - proto: string): - Future[Connection] - {.deprecated: "Use PeerID version".} = - dial(s, peerInfo.peerId, peerInfo.addrs, proto) diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index df53d048b..2ba98c875 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -35,13 +35,15 @@ proc identify*( try: await self.identify(stream) + when defined(libp2p_agents_metrics): + muxer.connection.shortAgent = stream.shortAgent finally: await stream.closeWithEOF() proc mux*( self: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} = - ## mux incoming connection + ## mux outgoing connection trace "Muxing connection", conn if self.muxers.len == 0: @@ -86,17 +88,16 @@ method upgradeOutgoing*( raise newException(UpgradeFailedError, "unable to secure connection, stopping upgrade") - if sconn.peerInfo.isNil: - raise newException(UpgradeFailedError, - "current version of nim-libp2p requires that secure protocol negotiates peerid") - let muxer = await self.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future raise newException(UpgradeFailedError, "a muxer is required for outgoing connections") - if sconn.closed() or isNil(sconn.peerInfo): + when defined(libp2p_agents_metrics): + conn.shortAgent = muxer.connection.shortAgent + + if sconn.closed(): await sconn.close() raise newException(UpgradeFailedError, "Connection closed or missing peer info, stopping upgrade") @@ -164,11 +165,6 @@ proc muxerHandler( let conn = muxer.connection - if conn.peerInfo.isNil: - warn "This version of nim-libp2p requires secure protocol to negotiate peerid" - await muxer.close() - return - # store incoming connection self.connManager.storeConn(conn) diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim index abdfef223..c9010614f 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -9,15 +9,17 @@ {.push raises: [Defect].} -import std/[options, sequtils] +import std/[options, sequtils, strutils] import pkg/[chronos, chronicles, metrics] import ../stream/connection, ../protocols/secure/secure, ../protocols/identify, ../multistream, + ../peerstore, ../connmanager, - ../errors + ../errors, + ../utility export connmanager, connection, identify, secure, multistream @@ -70,26 +72,22 @@ proc identify*( ## identify the connection if (await self.ms.select(conn, self.identity.codec)): - let info = await self.identity.identify(conn, conn.peerInfo) + let + info = await self.identity.identify(conn, conn.peerId) + peerStore = self.connManager.peerStore if info.pubKey.isNone and isNil(conn): raise newException(UpgradeFailedError, "no public key provided and no existing peer identity found") - if isNil(conn.peerInfo): - conn.peerInfo = PeerInfo.init(info.pubKey.get()) + conn.peerId = info.peerId - if info.addrs.len > 0: - conn.peerInfo.addrs = info.addrs + when defined(libp2p_agents_metrics): + conn.shortAgent = "unknown" + if info.agentVersion.isSome and info.agentVersion.get().len > 0: + let shortAgent = info.agentVersion.get().split("/")[0].safeToLowerAscii() + if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): + conn.shortAgent = shortAgent.get() - if info.agentVersion.isSome: - conn.peerInfo.agentVersion = info.agentVersion.get() - - if info.protoVersion.isSome: - conn.peerInfo.protoVersion = info.protoVersion.get() - - if info.protos.len > 0: - conn.peerInfo.protocols = info.protos - - await self.connManager.triggerPeerEvents(conn.peerInfo, PeerEvent(kind: PeerEventKind.Identified)) - trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo) + peerStore.updatePeerInfo(info) + trace "identified remote peer", conn, peerId = shortLog(conn.peerId) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 4e29ab18f..d8ee5c583 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -33,9 +33,9 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer = onNewPeer(p, pubSubPeer) pubSubPeer -proc randomPeerInfo(): PeerInfo = +proc randomPeerId(): PeerId = try: - PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() except CatchableError as exc: raise newException(Defect, exc.msg) @@ -58,9 +58,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn gossipSub.gossipsub[topic].incl(peer) @@ -99,9 +99,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn gossipSub.gossipsub[topic].incl(peer) @@ -125,9 +125,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn peer.score = scoreLow gossipSub.gossipsub[topic].incl(peer) @@ -155,9 +155,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) gossipSub.grafted(peer, topic) gossipSub.mesh[topic].incl(peer) @@ -182,9 +182,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - var peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + var peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -211,9 +211,9 @@ suite "GossipSub internal": for i in 0..<6: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler gossipSub.fanout[topic].incl(peer) @@ -245,9 +245,9 @@ suite "GossipSub internal": for i in 0..<6: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler gossipSub.fanout[topic1].incl(peer) gossipSub.fanout[topic2].incl(peer) @@ -279,9 +279,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) @@ -293,9 +293,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -304,10 +304,10 @@ suite "GossipSub internal": for i in 0..5: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo + let peerId = randomPeerId() + conn.peerId = peerId inc seqno - let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) + let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) check gossipSub.fanout[topic].len == 15 @@ -337,9 +337,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) @@ -351,10 +351,10 @@ suite "GossipSub internal": for i in 0..5: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo + let peerId = randomPeerId() + conn.peerId = peerId inc seqno - let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) + let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -377,9 +377,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler if i mod 2 == 0: gossipSub.mesh[topic].incl(peer) @@ -392,10 +392,10 @@ suite "GossipSub internal": for i in 0..5: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo + let peerId = randomPeerId() + conn.peerId = peerId inc seqno - let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) + let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -418,9 +418,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler if i mod 2 == 0: gossipSub.mesh[topic].incl(peer) @@ -433,10 +433,10 @@ suite "GossipSub internal": for i in 0..5: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo + let peerId = randomPeerId() + conn.peerId = peerId inc seqno - let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false) + let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -456,9 +456,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler # generate messages @@ -466,11 +466,11 @@ suite "GossipSub internal": for i in 0..5: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) inc seqno - let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false) + let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg])) check gossipSub.mcache.msgs.len == 0 @@ -490,9 +490,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn peer.handler = handler peer.appScore = gossipSub.parameters.graylistThreshold - 1 @@ -522,9 +522,9 @@ suite "GossipSub internal": let lotOfSubs = RPCMsg.withSubs(tooManyTopics, true) let conn = TestBufferStream.new(noop) - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) await gossipSub.rpcHandler(peer, lotOfSubs) @@ -546,14 +546,14 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn gossipSub.gossipsub[topic].incl(peer) gossipSub.backingOff .mgetOrPut(topic, initTable[PeerID, Moment]()) - .add(peerInfo.peerId, Moment.now() + 1.hours) + .add(peerId, Moment.now() + 1.hours) let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)]) # there must be a control prune due to violation of backoff check prunes.len != 0 @@ -577,9 +577,9 @@ suite "GossipSub internal": for i in 0..<15: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn gossipSub.gossipsub[topic].incl(peer) gossipSub.mesh[topic].incl(peer) @@ -589,8 +589,8 @@ suite "GossipSub internal": check gossipSub.mesh[topic].len != 0 for i in 0..<15: - let peerInfo = conns[i].peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = conns[i].peerId + let peer = gossipSub.getPubSubPeer(peerId) gossipSub.handlePrune(peer, @[ControlPrune( topicID: topic, peers: @[], @@ -620,9 +620,9 @@ suite "GossipSub internal": let conn = TestBufferStream.new(noop) conn.transportDir = Direction.In conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.score = 40.0 peer.sendConn = conn gossipSub.grafted(peer, topic) @@ -632,9 +632,9 @@ suite "GossipSub internal": let conn = TestBufferStream.new(noop) conn.transportDir = Direction.Out conns &= conn - let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.score = 10.0 peer.sendConn = conn gossipSub.grafted(peer, topic) @@ -667,9 +667,9 @@ suite "GossipSub internal": for i in 0..<30: let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) peer.handler = handler gossipSub.grafted(peer, topic) gossipSub.mesh[topic].incl(peer) @@ -678,9 +678,9 @@ suite "GossipSub internal": # should ignore no budget peer let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) let id = @[0'u8, 1, 2, 3] let msg = ControlIHave( topicID: topic, @@ -694,9 +694,9 @@ suite "GossipSub internal": # given duplicate ihave should generate only one iwant let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) let id = @[0'u8, 1, 2, 3] let msg = ControlIHave( topicID: topic, @@ -709,9 +709,9 @@ suite "GossipSub internal": # given duplicate iwant should generate only one message let conn = TestBufferStream.new(noop) conns &= conn - let peerInfo = randomPeerInfo() - conn.peerInfo = peerInfo - let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) let id = @[0'u8, 1, 2, 3] gossipSub.mcache.put(id, Message()) let msg = ControlIWant( diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim index 190716569..5b314d308 100644 --- a/tests/testconnmngr.nim +++ b/tests/testconnmngr.nim @@ -11,14 +11,14 @@ import helpers type TestMuxer = ref object of Muxer - peerInfo: PeerInfo + peerId: PeerId method newStream*( m: TestMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = - result = Connection.init(m.peerInfo, Direction.Out) + result = Connection.init(m.peerId, Direction.Out) suite "Connection Manager": teardown: @@ -26,13 +26,13 @@ suite "Connection Manager": asyncTest "add and retrieve a connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) connMngr.storeConn(conn) check conn in connMngr - let peerConn = connMngr.selectConn(peer.peerId) + let peerConn = connMngr.selectConn(peerId) check peerConn == conn check peerConn.dir == Direction.In @@ -40,8 +40,8 @@ suite "Connection Manager": asyncTest "shouldn't allow a closed connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) await conn.close() expect CatchableError: @@ -51,8 +51,8 @@ suite "Connection Manager": asyncTest "shouldn't allow an EOFed connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) conn.isEof = true expect CatchableError: @@ -63,8 +63,8 @@ suite "Connection Manager": asyncTest "add and retrieve a muxer": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new Muxer muxer.connection = conn @@ -79,8 +79,8 @@ suite "Connection Manager": asyncTest "shouldn't allow a muxer for an untracked connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new Muxer muxer.connection = conn @@ -93,17 +93,17 @@ suite "Connection Manager": asyncTest "get conn with direction": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn1 = Connection.init(peer, Direction.Out) - let conn2 = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn1 = Connection.init(peerId, Direction.Out) + let conn2 = Connection.init(peerId, Direction.In) connMngr.storeConn(conn1) connMngr.storeConn(conn2) check conn1 in connMngr check conn2 in connMngr - let outConn = connMngr.selectConn(peer.peerId, Direction.Out) - let inConn = connMngr.selectConn(peer.peerId, Direction.In) + let outConn = connMngr.selectConn(peerId, Direction.Out) + let inConn = connMngr.selectConn(peerId, Direction.In) check outConn != inConn check outConn.dir == Direction.Out @@ -113,40 +113,40 @@ suite "Connection Manager": asyncTest "get muxed stream for peer": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new TestMuxer - muxer.peerInfo = peer + muxer.peerId = peerId muxer.connection = conn connMngr.storeConn(conn) connMngr.storeMuxer(muxer) check muxer in connMngr - let stream = await connMngr.getStream(peer.peerId) + let stream = await connMngr.getStream(peerId) check not(isNil(stream)) - check stream.peerInfo == peer + check stream.peerId == peerId await connMngr.close() await stream.close() asyncTest "get stream from directed connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new TestMuxer - muxer.peerInfo = peer + muxer.peerId = peerId muxer.connection = conn connMngr.storeConn(conn) connMngr.storeMuxer(muxer) check muxer in connMngr - let stream1 = await connMngr.getStream(peer.peerId, Direction.In) + let stream1 = await connMngr.getStream(peerId, Direction.In) check not(isNil(stream1)) - let stream2 = await connMngr.getStream(peer.peerId, Direction.Out) + let stream2 = await connMngr.getStream(peerId, Direction.Out) check isNil(stream2) await connMngr.close() @@ -154,11 +154,11 @@ suite "Connection Manager": asyncTest "get stream from any connection": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new TestMuxer - muxer.peerInfo = peer + muxer.peerId = peerId muxer.connection = conn connMngr.storeConn(conn) @@ -173,13 +173,13 @@ suite "Connection Manager": asyncTest "should raise on too many connections": let connMngr = ConnManager.init(maxConnsPerPeer = 1) - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() - connMngr.storeConn(Connection.init(peer, Direction.In)) + connMngr.storeConn(Connection.init(peerId, Direction.In)) let conns = @[ - Connection.init(peer, Direction.In), - Connection.init(peer, Direction.In)] + Connection.init(peerId, Direction.In), + Connection.init(peerId, Direction.In)] expect TooManyConnectionsError: connMngr.storeConn(conns[0]) @@ -192,8 +192,8 @@ suite "Connection Manager": asyncTest "cleanup on connection close": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) - let conn = Connection.init(peer, Direction.In) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() + let conn = Connection.init(peerId, Direction.In) let muxer = new Muxer muxer.connection = conn @@ -213,14 +213,14 @@ suite "Connection Manager": asyncTest "drop connections for peer": let connMngr = ConnManager.init() - let peer = PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()) + let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() for i in 0..<2: let dir = if i mod 2 == 0: Direction.In else: Direction.Out - let conn = Connection.init(peer, dir) + let conn = Connection.init(peerId, dir) let muxer = new Muxer muxer.connection = conn @@ -229,14 +229,14 @@ suite "Connection Manager": check conn in connMngr check muxer in connMngr - check not(isNil(connMngr.selectConn(peer.peerId, dir))) + check not(isNil(connMngr.selectConn(peerId, dir))) - check peer.peerId in connMngr - await connMngr.dropPeer(peer.peerId) + check peerId in connMngr + await connMngr.dropPeer(peerId) - check peer.peerId notin connMngr - check isNil(connMngr.selectConn(peer.peerId, Direction.In)) - check isNil(connMngr.selectConn(peer.peerId, Direction.Out)) + check peerId notin connMngr + check isNil(connMngr.selectConn(peerId, Direction.In)) + check isNil(connMngr.selectConn(peerId, Direction.Out)) await connMngr.close() @@ -248,7 +248,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -259,7 +259,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -277,7 +277,7 @@ suite "Connection Manager": let conn = await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -288,7 +288,7 @@ suite "Connection Manager": discard await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -304,7 +304,7 @@ suite "Connection Manager": let conn = await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -314,7 +314,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -332,7 +332,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -344,7 +344,7 @@ suite "Connection Manager": discard await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -360,7 +360,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -371,7 +371,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -389,7 +389,7 @@ suite "Connection Manager": let conn = await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -400,7 +400,7 @@ suite "Connection Manager": discard await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -416,7 +416,7 @@ suite "Connection Manager": let conn = await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -426,7 +426,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -444,7 +444,7 @@ suite "Connection Manager": let conn = connMngr.trackIncomingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) @@ -456,7 +456,7 @@ suite "Connection Manager": discard await connMngr.trackOutgoingConn( proc(): Future[Connection] {.async.} = return Connection.init( - PeerInfo.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()), + PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) ) diff --git a/tests/testidentify.nim b/tests/testidentify.nim index f5447b500..5e161adc7 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -68,7 +68,7 @@ suite "Identify": conn = await transport2.dial(transport1.ma) discard await msDial.select(conn, IdentifyCodec) - let id = await identifyProto2.identify(conn, remotePeerInfo) + let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) check id.pubKey.get() == remoteSecKey.getPublicKey().get() check id.addrs[0] == ma @@ -91,7 +91,7 @@ suite "Identify": conn = await transport2.dial(transport1.ma) discard await msDial.select(conn, IdentifyCodec) - let id = await identifyProto2.identify(conn, remotePeerInfo) + let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) check id.pubKey.get() == remoteSecKey.getPublicKey().get() check id.addrs[0] == ma @@ -119,7 +119,7 @@ suite "Identify": expect IdentityNoMatchError: let pi2 = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) discard await msDial.select(conn, IdentifyCodec) - discard await identifyProto2.identify(conn, pi2) + discard await identifyProto2.identify(conn, pi2.peerId) suite "handle push identify message": var @@ -133,8 +133,13 @@ suite "Identify": switch1 = newStandardSwitch() switch2 = newStandardSwitch() - identifyPush1 = IdentifyPush.new(switch1.connManager) - identifyPush2 = IdentifyPush.new(switch2.connManager) + proc updateStore1(peerId: PeerId, info: IdentifyInfo) {.async.} = + switch1.peerStore.updatePeerInfo(info) + proc updateStore2(peerId: PeerId, info: IdentifyInfo) {.async.} = + switch2.peerStore.updatePeerInfo(info) + + identifyPush1 = IdentifyPush.new(updateStore1) + identifyPush2 = IdentifyPush.new(updateStore2) switch1.mount(identifyPush1) switch2.mount(identifyPush2) @@ -144,18 +149,12 @@ suite "Identify": conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, IdentifyPushCodec) - let storedInfo1 = switch1.peerStore.get(switch2.peerInfo.peerId) - let storedInfo2 = switch2.peerStore.get(switch1.peerInfo.peerId) - check: - storedInfo1.peerId == switch2.peerInfo.peerId - storedInfo2.peerId == switch1.peerInfo.peerId + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() + switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet() - storedInfo1.addrs == switch2.peerInfo.addrs.toHashSet() - storedInfo2.addrs == switch1.peerInfo.addrs.toHashSet() - - storedInfo1.protos == switch2.peerInfo.protocols.toHashSet() - storedInfo2.protos == switch1.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() + switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet() proc closeAll() {.async.} = await conn.close() @@ -171,8 +170,8 @@ suite "Identify": switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet()) check: - switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toHashSet() - switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) != switch2.peerInfo.addrs.toHashSet() + switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) != switch2.peerInfo.protocols.toHashSet() await identifyPush2.push(switch2.peerInfo, conn) @@ -180,8 +179,8 @@ suite "Identify": # Wait the very end to be sure that the push has been processed check: - switch1.peerStore.get(switch2.peerInfo.peerId).protos == switch2.peerInfo.protocols.toHashSet() - switch1.peerStore.get(switch2.peerInfo.peerId).addrs == switch2.peerInfo.addrs.toHashSet() + switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() asyncTest "wrong peer id push identify": @@ -189,8 +188,8 @@ suite "Identify": switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet()) check: - switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toHashSet() - switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) != switch2.peerInfo.addrs.toHashSet() + switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) != switch2.peerInfo.protocols.toHashSet() let oldPeerId = switch2.peerInfo.peerId switch2.peerInfo = PeerInfo.init(PrivateKey.random(newRng()[]).get()) @@ -201,5 +200,5 @@ suite "Identify": # Wait the very end to be sure that the push has been processed check: - switch1.peerStore.get(oldPeerId).protos != switch2.peerInfo.protocols.toHashSet() - switch1.peerStore.get(oldPeerId).addrs != switch2.peerInfo.addrs.toHashSet() + switch1.peerStore.protoBook.get(oldPeerId) != switch2.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(oldPeerId) != switch2.peerInfo.addrs.toHashSet() diff --git a/tests/testinterop.nim b/tests/testinterop.nim index d754b9a9d..9f2479751 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -5,8 +5,6 @@ import ../libp2p import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto] type - # TODO: Unify both PeerInfo structs - NativePeerInfo = libp2p.PeerInfo DaemonPeerInfo = daemonapi.PeerInfo proc writeLp*(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} = @@ -69,10 +67,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1) {.async.} = if times >= count and not finished: finished = true - let peer = NativePeerInfo.init( - daemonPeer.peer, - daemonPeer.addresses) - await nativeNode.connect(peer.peerId, peer.addrs) + await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) await sleepAsync(1.seconds) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) @@ -124,10 +119,7 @@ proc testPubSubNodePublish(gossip: bool = false, count: int = 1) {.async.} = await pubsub.start() let nativePeer = nativeNode.peerInfo - let peer = NativePeerInfo.init( - daemonPeer.peer, - daemonPeer.addresses) - await nativeNode.connect(peer) + await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) await sleepAsync(1.seconds) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) @@ -192,9 +184,7 @@ suite "Interop": testFuture.complete() await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, - daemonPeer.addresses), - protos[0]) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) await conn.writeLp("test 1") check "test 2" == string.fromBytes((await conn.readLp(1024))) @@ -240,9 +230,7 @@ suite "Interop": await stream.close() await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, - daemonPeer.addresses), - protos[0]) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) await conn.writeLp(test & "\r\n") check expect == (await wait(testFuture, 10.secs)) @@ -370,9 +358,7 @@ suite "Interop": await stream.close() await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, - daemonPeer.addresses), - protos[0]) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) await conn.writeLp(test & "\r\n") check expect == (await wait(testFuture, 10.secs)) diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 3ce356049..fe40a0824 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -51,7 +51,9 @@ method init(p: TestProto) {.gcsafe.} = p.handler = handle proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switch, PeerInfo) = - var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + var + privateKey = PrivateKey.random(ECDSA, rng[]).get() + peerInfo = PeerInfo.init(privateKey) peerInfo.addrs.add(ma) proc createMplex(conn: Connection): Muxer = @@ -62,9 +64,9 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc mplexProvider = MuxerProvider.new(createMplex, MplexCodec) muxers = [(MplexCodec, mplexProvider)].toTable() secureManagers = if secio: - [Secure(Secio.new(rng, peerInfo.privateKey))] + [Secure(Secio.new(rng, privateKey))] else: - [Secure(Noise.new(rng, peerInfo.privateKey, outgoing = outgoing))] + [Secure(Noise.new(rng, privateKey, outgoing = outgoing))] connManager = ConnManager.init() ms = MultistreamSelect.new() muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms) @@ -87,8 +89,9 @@ suite "Noise": asyncTest "e2e: handle write + noise": let server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server]) - serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) + serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() + serverInfo = PeerInfo.init(serverPrivKey, [server]) + serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) asyncSpawn transport1.start(server) @@ -105,10 +108,13 @@ suite "Noise": let acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) - clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) + clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() + clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma]) + clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.ma) - sconn = await clientNoise.secure(conn, true) + + conn.peerId = serverInfo.peerId + let sconn = await clientNoise.secure(conn, true) var msg = newSeq[byte](6) await sconn.readExactly(addr msg[0], 6) @@ -124,8 +130,9 @@ suite "Noise": asyncTest "e2e: handle write + noise (wrong prologue)": let server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server]) - serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) + serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() + serverInfo = PeerInfo.init(serverPrivKey, [server]) + serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) @@ -145,9 +152,12 @@ suite "Noise": let handlerWait = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) - clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8]) + clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() + clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma]) + clientNoise = Noise.new(rng, clientPrivKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8]) conn = await transport2.dial(transport1.ma) + conn.peerId = serverInfo.peerId + var sconn: Connection = nil expect(NoiseDecryptTagError): sconn = await clientNoise.secure(conn, true) @@ -160,8 +170,9 @@ suite "Noise": asyncTest "e2e: handle read + noise": let server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server]) - serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) + serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() + serverInfo = PeerInfo.init(serverPrivKey, [server]) + serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) readTask = newFuture[void]() let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) @@ -181,10 +192,12 @@ suite "Noise": let acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) - clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) + clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() + clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma]) + clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.ma) - sconn = await clientNoise.secure(conn, true) + conn.peerId = serverInfo.peerId + let sconn = await clientNoise.secure(conn, true) await sconn.write("Hello!") await acceptFut @@ -196,8 +209,9 @@ suite "Noise": asyncTest "e2e: handle read + noise fragmented": let server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server]) - serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) + serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() + serverInfo = PeerInfo.init(serverPrivKey, [server]) + serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) readTask = newFuture[void]() var hugePayload = newSeq[byte](0xFFFFF) @@ -220,10 +234,12 @@ suite "Noise": let acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) - clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) + clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() + clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma]) + clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.ma) - sconn = await clientNoise.secure(conn, true) + conn.peerId = serverInfo.peerId + let sconn = await clientNoise.secure(conn, true) await sconn.writeLp(hugePayload) await readTask @@ -252,7 +268,7 @@ suite "Noise": (switch2, peerInfo2) = createSwitch(ma2, true) awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) - let conn = await switch2.dial(switch1.peerInfo, TestCodec) + let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec) await conn.writeLp("Hello!") let msg = string.fromBytes(await conn.readLp(1024)) check "Hello!" == msg @@ -281,7 +297,7 @@ suite "Noise": awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) expect(UpgradeFailedError): - let conn = await switch2.dial(switch1.peerInfo, TestCodec) + let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec) await allFuturesThrowing( switch1.stop(), diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 390b76c68..edce5fb07 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -15,47 +15,4 @@ suite "PeerInfo": var peerId = PeerID.init(seckey).get() check peerId == peerInfo.peerId - check seckey == peerInfo.privateKey - check seckey.getPublicKey().get() == peerInfo.publicKey.get() - - test "Should init with public key": - let seckey = PrivateKey.random(ECDSA, rng[]).get() - var peerInfo = PeerInfo.init(seckey.getPublicKey().get()) - var peerId = PeerID.init(seckey.getPublicKey().get()).get() - - check peerId == peerInfo.peerId - check seckey.getPublicKey.get() == peerInfo.publicKey.get() - - test "Should init from PeerId with public key": - let seckey = PrivateKey.random(Ed25519, rng[]).get() - var peerInfo = PeerInfo.init(PeerID.init(seckey.getPublicKey.get()).get()) - var peerId = PeerID.init(seckey.getPublicKey.get()).get() - - check peerId == peerInfo.peerId - check seckey.getPublicKey.get() == peerInfo.publicKey.get() - - test "Should init from CIDv0 string": - var peerInfo: PeerInfo - try: - peerInfo = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") - except CatchableError: - check false - - check: - PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N").get() == peerInfo.peerId - - # TODO: CIDv1 handling is missing from PeerID - # https://github.com/status-im/nim-libp2p/issues/53 - # test "Should init from CIDv1 string": - # var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") - - # check: - # PeerID.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") == peerInfo.peerId - - test "Should return none if pubkey is missing from id": - let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get()) - check peerInfo.publicKey.isNone - - test "Should return some if pubkey is present in id": - let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519, rng[]).get()).get()) - check peerInfo.publicKey.isSome + check seckey.getPublicKey().get() == peerInfo.publicKey diff --git a/tests/testpeerstore.nim b/tests/testpeerstore.nim index db75f89a5..57f10d290 100644 --- a/tests/testpeerstore.nim +++ b/tests/testpeerstore.nim @@ -35,38 +35,11 @@ suite "PeerStore": peerStore.keyBook.set(peerId1, keyPair1.pubKey) peerStore.keyBook.set(peerId2, keyPair2.pubKey) - # Test PeerStore::get - let - peer1Stored = peerStore.get(peerId1) - peer2Stored = peerStore.get(peerId2) - check: - peer1Stored.peerId == peerId1 - peer1Stored.addrs == toHashSet([multiaddr1]) - peer1Stored.protos == toHashSet([testcodec1]) - peer1Stored.publicKey == keyPair1.pubkey - peer2Stored.peerId == peerId2 - peer2Stored.addrs == toHashSet([multiaddr2]) - peer2Stored.protos == toHashSet([testcodec2]) - peer2Stored.publicKey == keyPair2.pubkey - - # Test PeerStore::peers - let peers = peerStore.peers() - check: - peers.len == 2 - peers.anyIt(it.peerId == peerId1 and - it.addrs == toHashSet([multiaddr1]) and - it.protos == toHashSet([testcodec1]) and - it.publicKey == keyPair1.pubkey) - peers.anyIt(it.peerId == peerId2 and - it.addrs == toHashSet([multiaddr2]) and - it.protos == toHashSet([testcodec2]) and - it.publicKey == keyPair2.pubkey) - # Test PeerStore::delete check: # Delete existing peerId peerStore.delete(peerId1) == true - peerStore.peers().anyIt(it.peerId == peerId1) == false + peerId1 notin peerStore.addressBook # Now try and delete it again peerStore.delete(peerId1) == false diff --git a/tests/testping.nim b/tests/testping.nim index b17959810..d7894c9ad 100644 --- a/tests/testping.nim +++ b/tests/testping.nim @@ -36,7 +36,7 @@ suite "Ping": transport1 = TcpTransport.new(upgrade = Upgrade()) transport2 = TcpTransport.new(upgrade = Upgrade()) - proc handlePing(peer: PeerInfo) {.async, gcsafe, closure.} = + proc handlePing(peer: PeerId) {.async, gcsafe, closure.} = inc pingReceivedCount pingProto1 = Ping.new() pingProto2 = Ping.new(handlePing) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index b7c68eadc..33bea4e2a 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -248,18 +248,18 @@ suite "Switch": var step = 0 var kinds: set[ConnEventKind] - proc hook(peerInfo: PeerInfo, event: ConnEvent) {.async, gcsafe.} = + proc hook(peerId: PeerId, event: ConnEvent) {.async, gcsafe.} = kinds = kinds + {event.kind} case step: of 0: check: event.kind == ConnEventKind.Connected - peerInfo.peerId == switch1.peerInfo.peerId + peerId == switch1.peerInfo.peerId of 1: check: event.kind == ConnEventKind.Disconnected - check peerInfo.peerId == switch1.peerInfo.peerId + check peerId == switch1.peerInfo.peerId else: check false @@ -303,18 +303,18 @@ suite "Switch": var step = 0 var kinds: set[ConnEventKind] - proc hook(peerInfo: PeerInfo, event: ConnEvent) {.async, gcsafe.} = + proc hook(peerId: PeerId, event: ConnEvent) {.async, gcsafe.} = kinds = kinds + {event.kind} case step: of 0: check: event.kind == ConnEventKind.Connected - peerInfo.peerId == switch2.peerInfo.peerId + peerId == switch2.peerInfo.peerId of 1: check: event.kind == ConnEventKind.Disconnected - check peerInfo.peerId == switch2.peerInfo.peerId + check peerId == switch2.peerInfo.peerId else: check false @@ -358,17 +358,17 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerInfo: PeerInfo, event: PeerEvent) {.async, gcsafe.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async, gcsafe.} = kinds = kinds + {event.kind} case step: of 0: check: event.kind == PeerEventKind.Joined - peerInfo.peerId == switch2.peerInfo.peerId + peerId == switch2.peerInfo.peerId of 1: check: event.kind == PeerEventKind.Left - peerInfo.peerId == switch2.peerInfo.peerId + peerId == switch2.peerInfo.peerId else: check false @@ -412,17 +412,17 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerInfo: PeerInfo, event: PeerEvent) {.async, gcsafe.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async, gcsafe.} = kinds = kinds + {event.kind} case step: of 0: check: event.kind == PeerEventKind.Joined - peerInfo.peerId == switch1.peerInfo.peerId + peerId == switch1.peerInfo.peerId of 1: check: event.kind == PeerEventKind.Left - peerInfo.peerId == switch1.peerInfo.peerId + peerId == switch1.peerInfo.peerId else: check false @@ -476,7 +476,7 @@ suite "Switch": var step = 0 var kinds: set[PeerEventKind] - proc handler(peerInfo: PeerInfo, event: PeerEvent) {.async, gcsafe.} = + proc handler(peerId: PeerId, event: PeerEvent) {.async, gcsafe.} = kinds = kinds + {event.kind} case step: of 0: @@ -532,12 +532,14 @@ suite "Switch": let rng = newRng() # use same private keys to emulate two connection from same peer - let peerInfo = PeerInfo.init(PrivateKey.random(rng[]).tryGet()) + let + privateKey = PrivateKey.random(rng[]).tryGet() + peerInfo = PeerInfo.init(privateKey) var switches: seq[Switch] var done = newFuture[void]() var onConnect: Future[void] - proc hook(peerInfo: PeerInfo, event: ConnEvent) {.async, gcsafe.} = + proc hook(peerId: PeerId, event: ConnEvent) {.async, gcsafe.} = case event.kind: of ConnEventKind.Connected: await onConnect @@ -555,7 +557,7 @@ suite "Switch": awaiters.add(await switches[0].start()) switches.add(newStandardSwitch( - privKey = some(peerInfo.privateKey), + privKey = some(privateKey), rng = rng)) onConnect = switches[1].connect(switches[0].peerInfo.peerId, switches[0].peerInfo.addrs) await onConnect @@ -573,13 +575,15 @@ suite "Switch": let rng = newRng() # use same private keys to emulate two connection from same peer - let peerInfo = PeerInfo.init(PrivateKey.random(rng[]).tryGet()) + let + privateKey = PrivateKey.random(rng[]).tryGet() + peerInfo = PeerInfo.init(privateKey) var conns = 1 var switches: seq[Switch] var done = newFuture[void]() var onConnect: Future[void] - proc hook(peerInfo2: PeerInfo, event: ConnEvent) {.async, gcsafe.} = + proc hook(peerId2: PeerId, event: ConnEvent) {.async, gcsafe.} = case event.kind: of ConnEventKind.Connected: if conns == 5: @@ -602,7 +606,7 @@ suite "Switch": for i in 1..5: switches.add(newStandardSwitch( - privKey = some(peerInfo.privateKey), + privKey = some(privateKey), rng = rng)) switches[i].addConnEventHandler(hook, ConnEventKind.Disconnected) onConnect = switches[i].connect(switches[0].peerInfo.peerId, switches[0].peerInfo.addrs) @@ -893,15 +897,9 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo.peerId) check not switch2.isConnected(switch1.peerInfo.peerId) - let storedInfo1 = switch1.peerStore.get(switch2.peerInfo.peerId) - let storedInfo2 = switch2.peerStore.get(switch1.peerInfo.peerId) - check: - storedInfo1.peerId == switch2.peerInfo.peerId - storedInfo2.peerId == switch1.peerInfo.peerId + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() + switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet() - storedInfo1.addrs == switch2.peerInfo.addrs.toHashSet() - storedInfo2.addrs == switch1.peerInfo.addrs.toHashSet() - - storedInfo1.protos == switch2.peerInfo.protocols.toHashSet() - storedInfo2.protos == switch1.peerInfo.protocols.toHashSet() + switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() + switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet()