feat: merge add/get providers with discv5
Merge the add/get providers messages with the discovery v5 messages inside the discovery v5 code. Discovery v5 uses RLP encoding, while the add/get providers messages use protobufs, as per the libp2p spec. The merged format therefore uses RLP for the outer wrapper of the message and a protobuf-encoded blob on the inside, for the add/get providers messages only (see the sketch below). Eventually this should change so that protobufs are used throughout. Some code remains in the libp2pdht/dht directory and is imported from the discovery v5 directory; eventually these two should merge as well.
parent cbcaf926a1
commit b843c8823c
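The following is an illustrative, self-contained Nim sketch of the hybrid framing this commit introduces: a one-byte message kind, then an RLP list carrying the request id and, for the provider messages, a single opaque protobuf-encoded blob. The hand-rolled RLP helpers (short items only) and the reqId/protobuf placeholder bytes are assumptions made for the example; the real code below uses eth/rlp together with the protobuf encoders from providers_encoding.

    # Sketch only: the actual encoding is done by encodeMessage in the diff below.
    proc rlpStr(b: seq[byte]): seq[byte] =
      # Minimal RLP string header, valid for multi-byte items shorter than 56 bytes.
      # (A single byte below 0x80 would encode as itself; not handled here.)
      assert b.len >= 2 and b.len < 56
      @[byte(0x80 + b.len)] & b

    proc rlpList(items: varargs[seq[byte]]): seq[byte] =
      # Minimal RLP list header, valid for payloads shorter than 56 bytes.
      var body: seq[byte]
      for it in items: body.add it
      assert body.len < 56
      @[byte(0xc0 + body.len)] & body

    const addProviderKind = 0x0B'u8          # matches MessageKind.addProvider below

    let
      reqId = @[1'u8, 2, 3, 4]               # hypothetical request id
      pbBlob = @[0x0A'u8, 0x02, 0xAB, 0xCD]  # stand-in for a protobuf-encoded AddProviderMessage

    var wire: seq[byte]
    wire.add addProviderKind                         # outer layer: message-kind byte
    wire.add rlpList(rlpStr(reqId), rlpStr(pbBlob))  # outer layer: RLP([reqId, pbBlob])
    echo wire  # kind byte, then RLP framing, then the opaque protobuf payload

Decoding reverses the layers: read the kind byte, RLP-decode the list, and hand the inner blob to the matching protobuf decoder (see decodeMessage in the diff below).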
@@ -1,4 +1,4 @@
 import
-  ./dht/providers
+  ./dht/[providers_encoding, providers_messages]
 
-export providers
+export providers_encoding, providers_messages
@@ -1,159 +0,0 @@
-# Copyright (c) 2020-2022 Status Research & Development GmbH
-# Licensed and distributed under either of
-#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
-#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
-# at your option. This file may not be copied, modified, or distributed except according to those terms.
-
-import
-  chronos,
-  chronos/timer,
-  chronicles,
-  std/tables, sequtils,
-  stew/byteutils, # toBytes
-  ../discv5/[protocol, node],
-  libp2p/routing_record,
-  ./providers_messages,
-  ./providers_encoding
-
-type
-  ProvidersProtocol* = ref object
-    providers: Table[NodeId, seq[SignedPeerRecord]]
-    discovery*: protocol.Protocol
-
-## ---- AddProvider ----
-
-const
-  protoIdAddProvider = "AP".toBytes()
-
-proc addProviderLocal(p: ProvidersProtocol, cId: NodeId, prov: SignedPeerRecord) =
-  trace "adding provider to local db", n=p.discovery.localNode, cId, prov
-  p.providers.mgetOrPut(cId, @[]).add(prov)
-
-proc recvAddProvider(p: ProvidersProtocol, nodeId: NodeId, msg: AddProviderMessage)
-    {.raises: [Defect].} =
-  p.addProviderLocal(msg.cId, msg.prov)
-  #TODO: check that CID is reasonably close to our NodeID
-
-proc registerAddProvider(p: ProvidersProtocol) =
-  proc handler(protocol: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
-      {.gcsafe, raises: [Defect].} =
-    #TODO: add checks, add signed version
-    let msg = AddProviderMessage.decode(request).get()
-    trace "<<< add_provider ", src = fromId, dst = p.discovery.localNode.id, cid = msg.cId, prov=msg.prov
-
-    recvAddProvider(p, fromId, msg)
-
-    @[] # talk requires a response
-
-  let protocol = TalkProtocol(protocolHandler: handler)
-  discard p.discovery.registerTalkProtocol(protoIdAddProvider, protocol) #TODO: handle error
-
-proc sendAddProvider*(p: ProvidersProtocol, dst: Node, cId: NodeId, pr: SignedPeerRecord) =
-  #type NodeDesc = tuple[ip: IpAddress, udpPort, tcpPort: Port, pk: PublicKey]
-  let msg = AddProviderMessage(cId: cId, prov: pr)
-  discard p.discovery.talkReq(dst, protoIdAddProvider, msg.encode())
-
-proc addProvider*(p: ProvidersProtocol, cId: NodeId, pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
-  result = await p.discovery.lookup(cId)
-  trace "lookup returned:", result
-  # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
-  if result.len == 0:
-    result.add(p.discovery.localNode)
-  for n in result:
-    if n != p.discovery.localNode:
-      p.sendAddProvider(n, cId, pr)
-    else:
-      p.addProviderLocal(cId, pr)
-
-## ---- GetProviders ----
-
-const
-  protoIdGetProviders = "GP".toBytes()
-
-proc sendGetProviders(p: ProvidersProtocol, dst: Node,
-    cId: NodeId): Future[ProvidersMessage]
-    {.async.} =
-  let msg = GetProvidersMessage(cId: cId)
-  trace "sendGetProviders", msg
-  let respbytes = await p.discovery.talkReq(dst, protoIdGetProviders, msg.encode())
-  if respbytes.isOK():
-    let a = respbytes.get()
-    result = ProvidersMessage.decode(a).get()
-  else:
-    trace "sendGetProviders", msg
-    result = ProvidersMessage() #TODO: add error handling
-
-proc getProvidersLocal*(
-    p: ProvidersProtocol,
-    cId: NodeId,
-    maxitems: int = 5,
-  ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} =
-  result = if (cId in p.providers): p.providers[cId] else: @[]
-
-proc getProviders*(
-    p: ProvidersProtocol,
-    cId: NodeId,
-    maxitems: int = 5,
-    timeout: timer.Duration = chronos.milliseconds(5000)
-  ): Future[seq[SignedPeerRecord]] {.async.} =
-  ## Search for providers of the given cId.
-
-  # What providers do we know about?
-  result = p.getProvidersLocal(cId, maxitems)
-  trace "local providers:", result
-
-  let nodesNearby = await p.discovery.lookup(cId)
-  trace "nearby:", nodesNearby
-  var providersFut: seq[Future[ProvidersMessage]]
-  for n in nodesNearby:
-    if n != p.discovery.localNode:
-      providersFut.add(p.sendGetProviders(n, cId))
-
-  while providersFut.len > 0:
-    let providersMsg = await one(providersFut)
-    # trace "Got providers response", providersMsg
-
-    let index = providersFut.find(providersMsg)
-    if index != -1:
-      providersFut.del(index)
-
-    let providersMsg2 = await providersMsg
-    trace "2", providersMsg2
-
-    let providers = providersMsg.read.provs
-    result = result.concat(providers).deduplicate
-    # TODO: handle timeout
-  trace "getProviders collected: ", result
-
-proc recvGetProviders(p: ProvidersProtocol, nodeId: NodeId, msg: GetProvidersMessage) : ProvidersMessage
-    {.raises: [Defect].} =
-  #TODO: add checks, add signed version
-  let provs = p.providers.getOrDefault(msg.cId)
-  trace "providers:", provs
-
-  ##TODO: handle multiple messages
-  ProvidersMessage(total: 1, provs: provs)
-
-proc registerGetProviders(p: ProvidersProtocol) =
-  proc handler(protocol: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
-      {.gcsafe, raises: [Defect].} =
-    let msg = GetProvidersMessage.decode(request).get()
-    trace "<<< get_providers ", src = fromId, dst = p.discovery.localNode.id, cid = msg.cId
-
-    let returnMsg = recvGetProviders(p, fromId, msg)
-    trace "returnMsg", returnMsg
-
-    try:
-      returnMsg.encode()
-    except ResultError[CryptoError]:
-      return @[]
-
-  let protocol = TalkProtocol(protocolHandler: handler)
-  discard p.discovery.registerTalkProtocol(protoIdGetProviders, protocol) #TODO: handle error
-
-proc newProvidersProtocol*(d: protocol.Protocol) : ProvidersProtocol =
-  result.new()
-  result.discovery = d
-  result.registerAddProvider()
-  result.registerGetProviders()
@@ -15,7 +15,10 @@
 import
   std/[hashes, net],
   eth/[keys],
-  ./enr
+  ./enr,
+  ../../../../dht/providers_messages
+
+export providers_messages
 
 type
   MessageKind* = enum
@@ -34,6 +37,9 @@ type
     ticket = 0x08
     regConfirmation = 0x09
     topicQuery = 0x0A
+    addProvider = 0x0B
+    getProviders = 0x0C
+    providers = 0x0D
 
   RequestId* = object
     id*: seq[byte]
@@ -67,7 +73,8 @@ type
   TopicQueryMessage* = object
 
   SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or
-    TalkReqMessage or TalkRespMessage
+    TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or
+    ProvidersMessage
 
   Message* = object
     reqId*: RequestId
@@ -92,6 +99,12 @@ type
       regConfirmation*: RegConfirmationMessage
     of topicQuery:
       topicQuery*: TopicQueryMessage
+    of addProvider:
+      addProvider*: AddProviderMessage
+    of getProviders:
+      getProviders*: GetProvidersMessage
+    of providers:
+      provs*: ProvidersMessage
     else:
       discard
 
@@ -102,6 +115,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
   elif T is NodesMessage: nodes
   elif T is TalkReqMessage: talkReq
   elif T is TalkRespMessage: talkResp
+  elif T is AddProviderMessage: addProvider
+  elif T is GetProvidersMessage: getProviders
+  elif T is ProvidersMessage: providers
 
 proc hash*(reqId: RequestId): Hash =
   hash(reqId.id)
@@ -12,7 +12,11 @@ import
   std/net,
   stew/arrayops,
   eth/[rlp],
-  "."/[messages, enr]
+  chronicles,
+  libp2p/routing_record,
+  libp2p/signed_envelope,
+  "."/[messages, enr],
+  ../../../../dht/providers_encoding
 
 from stew/objects import checkedEnumAssign
 
@@ -60,17 +64,32 @@ proc numFields(T: typedesc): int =
   for k, v in fieldPairs(default(T)): inc result
 
 proc encodeMessage*[T: SomeMessage](p: T, reqId: RequestId): seq[byte] =
+  # TODO: Remove all RLP encoding in favour of Protobufs
   result = newSeqOfCap[byte](64)
   result.add(messageKind(T).ord)
 
-  const sz = numFields(T)
+  const
+    usePbs = T is AddProviderMessage | GetProvidersMessage | ProvidersMessage
+    sz = if usePbs: 1 else: numFields(T)
+
   var writer = initRlpList(sz + 1)
   writer.append(reqId)
-  for k, v in fieldPairs(p):
-    writer.append(v)
+
+  when usePbs:
+    let encoded =
+      try: p.encode()
+      except ResultError[CryptoError] as e:
+        error "Failed to encode protobuf message", typ = $T, msg = e.msg
+        @[]
+    writer.append(encoded)
+    trace "Encoded protobuf message", typ = $T, encoded
+  else:
+    for k, v in fieldPairs(p):
+      writer.append(v)
   result.add(writer.finish())
 
 proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
+  # TODO: Remove all RLP decoding in favour of Protobufs
   ## Decodes to the specific `Message` type.
   if body.len < 1:
     return err("No message data")
@@ -101,6 +120,24 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
     of nodes: rlp.decode(message.nodes)
     of talkReq: rlp.decode(message.talkReq)
     of talkResp: rlp.decode(message.talkResp)
+    of addProvider:
+      let res = AddProviderMessage.decode(rlp.toBytes)
+      if res.isOk:
+        message.addProvider = res.get
+      else:
+        return err "Unable to decode AddProviderMessage"
+    of getProviders:
+      let res = GetProvidersMessage.decode(rlp.toBytes)
+      if res.isOk:
+        message.getProviders = res.get
+      else:
+        return err "Unable to decode GetProvidersMessage"
+    of providers:
+      let res = ProvidersMessage.decode(rlp.toBytes)
+      if res.isOk:
+        message.provs = res.get
+      else:
+        return err "Unable to decode ProvidersMessage"
     of regTopic, ticket, regConfirmation, topicQuery:
       # We just pass the empty type of this message without attempting to
       # decode, so that the protocol knows what was received.
@@ -76,8 +76,8 @@
 import
   std/[tables, sets, options, math, sequtils, algorithm],
   stew/shims/net as stewNet, json_serialization/std/net,
-  stew/[endians2, results], chronicles, chronos, stint, bearssl, metrics,
-  eth/[rlp, keys, async_utils],
+  stew/[endians2, results], chronicles, chronos, chronos/timer, stint, bearssl,
+  metrics, eth/[rlp, keys, async_utils], libp2p/routing_record,
   "."/[transport, messages, messages_encoding, node, routing_table, enr, random2, ip_vote, nodes_verification]
 
 import nimcrypto except toHex
@@ -120,7 +120,7 @@ type
 
   Protocol* = ref object
     localNode*: Node
-    privateKey: PrivateKey
+    privateKey: keys.PrivateKey
     transport*: Transport[Protocol] # exported for tests
     routingTable*: RoutingTable
     awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
@@ -134,6 +134,7 @@ type
     talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
                                                    # overkill here, use sequence
     rng*: ref BrHmacDrbgContext
+    providers: Table[NodeId, seq[SignedPeerRecord]]
 
   TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
     {.gcsafe, raises: [Defect].}
@@ -209,7 +210,7 @@ proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
 
 proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
 
-func privKey*(d: Protocol): lent PrivateKey =
+func privKey*(d: Protocol): lent keys.PrivateKey =
   d.privateKey
 
 func getRecord*(d: Protocol): Record =
@@ -301,6 +302,25 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
       kind = MessageKind.talkresp
   d.sendResponse(fromId, fromAddr, talkresp, reqId)
 
+proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) =
+  trace "adding provider to local db", n=p.localNode, cId, prov
+  p.providers.mgetOrPut(cId, @[]).add(prov)
+
+proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address,
+    addProvider: AddProviderMessage, reqId: RequestId) =
+  d.addProviderLocal(addProvider.cId, addProvider.prov)
+
+proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address,
+    getProviders: GetProvidersMessage, reqId: RequestId) =
+
+  #TODO: add checks, add signed version
+  let provs = d.providers.getOrDefault(getProviders.cId)
+  trace "providers:", provs
+
+  ##TODO: handle multiple messages
+  let response = ProvidersMessage(total: 1, provs: provs)
+  d.sendResponse(fromId, fromAddr, response, reqId)
+
 proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
     message: Message) =
   case message.kind
@@ -313,6 +333,13 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
   of talkReq:
     discovery_message_requests_incoming.inc()
     d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
+  of addProvider:
+    discovery_message_requests_incoming.inc()
+    discovery_message_requests_incoming.inc(labelValues = ["no_response"])
+    d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
+  of getProviders:
+    discovery_message_requests_incoming.inc()
+    d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
   of regTopic, topicQuery:
     discovery_message_requests_incoming.inc()
     discovery_message_requests_incoming.inc(labelValues = ["no_response"])
@@ -553,6 +580,101 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
   d.lastLookup = now(chronos.Moment)
   return closestNodes
 
+proc addProvider*(
+    d: Protocol,
+    cId: NodeId,
+    pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
+
+  var res = await d.lookup(cId)
+  trace "lookup returned:", res
+  # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
+  if res.len == 0:
+    res.add(d.localNode)
+  for toNode in res:
+    if toNode != d.localNode:
+      discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr))
+    else:
+      d.addProviderLocal(cId, pr)
+
+  return res
+
+
+proc sendGetProviders(d: Protocol, toNode: Node,
+    cId: NodeId): Future[DiscResult[ProvidersMessage]]
+    {.async.} =
+  let msg = GetProvidersMessage(cId: cId)
+  trace "sendGetProviders", toNode, msg
+
+  let
+    reqId = d.sendRequest(toNode, msg)
+    resp = await d.waitMessage(toNode, reqId)
+
+  if resp.isSome():
+    if resp.get().kind == providers:
+      d.routingTable.setJustSeen(toNode)
+      return ok(resp.get().provs)
+    else:
+      # TODO: do we need to do something when there is an invalid response?
+      d.replaceNode(toNode)
+      discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
+      return err("Invalid response to GetProviders message")
+  else:
+    # TODO: do we need to do something when there is no response?
+    d.replaceNode(toNode)
+    discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
+    return err("GetProviders response message not received in time")
+
+proc getProvidersLocal*(
+    d: Protocol,
+    cId: NodeId,
+    maxitems: int = 5,
+  ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} =
+
+  return
+    if (cId in d.providers): d.providers[cId]
+    else: @[]
+
+proc getProviders*(
+    d: Protocol,
+    cId: NodeId,
+    maxitems: int = 5,
+    timeout: timer.Duration = chronos.milliseconds(5000)
+  ): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
+
+  # What providers do we know about?
+  var res = d.getProvidersLocal(cId, maxitems)
+  trace "local providers:", res
+
+  let nodesNearby = await d.lookup(cId)
+  trace "nearby:", nodesNearby
+  var providersFut: seq[Future[DiscResult[ProvidersMessage]]]
+  for n in nodesNearby:
+    if n != d.localNode:
+      providersFut.add(d.sendGetProviders(n, cId))
+
+  while providersFut.len > 0:
+    let providersMsg = await one(providersFut)
+    # trace "Got providers response", providersMsg
+
+    let index = providersFut.find(providersMsg)
+    if index != -1:
+      providersFut.del(index)
+
+    let providersMsg2 = await providersMsg
+    trace "2", providersMsg2
+
+    let providersMsgRes = providersMsg.read
+    if providersMsgRes.isOk:
+      let providers = providersMsgRes.get.provs
+      res = res.concat(providers).deduplicate
+    else:
+      error "Sending of GetProviders message failed", error = providersMsgRes.error
+      # TODO: should we consider this as an error result if all GetProviders
+      # requests fail??
+  trace "getProviders collected: ", res
+
+  return ok res
+
 proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
     {.async.} =
   ## Query k nodes for the given target, returns all nodes found, including the
@@ -778,7 +900,7 @@ func init*(
   )
 
 proc newProtocol*(
-    privKey: PrivateKey,
+    privKey: keys.PrivateKey,
    enrIp: Option[ValidIpAddress],
    enrTcpPort, enrUdpPort: Option[Port],
    localEnrFields: openArray[(string, seq[byte])] = [],
@@ -788,7 +910,7 @@ proc newProtocol*(
     bindIp = IPv4_any(),
     enrAutoUpdate = false,
     config = defaultDiscoveryConfig,
-    rng = newRng()):
+    rng = keys.newRng()):
     Protocol =
   # TODO: Tried adding bindPort = udpPort as parameter but that gave
   # "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
@@ -30,16 +30,6 @@ import
 
 # suite "Providers Tests: node alone":
 
-proc initProvidersNode(
-    rng: ref BrHmacDrbgContext,
-    privKey: keys.PrivateKey,
-    address: Address,
-    bootstrapRecords: openArray[Record] = []):
-    ProvidersProtocol =
-
-  let d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
-  newProvidersProtocol(d)
-
 proc toSignedPeerRecord(privKey: crypto.PrivateKey) : SignedPeerRecord =
   ## handle conversion between the two worlds
 
@@ -49,28 +39,39 @@ proc toSignedPeerRecord(privKey: crypto.PrivateKey) : SignedPeerRecord =
   return SignedPeerRecord.init(privKey, pr).expect("Should init SignedPeerRecord with private key")
   # trace "IDs", discNodeId, digest, mh, peerId=result.peerId.hex
 
-proc bootstrapNodes(nodecount: int, bootnodes: openArray[Record], rng = keys.newRng()) : seq[(ProvidersProtocol, keys.PrivateKey)] =
-
-  for i in 0..<nodecount:
-    let privKey = keys.PrivateKey.random(rng[])
-    let node = initProvidersNode(rng, privKey, localAddress(20302 + i), bootnodes)
-    node.discovery.start()
-    result.add((node, privKey))
-  debug "---- STARTING BOOTSTRAPS ---"
+proc bootstrapNodes(
+    nodecount: int,
+    bootnodes: openArray[Record],
+    rng = keys.newRng()
+  ) : seq[(discv5_protocol.Protocol, keys.PrivateKey)] =
+
+  for i in 0..<nodecount:
+    let privKey = keys.PrivateKey.random(rng[])
+    let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
+    node.start()
+    result.add((node, privKey))
+  debug "---- STARTING BOOTSTRAPS ---"
 
   #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs
 
-proc bootstrapNetwork(nodecount: int, rng = keys.newRng()) : seq[(ProvidersProtocol, keys.PrivateKey)] =
+proc bootstrapNetwork(
+    nodecount: int,
+    rng = keys.newRng()
+  ) : seq[(discv5_protocol.Protocol, keys.PrivateKey)] =
 
   let
     bootNodeKey = keys.PrivateKey.fromHex(
       "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[]
     bootNodeAddr = localAddress(20301)
-    bootNode = initProvidersNode(rng, bootNodeKey, bootNodeAddr, @[]) # just a shortcut for new and open
+    bootNode = initDiscoveryNode(rng, bootNodeKey, bootNodeAddr, @[]) # just a shortcut for new and open
 
   #waitFor bootNode.bootstrap() # immediate, since no bootnodes are defined above
 
-  result = bootstrapNodes(nodecount - 1, @[bootnode.discovery.localNode.record], rng = rng)
-  result.insert((bootNode, bootNodeKey), 0)
+  var res = bootstrapNodes(nodecount - 1,
+    @[bootnode.localNode.record],
+    rng)
+  res.insert((bootNode, bootNodeKey), 0)
+  return res
 
 # TODO: Remove this once we have removed all traces of nim-eth/keys
 func pkToPk(pk: keys.PrivateKey) : Option[crypto.PrivateKey] =
@@ -82,9 +83,9 @@ func pkToPk(pk: keys.PrivateKey) : Option[crypto.PrivateKey] =
 suite "Providers Tests: node alone":
   var
     rng: ref HmacDrbgContext
-    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
+    nodes: seq[(discv5_protocol.Protocol, keys.PrivateKey)]
     targetId: NodeId
-    node0: ProvidersProtocol
+    node0: discv5_protocol.Protocol
    privKey_keys0: keys.PrivateKey
    privKey0: crypto.PrivateKey
    signedPeerRec0: SignedPeerRecord
@@ -101,7 +102,7 @@ suite "Providers Tests: node alone":
 
   teardownAll:
     for (n, _) in nodes:
-      await n.discovery.closeWait()
+      await n.closeWait()
     await sleepAsync(chronos.seconds(3))
 
 
@@ -112,16 +113,18 @@ suite "Providers Tests: node alone":
 
     debug "---- STARTING CHECKS ---"
     check (addedTo.len == 1)
-    check (addedTo[0].id == node0.discovery.localNode.id)
+    check (addedTo[0].id == node0.localNode.id)
     check (node0.getProvidersLocal(targetId)[0].data.peerId == peerRec0.peerId)
 
   test "Node in isolation should retrieve":
 
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await node0.getProviders(targetId)
-    debug "Providers:", providers
+    let providersRes = await node0.getProviders(targetId)
 
     debug "---- STARTING CHECKS ---"
+    check providersRes.isOk
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len > 0 and providers[0].data.peerId == peerRec0.peerId)
 
   test "Should not retrieve bogus":
@@ -129,10 +132,12 @@ suite "Providers Tests: node alone":
     let bogusId = toNodeId(keys.PrivateKey.random(rng[]).toPublicKey)
 
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await node0.getProviders(bogusId)
-    debug "Providers:", providers
+    let providersRes = await node0.getProviders(bogusId)
 
     debug "---- STARTING CHECKS ---"
+    check providersRes.isOk
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len == 0)
 
 
@@ -140,9 +145,9 @@ suite "Providers Tests: two nodes":
 
   var
     rng: ref HmacDrbgContext
-    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
+    nodes: seq[(discv5_protocol.Protocol, keys.PrivateKey)]
     targetId: NodeId
-    node0: ProvidersProtocol
+    node0: discv5_protocol.Protocol
    privKey_keys0: keys.PrivateKey
    privKey0: crypto.PrivateKey
    signedPeerRec0: SignedPeerRecord
@@ -159,7 +164,7 @@ suite "Providers Tests: two nodes":
 
   teardownAll:
     for (n, _) in nodes:
-      await n.discovery.closeWait()
+      await n.closeWait()
     await sleepAsync(chronos.seconds(3))
 
   test "2 nodes, store and retrieve from same":
@@ -169,19 +174,22 @@ suite "Providers Tests: two nodes":
     debug "Provider added to: ", addedTo
 
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await node0.getProviders(targetId)
-    debug "Providers:", providers
+    let providersRes = await node0.getProviders(targetId)
 
     debug "---- STARTING CHECKS ---"
+    check providersRes.isOk
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)
 
   test "2 nodes, retrieve from other":
     debug "---- STARTING PROVIDERS LOOKUP ---"
     let (node1, _) = nodes[1]
-    let providers = await node1.getProviders(targetId)
-    debug "Providers:", providers
+    let providersRes = await node1.getProviders(targetId)
 
     debug "---- STARTING CHECKS ---"
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)
 
 
@@ -190,9 +198,9 @@ suite "Providers Tests: 20 nodes":
 
   var
     rng: ref HmacDrbgContext
-    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
+    nodes: seq[(discv5_protocol.Protocol, keys.PrivateKey)]
     targetId: NodeId
-    node0: ProvidersProtocol
+    node0: discv5_protocol.Protocol
    privKey_keys0: keys.PrivateKey
    privKey0: crypto.PrivateKey
    signedPeerRec0: SignedPeerRecord
@@ -211,7 +219,7 @@ suite "Providers Tests: 20 nodes":
 
   teardownAll:
     for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
-      await n.discovery.closeWait()
+      await n.closeWait()
 
   test "20 nodes, store and retrieve from same":
 
@@ -220,25 +228,27 @@ suite "Providers Tests: 20 nodes":
     debug "Provider added to: ", addedTo
 
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await node0.getProviders(targetId)
-    debug "Providers:", providers
+    let providersRes = await node0.getProviders(targetId)
 
     debug "---- STARTING CHECKS ---"
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)
 
   test "20 nodes, retrieve from other":
     debug "---- STARTING PROVIDERS LOOKUP ---"
     let (node19, _) = nodes[^2]
-    let providers = await node19.getProviders(targetId)
-    debug "Providers:", providers
+    let providersRes = await node19.getProviders(targetId)
 
     debug "---- STARTING CHECKS ---"
+    let providers = providersRes.get
+    debug "Providers:", providers
     check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)
 
   # test "20 nodes, retrieve after bootnode dies":
   #   # TODO: currently this is not working even with a 2 minute timeout
   #   debug "---- KILLING BOOTSTRAP NODE ---"
-  #   await nodes[0].discovery.closeWait()
+  #   await nodes[0].closeWait()
 
  #   debug "---- STARTING PROVIDERS LOOKUP ---"
  #   let providers = await nodes[^2].getProviders(targetId)