From d6d255b4b5d6a4fa56db0eb6677ed7391cbb4897 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 31 Oct 2022 22:41:33 -0600 Subject: [PATCH] Cleanups (#55) * limit query to batchSize * allow initializing node from ip and port * misc cleanups --- .../private/eth/p2p/discoveryv5/node.nim | 34 +++++++++++++++---- .../private/eth/p2p/discoveryv5/protocol.nim | 19 +++++++---- .../p2p/discoveryv5/providers/maintenance.nim | 4 +-- libp2pdht/private/eth/p2p/discoveryv5/spr.nim | 18 +++++----- .../private/eth/p2p/discoveryv5/transport.nim | 17 +++++----- tests/dht/test_helper.nim | 4 ++- 6 files changed, 62 insertions(+), 34 deletions(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/node.nim b/libp2pdht/private/eth/p2p/discoveryv5/node.nim index 8e1b153..1987751 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/node.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/node.nim @@ -46,6 +46,21 @@ proc toNodeId*(pk: PublicKey): Result[NodeId, cstring] = let pid = ? PeerId.init(pk) ok pid.toNodeId +func newNode*( + ip: IpAddress, + port: Port, + pk: PublicKey, + record: SignedPeerRecord): Result[Node, cstring] = + + let + node = Node( + id: ? pk.toNodeId(), + pubkey: pk, + record: record, + address: Address(ip: ValidIpAddress.init(ip), port: port).some) + + ok node + func newNode*(r: SignedPeerRecord): Result[Node, cstring] = ## Create a new `Node` from a `SignedPeerRecord`. # TODO: Handle IPv6 @@ -59,17 +74,22 @@ func newNode*(r: SignedPeerRecord): Result[Node, cstring] = # Also this can not fail for a properly created record as id is checked upon # deserialization. let - tr = ? r.toTypedRecord() nodeId = ? pk.get().toNodeId() - if tr.ip.isSome() and tr.udp.isSome(): - let a = Address(ip: ipv4(tr.ip.get()), port: Port(tr.udp.get())) + if r.ip.isSome() and r.udp.isSome(): + let a = Address(ip: ipv4(r.ip.get()), port: Port(r.udp.get())) - ok(Node(id: nodeId, pubkey: pk.get() , record: r, - address: some(a))) + ok(Node( + id: nodeId, + pubkey: pk.get(), + record: r, + address: some(a))) else: - ok(Node(id: nodeId, pubkey: pk.get(), record: r, - address: none(Address))) + ok(Node( + id: nodeId, + pubkey: pk.get(), + record: r, + address: none(Address))) proc update*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port] = none[Port]()): Result[void, cstring] = diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index 1fb85f6..bd40c2e 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -170,7 +170,6 @@ type ipVote: IpVote enrAutoUpdate: bool talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of - # overkill here, use sequence rng*: ref BrHmacDrbgContext providers: ProvidersManager @@ -1102,12 +1101,19 @@ proc newProtocol*( bindIp = IPv4_any(), config = defaultDiscoveryConfig, rng = newRng(), - providers = ProvidersManager.new( - SQLiteDatastore.new(Memory) - .expect("Should not fail!"))): - Protocol = + providers = ProvidersManager.new(SQLiteDatastore.new(Memory) + .expect("Should not fail!"))): Protocol = + ## Initialize DHT protocol + ## + info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record) - let node = newNode(record).expect("Properly initialized record") + + let + node = newNode( + bindIp, + bindPort, + privKey.getPublicKey.expect("Should get public key"), + record).expect("Properly initialized record") # TODO Consider whether this should be a Defect doAssert rng != nil, "RNG initialization failed" @@ -1125,7 +1131,6 @@ proc newProtocol*( result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) - proc open*(d: Protocol) {.raises: [Defect, CatchableError].} = info "Starting discovery node", node = d.localNode diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim b/libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim index c311cc6..92ca75d 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim @@ -32,7 +32,7 @@ proc cleanupExpired*( now = Moment.now() let - q = Query.init(CidKey) + q = Query.init(CidKey, limit = batchSize) block: without iter =? (await store.query(q)), err: @@ -74,7 +74,7 @@ proc cleanupOrphaned*( trace "Cleaning up orphaned records" let - providersQuery = Query.init(ProvidersKey) + providersQuery = Query.init(ProvidersKey, limit = batchSize) block: without iter =? (await store.query(providersQuery)), err: diff --git a/libp2pdht/private/eth/p2p/discoveryv5/spr.nim b/libp2pdht/private/eth/p2p/discoveryv5/spr.nim index 1e90e22..15c6c2a 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/spr.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/spr.nim @@ -184,16 +184,16 @@ proc ip*(r: SignedPeerRecord): Option[array[4, byte]] = # err("MultiAddress must be wire address (tcp, udp or unix)") proc udp*(r: SignedPeerRecord): Option[int] = - for address in r.data.addresses: - let ma = address.address + for address in r.data.addresses: + let ma = address.address - let code = ma[1].get.protoCode() - if code.isOk and code.get == multiCodec("udp"): - var pbuf: array[2, byte] - let res = ma[1].get.protoArgument(pbuf) - if res.isOk: - let p = fromBytesBE(uint16, pbuf) - return some(p.int) + let code = ma[1].get.protoCode() + if code.isOk and code.get == multiCodec("udp"): + var pbuf: array[2, byte] + let res = ma[1].get.protoArgument(pbuf) + if res.isOk: + let p = fromBytesBE(uint16, pbuf) + return some(p.int) proc fromBase64*(r: var SignedPeerRecord, s: string): bool = ## Loads SPR from base64-encoded protobuf-encoded bytes, and validates the diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index 3d7a1e4..532419b 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -197,13 +197,12 @@ proc closeWait*(t: Transport) {.async.} = await t.transp.closeWait proc newTransport*[T]( - client: T, - privKey: PrivateKey, - localNode: Node, - bindPort: Port, - bindIp = IPv4_any(), - rng = newRng()): - Transport[T]= + client: T, + privKey: PrivateKey, + localNode: Node, + bindPort: Port, + bindIp = IPv4_any(), + rng = newRng()): Transport[T]= # TODO Consider whether this should be a Defect doAssert rng != nil, "RNG initialization failed" @@ -211,6 +210,8 @@ proc newTransport*[T]( Transport[T]( client: client, bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort), - codec: Codec(localNode: localNode, privKey: privKey, + codec: Codec( + localNode: localNode, + privKey: privKey, sessions: Sessions.init(256)), rng: rng) diff --git a/tests/dht/test_helper.nim b/tests/dht/test_helper.nim index f6d4957..d8218de 100644 --- a/tests/dht/test_helper.nim +++ b/tests/dht/test_helper.nim @@ -36,7 +36,8 @@ proc initDiscoveryNode*( let protocol = newProtocol( privKey, some(address.ip), - some(address.port), some(address.port), + some(address.port), + some(address.port), bindPort = address.port, bootstrapRecords = bootstrapRecords, localEnrFields = localEnrFields, @@ -54,6 +55,7 @@ proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool = proc generateNode*(privKey: PrivateKey, port: int = 20302, ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node = + let port = Port(port) spr = SignedPeerRecord.init(1, privKey, some(ip), some(port), some(port))