Cleanups (#55)

* limit query to batchSize

* allow initializing node from ip and port

* misc cleanups
This commit is contained in:
Dmitriy Ryajov 2022-10-31 22:41:33 -06:00 committed by GitHub
parent 08928e57d8
commit d6d255b4b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 34 deletions

View File

@ -46,6 +46,21 @@ proc toNodeId*(pk: PublicKey): Result[NodeId, cstring] =
let pid = ? PeerId.init(pk)
ok pid.toNodeId
func newNode*(
ip: IpAddress,
port: Port,
pk: PublicKey,
record: SignedPeerRecord): Result[Node, cstring] =
let
node = Node(
id: ? pk.toNodeId(),
pubkey: pk,
record: record,
address: Address(ip: ValidIpAddress.init(ip), port: port).some)
ok node
func newNode*(r: SignedPeerRecord): Result[Node, cstring] =
## Create a new `Node` from a `SignedPeerRecord`.
# TODO: Handle IPv6
@ -59,17 +74,22 @@ func newNode*(r: SignedPeerRecord): Result[Node, cstring] =
# Also this can not fail for a properly created record as id is checked upon
# deserialization.
let
tr = ? r.toTypedRecord()
nodeId = ? pk.get().toNodeId()
if tr.ip.isSome() and tr.udp.isSome():
let a = Address(ip: ipv4(tr.ip.get()), port: Port(tr.udp.get()))
if r.ip.isSome() and r.udp.isSome():
let a = Address(ip: ipv4(r.ip.get()), port: Port(r.udp.get()))
ok(Node(id: nodeId, pubkey: pk.get() , record: r,
address: some(a)))
ok(Node(
id: nodeId,
pubkey: pk.get(),
record: r,
address: some(a)))
else:
ok(Node(id: nodeId, pubkey: pk.get(), record: r,
address: none(Address)))
ok(Node(
id: nodeId,
pubkey: pk.get(),
record: r,
address: none(Address)))
proc update*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port] = none[Port]()): Result[void, cstring] =

View File

@ -170,7 +170,6 @@ type
ipVote: IpVote
enrAutoUpdate: bool
talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
# overkill here, use sequence
rng*: ref BrHmacDrbgContext
providers: ProvidersManager
@ -1102,12 +1101,19 @@ proc newProtocol*(
bindIp = IPv4_any(),
config = defaultDiscoveryConfig,
rng = newRng(),
providers = ProvidersManager.new(
SQLiteDatastore.new(Memory)
.expect("Should not fail!"))):
Protocol =
providers = ProvidersManager.new(SQLiteDatastore.new(Memory)
.expect("Should not fail!"))): Protocol =
## Initialize DHT protocol
##
info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record)
let node = newNode(record).expect("Properly initialized record")
let
node = newNode(
bindIp,
bindPort,
privKey.getPublicKey.expect("Should get public key"),
record).expect("Properly initialized record")
# TODO Consider whether this should be a Defect
doAssert rng != nil, "RNG initialization failed"
@ -1125,7 +1131,6 @@ proc newProtocol*(
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
info "Starting discovery node", node = d.localNode

View File

@ -32,7 +32,7 @@ proc cleanupExpired*(
now = Moment.now()
let
q = Query.init(CidKey)
q = Query.init(CidKey, limit = batchSize)
block:
without iter =? (await store.query(q)), err:
@ -74,7 +74,7 @@ proc cleanupOrphaned*(
trace "Cleaning up orphaned records"
let
providersQuery = Query.init(ProvidersKey)
providersQuery = Query.init(ProvidersKey, limit = batchSize)
block:
without iter =? (await store.query(providersQuery)), err:

View File

@ -184,16 +184,16 @@ proc ip*(r: SignedPeerRecord): Option[array[4, byte]] =
# err("MultiAddress must be wire address (tcp, udp or unix)")
proc udp*(r: SignedPeerRecord): Option[int] =
for address in r.data.addresses:
let ma = address.address
for address in r.data.addresses:
let ma = address.address
let code = ma[1].get.protoCode()
if code.isOk and code.get == multiCodec("udp"):
var pbuf: array[2, byte]
let res = ma[1].get.protoArgument(pbuf)
if res.isOk:
let p = fromBytesBE(uint16, pbuf)
return some(p.int)
let code = ma[1].get.protoCode()
if code.isOk and code.get == multiCodec("udp"):
var pbuf: array[2, byte]
let res = ma[1].get.protoArgument(pbuf)
if res.isOk:
let p = fromBytesBE(uint16, pbuf)
return some(p.int)
proc fromBase64*(r: var SignedPeerRecord, s: string): bool =
## Loads SPR from base64-encoded protobuf-encoded bytes, and validates the

View File

@ -197,13 +197,12 @@ proc closeWait*(t: Transport) {.async.} =
await t.transp.closeWait
proc newTransport*[T](
client: T,
privKey: PrivateKey,
localNode: Node,
bindPort: Port,
bindIp = IPv4_any(),
rng = newRng()):
Transport[T]=
client: T,
privKey: PrivateKey,
localNode: Node,
bindPort: Port,
bindIp = IPv4_any(),
rng = newRng()): Transport[T]=
# TODO Consider whether this should be a Defect
doAssert rng != nil, "RNG initialization failed"
@ -211,6 +210,8 @@ proc newTransport*[T](
Transport[T](
client: client,
bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort),
codec: Codec(localNode: localNode, privKey: privKey,
codec: Codec(
localNode: localNode,
privKey: privKey,
sessions: Sessions.init(256)),
rng: rng)

View File

@ -36,7 +36,8 @@ proc initDiscoveryNode*(
let protocol = newProtocol(
privKey,
some(address.ip),
some(address.port), some(address.port),
some(address.port),
some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
@ -54,6 +55,7 @@ proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool =
proc generateNode*(privKey: PrivateKey, port: int = 20302,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node =
let
port = Port(port)
spr = SignedPeerRecord.init(1, privKey, some(ip), some(port), some(port))