mirror of https://github.com/logos-storage/logos-storage-nim-dht.git
synced 2026-01-02 13:33:08 +00:00
feat: Swap PeerRecords with SignedPeerRecords
Providers now add/get SignedPeerRecords to/from the DHT. Changed the PeerId calculation to use the public key of the discovery node.
parent 4a2a6878b4
commit 35026762b4
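
The core of the change, as a minimal sketch (nim-libp2p API as used in this commit; makeSignedPeerRecord is a hypothetical helper name, not from the repo): instead of hashing the discovery NodeId into a multihash-based PeerId, the PeerId is taken from the node's public key and the resulting PeerRecord is signed:

import libp2p/crypto/crypto
import libp2p/[peerid, routing_record, signed_envelope]

# Hypothetical helper illustrating the new scheme (sketch, not repo code).
proc makeSignedPeerRecord(privKey: PrivateKey): SignedPeerRecord =
  let pr = PeerRecord.init(
    peerId = PeerId.init(privKey.getPublicKey.get).get,  # PeerId from the public key
    seqNo = 0,
    addresses = @[])
  # Signing lets receivers verify the provider entry against the PeerId.
  SignedPeerRecord.init(privKey, pr).expect("signing should succeed")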
.gitignore (vendored): 3 changes
@@ -1,2 +1,3 @@
 coverage
 nimcache
+tests/testAll
@@ -14,7 +14,7 @@ requires "nim >= 1.2.0",
   "chronicles >= 0.10.2 & < 0.11.0",
   "chronos >= 3.0.11 & < 3.1.0",
   "eth >= 1.0.0 & < 1.1.0", # to be removed in https://github.com/status-im/nim-libp2p-dht/issues/2
-  "libp2p#unstable",
+  "libp2p#316f205381f9015402a53009c8f61bf17d2989b5",
   "metrics",
   "protobufserialization >= 0.2.0 & < 0.3.0",
   "secp256k1 >= 0.5.2 & < 0.6.0",
@@ -58,3 +58,4 @@ task coverage, "generates code coverage report":
   exec("genhtml coverage/coverage.f.info --output-directory coverage/report")
   echo "Opening HTML coverage report in browser..."
   exec("open coverage/report/index.html")
@@ -17,7 +17,7 @@ import

 type
   ProvidersProtocol* = ref object
-    providers: Table[NodeId, seq[PeerRecord]]
+    providers: Table[NodeId, seq[SignedPeerRecord]]
     discovery*: protocol.Protocol

 ## ---- AddProvider ----
@@ -25,7 +25,7 @@ type
 const
   protoIdAddProvider = "AP".toBytes()

-proc addProviderLocal(p: ProvidersProtocol, cId: NodeId, prov: PeerRecord) =
+proc addProviderLocal(p: ProvidersProtocol, cId: NodeId, prov: SignedPeerRecord) =
   trace "adding provider to local db", n=p.discovery.localNode, cId, prov
   p.providers.mgetOrPut(cId, @[]).add(prov)
@@ -48,12 +48,12 @@ proc registerAddProvider(p: ProvidersProtocol) =
   let protocol = TalkProtocol(protocolHandler: handler)
   discard p.discovery.registerTalkProtocol(protoIdAddProvider, protocol) #TODO: handle error

-proc sendAddProvider*(p: ProvidersProtocol, dst: Node, cId: NodeId, pr: PeerRecord) =
+proc sendAddProvider*(p: ProvidersProtocol, dst: Node, cId: NodeId, pr: SignedPeerRecord) =
   #type NodeDesc = tuple[ip: IpAddress, udpPort, tcpPort: Port, pk: PublicKey]
   let msg = AddProviderMessage(cId: cId, prov: pr)
   discard p.discovery.talkReq(dst, protoIdAddProvider, msg.encode())

-proc addProvider*(p: ProvidersProtocol, cId: NodeId, pr: PeerRecord): Future[seq[Node]] {.async.} =
+proc addProvider*(p: ProvidersProtocol, cId: NodeId, pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
   result = await p.discovery.lookup(cId)
   trace "lookup returned:", result
   # TODO: lookup is sepcified as not returning local, even if that is the closest. Is this OK?
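
Taken together, the updated API reads roughly as follows (hedged sketch; the providers module path and logging setup are assumed, not taken from the repo):

import chronos, chronicles
# import libp2pdht/dht/providers  -- module path assumed

proc announceAndLookup(p: ProvidersProtocol, cId: NodeId,
                       spr: SignedPeerRecord) {.async.} =
  # Announce this peer as a provider for cId on the closest nodes...
  let addedTo = await p.addProvider(cId, spr)
  debug "Provider added to", count = addedTo.len
  # ...then fetch the signed records back and read the wrapped PeerRecord.
  let provs = await p.getProviders(cId)
  for found in provs:
    debug "Found provider", peerId = found.data.peerId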
@@ -87,7 +87,7 @@ proc getProvidersLocal*(
     p: ProvidersProtocol,
     cId: NodeId,
     maxitems: int = 5,
-  ): seq[PeerRecord] {.raises: [KeyError,Defect].}=
+  ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].}=
   result = if (cId in p.providers): p.providers[cId] else: @[]

 proc getProviders*(
@@ -95,7 +95,7 @@ proc getProviders*(
     cId: NodeId,
     maxitems: int = 5,
     timeout: timer.Duration = chronos.milliseconds(5000)
-  ): Future[seq[PeerRecord]] {.async.} =
+  ): Future[seq[SignedPeerRecord]] {.async.} =
   ## Search for providers of the given cId.

   # What providers do we know about?
@@ -144,7 +144,10 @@ proc registerGetProviders(p: ProvidersProtocol) =
     let returnMsg = recvGetProviders(p, fromId, msg)
     trace "returnMsg", returnMsg

-    returnMsg.encode()
+    try:
+      returnMsg.encode()
+    except ResultError[CryptoError]:
+      return @[]

   let protocol = TalkProtocol(protocolHandler: handler)
   discard p.discovery.registerTalkProtocol(protoIdGetProviders, protocol) #TODO: handle error
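
The try/except is needed because encoding a signed record can fail with a CryptoError, surfaced as ResultError[CryptoError] when the underlying Result is unwrapped (see the generic write* below). A sketch of the same fallback, assuming nim-libp2p's signed_envelope API:

import stew/results
import libp2p/crypto/crypto                      # CryptoError
import libp2p/[routing_record, signed_envelope]  # SignedPeerRecord, Envelope

# Sketch: encode a signed record, mirroring the handler's `return @[]` above.
proc encodeOrEmpty(spr: SignedPeerRecord): seq[byte] =
  let res = spr.envelope.encode()  # Result[seq[byte], CryptoError]
  if res.isOk: res.get() else: @[]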
@@ -1,6 +1,6 @@
 import
   ../discv5/[node],
-  libp2p/routing_record,
+  libp2p/[routing_record, signed_envelope],
   libp2p/protobuf/minprotobuf,
   ./providers_messages
@@ -34,12 +34,22 @@ func getField*(pb: ProtoBuffer, field: int,
   else:
     err(ProtoError.IncorrectBlob)

-func write*(pb: var ProtoBuffer, field: int, pr: PeerRecord) =
-  ## Write PeerRecord value ``pr`` to object ``pb`` using ProtoBuf's encoding.
-  write(pb, field, pr.encode())
+func write*[T: SignedPeerRecord | PeerRecord | Envelope](
+    pb: var ProtoBuffer,
+    field: int,
+    env: T) {.raises: [Defect, ResultError[CryptoError]].} =
+
+  ## Write Envelope value ``env`` to object ``pb`` using ProtoBuf's encoding.
+  let encoded = env.encode().tryGet()
+  write(pb, field, encoded)
+
+# TODO: This should be included upstream in libp2p/signed_envelope. Once it's
+# added in libp2p, we can remove it from here.
+proc encode*[T](msg: SignedPayload[T]): Result[seq[byte], CryptoError] =
+  msg.envelope.encode()

 proc getRepeatedField*(pb: ProtoBuffer, field: int,
-                       value: var seq[PeerRecord]): ProtoResult[bool] {.
+                       value: var seq[SignedPeerRecord]): ProtoResult[bool] {.
                        inline.} =
   var items: seq[seq[byte]]
   value.setLen(0)
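
For illustration, a hedged sketch of how the generic write* above is used when building a message (field number 2 matches the provs field of ProvidersMessage; the key and record setup are assumed):

import libp2p/crypto/crypto
import libp2p/protobuf/minprotobuf
import libp2p/[peerid, routing_record, signed_envelope]

# Assumed setup: any secp256k1 key and a signed record for it.
let
  rng = newRng()
  key = PrivateKey.random(Secp256k1, rng[]).get
  pr = PeerRecord.init(PeerId.init(key.getPublicKey.get).get, 0, @[])
  spr = SignedPeerRecord.init(key, pr).get

var pb = initProtoBuffer()
pb.write(2, spr)  # resolves to the generic write* above; raises on signing errors
pb.finish()       # pb.buffer now holds the encoded field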
@@ -48,7 +58,7 @@ proc getRepeatedField*(pb: ProtoBuffer, field: int,
     ok(false)
   else:
     for item in items:
-      let ma = PeerRecord.decode(item)
+      let ma = SignedPeerRecord.decode(item)
       if ma.isOk():
         value.add(ma.get())
       else:
@@ -102,7 +112,6 @@ proc decode*(

   let pb = initProtoBuffer(buffer)
   var msg = ProvidersMessage()

   ? pb.getRequiredField(1, msg.total)
   discard ? pb.getRepeatedField(2, msg.provs)
@@ -117,3 +126,4 @@ proc encode*(msg: ProvidersMessage): seq[byte] =

   pb.finish()
   pb.buffer
@@ -5,11 +5,11 @@ import
 type
   AddProviderMessage* = object
     cId*: NodeId
-    prov*: PeerRecord
+    prov*: SignedPeerRecord

   GetProvidersMessage* = object
     cId*: NodeId

   ProvidersMessage* = object
     total*: uint32
-    provs*: seq[PeerRecord]
+    provs*: seq[SignedPeerRecord]
@@ -10,6 +10,7 @@
 {.used.}

 import
   std/options,
+  std/sequtils,
   chronos, stew/byteutils, nimcrypto, asynctest,
   eth/keys,
@@ -17,9 +18,12 @@ import
   chronicles,
   libp2pdht/discv5/protocol as discv5_protocol,
   test_helper,
+  libp2p/crypto/crypto,
+  libp2p/crypto/secp,
   libp2p/routing_record,
   libp2p/multihash,
-  libp2p/multicodec
+  libp2p/multicodec,
+  libp2p/signed_envelope


@@ -35,34 +39,31 @@ proc initProvidersNode(
   let d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
   newProvidersProtocol(d)

-proc toPeerRecord(p: ProvidersProtocol) : PeerRecord =
-  ## hadle conversion between the two worlds
+proc toSignedPeerRecord(privKey: crypto.PrivateKey) : SignedPeerRecord =
+  ## handle conversion between the two worlds

-  #NodeId is a keccak-256 hash created by keccak256.digest and stored as UInt256
-  let discNodeId = p.discovery.localNode.id
-  ## get it back to MDigest form
-  var digest: MDigest[256]
-  digest.data = discNodeId.toBytesBE
-  ## get into a MultiHash
-  var mh = MultiHash.init(multiCodec("keccak-256"), digest).orError(HashError).get()
-  result = PeerRecord.init(
-    peerId = PeerId.init(mh.data.buffer).get,
+  let pr = PeerRecord.init(
+    peerId = PeerId.init(privKey.getPublicKey.get).get,
     seqNo = 0,
     addresses = @[])
+  return SignedPeerRecord.init(privKey, pr).expect("Should init SignedPeerRecord with private key")
   # trace "IDs", discNodeId, digest, mh, peerId=result.peerId.hex
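
A usage sketch for the helper above (key setup assumed). Note that a SignedPeerRecord wraps the plain PeerRecord as its data field, which is why the test checks below compare providers[0].data.peerId:

import libp2p/crypto/crypto
import libp2p/[peerid, routing_record, signed_envelope]

let
  rng = newRng()
  privKey = PrivateKey.random(Secp256k1, rng[]).get
  spr = privKey.toSignedPeerRecord()  # helper from the diff above

# The PeerId of the wrapped record is derived from the same public key.
doAssert spr.data.peerId == PeerId.init(privKey.getPublicKey.get).get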

-proc bootstrapNodes(nodecount: int, bootnodes: openArray[Record], rng = keys.newRng()) : seq[ProvidersProtocol] =
+proc bootstrapNodes(nodecount: int, bootnodes: openArray[Record], rng = keys.newRng()) : seq[(ProvidersProtocol, keys.PrivateKey)] =

   for i in 0..<nodecount:
-    let node = initProvidersNode(rng, keys.PrivateKey.random(rng[]), localAddress(20302 + i), bootnodes)
+    let privKey = keys.PrivateKey.random(rng[])
+    let node = initProvidersNode(rng, privKey, localAddress(20302 + i), bootnodes)
     node.discovery.start()
-    result.add(node)
+    result.add((node, privKey))
   debug "---- STARTING BOOSTRAPS ---"

   #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs

-proc bootstrapNetwork(nodecount: int, rng = keys.newRng()) : seq[ProvidersProtocol] =
+proc bootstrapNetwork(nodecount: int, rng = keys.newRng()) : seq[(ProvidersProtocol, keys.PrivateKey)] =
   let
-    bootNodeKey = keys.PrivateKey.fromHex(
+    privKey = keys.PrivateKey.fromHex(
       "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[]
     bootNodeAddr = localAddress(20301)
@@ -71,55 +72,66 @@ proc bootstrapNetwork(nodecount: int, rng = keys.newRng()) : seq[ProvidersProtoc
   #waitFor bootNode.bootstrap() # immediate, since no bootnodes are defined above

   result = bootstrapNodes(nodecount - 1, @[bootnode.discovery.localNode.record], rng = rng)
-  result.insert(bootNode, 0)
+  result.insert((bootNode, privKey), 0)

+# TODO: Remove this once we have removed all traces of nim-eth/keys
+func pkToPk(pk: keys.PrivateKey) : Option[crypto.PrivateKey] =
+  let res = some(crypto.PrivateKey.init((secp.SkPrivateKey)(pk)))
+  return res

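And a sketch of the key-type bridge in use (rng and toSignedPeerRecord as above; nim-eth keys are secp256k1, matching libp2p's Secp256k1 scheme):

import std/options
import eth/keys
import libp2p/crypto/[crypto, secp]

let
  rng = keys.newRng()
  ethKey = keys.PrivateKey.random(rng[])    # nim-eth key
  p2pKey = ethKey.pkToPk.get                # -> crypto.PrivateKey via Option
  spr = p2pKey.toSignedPeerRecord()         # same helper as above
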
 # suite "Providers Tests":
 suite "Providers Tests: node alone":
   var
     rng: ref HmacDrbgContext
-    nodes: seq[ProvidersProtocol]
+    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
     targetId: NodeId
+    node0: ProvidersProtocol
+    privKey_keys0: keys.PrivateKey
+    privKey0: crypto.PrivateKey
+    signedPeerRec0: SignedPeerRecord
+    peerRec0: PeerRecord

   setupAll:
     debug "RUNNING BEFORE TESTS"
     rng = keys.newRng()
     nodes = bootstrapNetwork(nodecount=1)
     targetId = toNodeId(keys.PrivateKey.random(rng[]).toPublicKey)
+    (node0, privKey_keys0) = nodes[0]
+    privKey0 = privKey_keys0.pkToPk.get
+    signedPeerRec0 = privKey0.toSignedPeerRecord
+    peerRec0 = signedPeerRec0.data

   teardownAll:
     debug "RUNNING AFTER TESTS"
-    for n in nodes:
+    for (n, _) in nodes:
       await n.discovery.closeWait()
     await sleepAsync(chronos.seconds(3))


   test "Node in isolation should store":
     debug "---- ADDING PROVIDERS ---", nodes = nodes.len
-    let addedTo = await nodes[0].addProvider(targetId, nodes[0].toPeerRecord)
+    let addedTo = await node0.addProvider(targetId, signedPeerRec0)
     debug "Provider added to: ", addedTo

     debug "---- STARTING CHECKS ---"
     check (addedTo.len == 1)
-    check (addedTo[0].id == nodes[0].discovery.localNode.id)
-    check (nodes[0].getProvidersLocal(targetId)[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (addedTo[0].id == node0.discovery.localNode.id)
+    check (node0.getProvidersLocal(targetId)[0].data.peerId == peerRec0.peerId)

   test "Node in isolation should retrieve":

     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[0].getProviders(targetId)
+    let providers = await node0.getProviders(targetId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
-    check (providers.len > 0 and providers[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (providers.len > 0 and providers[0].data.peerId == peerRec0.peerId)

   test "Should not retrieve bogus":

     let bogusId = toNodeId(keys.PrivateKey.random(rng[]).toPublicKey)

     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[0].getProviders(bogusId)
+    let providers = await node0.getProviders(bogusId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
@@ -130,39 +142,49 @@ suite "Providers Tests: two nodes":

   var
     rng: ref HmacDrbgContext
-    nodes: seq[ProvidersProtocol]
+    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
     targetId: NodeId
+    node0: ProvidersProtocol
+    privKey_keys0: keys.PrivateKey
+    privKey0: crypto.PrivateKey
+    signedPeerRec0: SignedPeerRecord
+    peerRec0: PeerRecord

   setupAll:
     rng = keys.newRng()
     nodes = bootstrapNetwork(nodecount=2)
     targetId = toNodeId(keys.PrivateKey.random(rng[]).toPublicKey)
+    (node0, privKey_keys0) = nodes[0]
+    privKey0 = privKey_keys0.pkToPk.get
+    signedPeerRec0 = privKey0.toSignedPeerRecord
+    peerRec0 = signedPeerRec0.data

   teardownAll:
-    for n in nodes:
+    for (n, _) in nodes:
       await n.discovery.closeWait()
     await sleepAsync(chronos.seconds(3))

-  test "2 nodes, store and retieve from same":
+  test "2 nodes, store and retrieve from same":

     debug "---- ADDING PROVIDERS ---"
-    let addedTo = await nodes[0].addProvider(targetId, nodes[0].toPeerRecord)
+    let addedTo = await node0.addProvider(targetId, signedPeerRec0)
     debug "Provider added to: ", addedTo

     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[0].getProviders(targetId)
+    let providers = await node0.getProviders(targetId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
-    check (providers.len == 1 and providers[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)

-  test "2 nodes, retieve from other":
+  test "2 nodes, retrieve from other":
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[1].getProviders(targetId)
+    let (node1, _) = nodes[1]
+    let providers = await node1.getProviders(targetId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
-    check (providers.len == 1 and providers[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)


@@ -170,40 +192,50 @@ suite "Providers Tests: 20 nodes":

   var
     rng: ref HmacDrbgContext
-    nodes: seq[ProvidersProtocol]
+    nodes: seq[(ProvidersProtocol, keys.PrivateKey)]
     targetId: NodeId
+    node0: ProvidersProtocol
+    privKey_keys0: keys.PrivateKey
+    privKey0: crypto.PrivateKey
+    signedPeerRec0: SignedPeerRecord
+    peerRec0: PeerRecord

   setupAll:
     rng = keys.newRng()
     nodes = bootstrapNetwork(nodecount=20)
     targetId = toNodeId(keys.PrivateKey.random(rng[]).toPublicKey)
+    (node0, privKey_keys0) = nodes[0]
+    privKey0 = privKey_keys0.pkToPk.get
+    signedPeerRec0 = privKey0.toSignedPeerRecord
+    peerRec0 = signedPeerRec0.data

     await sleepAsync(chronos.seconds(15))

   teardownAll:
-    for n in nodes: # if last test is enabled, we need nodes[1..^1] here
+    for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
       await n.discovery.closeWait()

-  test "20 nodes, store and retieve from same":
+  test "20 nodes, store and retrieve from same":

     debug "---- ADDING PROVIDERS ---"
-    let addedTo = await nodes[0].addProvider(targetId, nodes[0].toPeerRecord)
+    let addedTo = await node0.addProvider(targetId, signedPeerRec0)
     debug "Provider added to: ", addedTo

     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[0].getProviders(targetId)
+    let providers = await node0.getProviders(targetId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
-    check (providers.len == 1 and providers[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)

-  test "20 nodes, retieve from other":
+  test "20 nodes, retrieve from other":
     debug "---- STARTING PROVIDERS LOOKUP ---"
-    let providers = await nodes[^1].getProviders(targetId)
+    let (node19, _) = nodes[^2]
+    let providers = await node19.getProviders(targetId)
     debug "Providers:", providers

     debug "---- STARTING CHECKS ---"
-    check (providers.len == 1 and providers[0].peerId == nodes[0].toPeerRecord.peerId)
+    check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId)

   # test "20 nodes, retieve after bootnode dies":
   #   # TODO: currently this is not working even with a 2 minute timeout

@@ -216,5 +248,3 @@ suite "Providers Tests: 20 nodes":

   #   debug "---- STARTING CHECKS ---"
   #   check (providers.len == 1 and providers[0].peerId == nodes[0].toPeerRecord.peerId)