367 lines
12 KiB
Nim
367 lines
12 KiB
Nim
# Fluffy
|
|
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.used.}
|
|
|
|
import
|
|
std/[algorithm, sequtils],
|
|
chronos,
|
|
testutils/unittests,
|
|
results,
|
|
eth/keys,
|
|
eth/p2p/discoveryv5/routing_table,
|
|
nimcrypto/[hash, sha2],
|
|
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
|
../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
|
|
../database/content_db,
|
|
./test_helpers
|
|
|
|
const protocolId = [byte 0x50, 0x00]
|
|
|
|
proc toContentId(contentKey: ByteList): results.Opt[ContentId] =
|
|
# Note: Returning sha256 digest as content id here. This content key to
|
|
# content id derivation is different for the different content networks
|
|
# and their content types.
|
|
let idHash = sha256.digest(contentKey.asSeq())
|
|
ok(readUintBE[256](idHash.data))
|
|
|
|
proc initPortalProtocol(
|
|
rng: ref HmacDrbgContext,
|
|
privKey: PrivateKey,
|
|
address: Address,
|
|
bootstrapRecords: openArray[Record] = [],
|
|
): PortalProtocol =
|
|
let
|
|
d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
|
|
db = ContentDB.new("", uint32.high, inMemory = true)
|
|
manager = StreamManager.new(d)
|
|
q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
|
|
stream = manager.registerNewStream(q)
|
|
|
|
proto = PortalProtocol.new(
|
|
d,
|
|
protocolId,
|
|
toContentId,
|
|
createGetHandler(db),
|
|
stream,
|
|
bootstrapRecords = bootstrapRecords,
|
|
)
|
|
|
|
proto.dbPut = createStoreHandler(db, defaultRadiusConfig, proto)
|
|
|
|
return proto
|
|
|
|
proc stopPortalProtocol(proto: PortalProtocol) {.async.} =
|
|
proto.stop()
|
|
await proto.baseProtocol.closeWait()
|
|
|
|
proc defaultTestSetup(rng: ref HmacDrbgContext): (PortalProtocol, PortalProtocol) =
|
|
let
|
|
proto1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
|
|
proto2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
|
|
|
|
(proto1, proto2)
|
|
|
|
procSuite "Portal Wire Protocol Tests":
|
|
let rng = newRng()
|
|
|
|
asyncTest "Ping/Pong":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
|
|
let pong = await proto1.ping(proto2.localNode)
|
|
|
|
let customPayload = ByteList(SSZ.encode(CustomPayload(dataRadius: UInt256.high())))
|
|
|
|
check:
|
|
pong.isOk()
|
|
pong.get().enrSeq == 1'u64
|
|
pong.get().customPayload == customPayload
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "FindNodes/Nodes":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
|
|
block: # Find itself
|
|
let nodes =
|
|
await proto1.findNodesImpl(proto2.localNode, List[uint16, 256](@[0'u16]))
|
|
|
|
check:
|
|
nodes.isOk()
|
|
nodes.get().total == 1'u8
|
|
nodes.get().enrs.len() == 1
|
|
|
|
block: # Find nothing: this should result in nothing as we haven't started
|
|
# the seeding of the portal protocol routing table yet.
|
|
let nodes = await proto1.findNodesImpl(proto2.localNode, List[uint16, 256](@[]))
|
|
|
|
check:
|
|
nodes.isOk()
|
|
nodes.get().total == 1'u8
|
|
nodes.get().enrs.len() == 0
|
|
|
|
block: # Find for distance
|
|
# ping in one direction to add, ping in the other to update as seen,
|
|
# adding the node in the discovery v5 routing table. Could also launch
|
|
# with bootstrap node instead.
|
|
check (await proto1.baseProtocol.ping(proto2.localNode)).isOk()
|
|
check (await proto2.baseProtocol.ping(proto1.localNode)).isOk()
|
|
|
|
# Start the portal protocol to seed nodes from the discoveryv5 routing
|
|
# table.
|
|
proto2.start()
|
|
|
|
let distance = logDistance(proto1.localNode.id, proto2.localNode.id)
|
|
let nodes =
|
|
await proto1.findNodesImpl(proto2.localNode, List[uint16, 256](@[distance]))
|
|
|
|
check:
|
|
nodes.isOk()
|
|
nodes.get().total == 1'u8
|
|
nodes.get().enrs.len() == 1
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "FindContent/Content - send enrs":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
|
|
# ping in one direction to add, ping in the other to update as seen.
|
|
check (await proto1.baseProtocol.ping(proto2.localNode)).isOk()
|
|
check (await proto2.baseProtocol.ping(proto1.localNode)).isOk()
|
|
|
|
let contentKey = ByteList.init(@[1'u8])
|
|
|
|
# content does not exist so this should provide us with the closest nodes
|
|
# to the content, which is the only node in the routing table.
|
|
let content = await proto1.findContentImpl(proto2.localNode, contentKey)
|
|
|
|
check:
|
|
content.isOk()
|
|
content.get().enrs.len() == 1
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "Offer/Accept":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
let contentKeys = ContentKeysList(@[ByteList(@[byte 0x01, 0x02, 0x03])])
|
|
|
|
let accept = await proto1.offerImpl(proto2.baseProtocol.localNode, contentKeys)
|
|
|
|
check:
|
|
accept.isOk()
|
|
accept.get().connectionId.len == 2
|
|
accept.get().contentKeys.len == contentKeys.len
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "Offer/Accept/Stream":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
var content: seq[ContentKV]
|
|
for i in 0 ..< contentKeysLimit:
|
|
let contentKV =
|
|
ContentKV(contentKey: ByteList(@[byte i]), content: repeat(byte i, 5000))
|
|
content.add(contentKV)
|
|
|
|
let res = await proto1.offer(proto2.baseProtocol.localNode, content)
|
|
|
|
check res.isOk()
|
|
|
|
let (srcNodeId, contentKeys, contentItems) =
|
|
await proto2.stream.contentQueue.popFirst()
|
|
|
|
check contentItems.len() == content.len()
|
|
|
|
for i, contentItem in contentItems:
|
|
let contentKV = content[i]
|
|
check:
|
|
contentItem == contentKV.content
|
|
contentKeys[i] == contentKV.contentKey
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "Correctly mark node as seen after request":
|
|
let (proto1, proto2) = defaultTestSetup(rng)
|
|
|
|
let initialNeighbours = proto1.neighbours(proto1.localNode.id, seenOnly = false)
|
|
|
|
check:
|
|
len(initialNeighbours) == 0
|
|
|
|
discard proto1.addNode(proto2.localNode)
|
|
|
|
let allNeighboursAfterAdd = proto1.neighbours(proto1.localNode.id, seenOnly = false)
|
|
let seenNeighboursAfterAdd = proto1.neighbours(proto1.localNode.id, seenOnly = true)
|
|
|
|
check:
|
|
len(allNeighboursAfterAdd) == 1
|
|
len(seenNeighboursAfterAdd) == 0
|
|
|
|
let pong = await proto1.ping(proto2.localNode)
|
|
|
|
let allNeighboursAfterPing =
|
|
proto1.neighbours(proto1.localNode.id, seenOnly = false)
|
|
let seenNeighboursAfterPing =
|
|
proto1.neighbours(proto1.localNode.id, seenOnly = true)
|
|
|
|
check:
|
|
pong.isOk()
|
|
len(allNeighboursAfterPing) == 1
|
|
len(seenNeighboursAfterPing) == 1
|
|
|
|
await proto1.stopPortalProtocol()
|
|
await proto2.stopPortalProtocol()
|
|
|
|
asyncTest "Lookup nodes":
|
|
let
|
|
node1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
|
|
node2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
|
|
node3 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20304))
|
|
|
|
# Make node1 know about node2, and node2 about node3
|
|
# node1 will then do a lookup for node3
|
|
check node1.addNode(node2.localNode) == Added
|
|
check node2.addNode(node3.localNode) == Added
|
|
|
|
check (await node2.ping(node3.localNode)).isOk()
|
|
|
|
let lookupResult = await node1.lookup(node3.localNode.id)
|
|
|
|
check:
|
|
# Result should contain node3 as it is in the routing table of node2
|
|
lookupResult.contains(node3.localNode)
|
|
|
|
await node1.stopPortalProtocol()
|
|
await node2.stopPortalProtocol()
|
|
await node3.stopPortalProtocol()
|
|
|
|
asyncTest "Lookup content - nodes interested":
|
|
let
|
|
node1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
|
|
node2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
|
|
node3 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20304))
|
|
|
|
content = @[byte 1, 2]
|
|
contentList = List[byte, 2048].init(content)
|
|
contentId = readUintBE[256](sha256.digest(content).data)
|
|
|
|
# Store the content on node3
|
|
node3.storeContent(contentList, contentId, content)
|
|
|
|
# Make node1 know about node2, and node2 about node3
|
|
check node1.addNode(node2.localNode) == Added
|
|
check node2.addNode(node3.localNode) == Added
|
|
|
|
# node1 needs to know the radius of the nodes to determine if they are
|
|
# interested in content, so a ping is done.
|
|
check (await node1.ping(node2.localNode)).isOk()
|
|
check (await node2.ping(node3.localNode)).isOk()
|
|
|
|
let lookupResult = await node1.contentLookup(contentList, contentId)
|
|
|
|
check:
|
|
lookupResult.isSome()
|
|
|
|
let res = lookupResult.unsafeGet()
|
|
|
|
check:
|
|
res.content == content
|
|
res.nodesInterestedInContent.contains(node2.localNode)
|
|
|
|
await node1.stopPortalProtocol()
|
|
await node2.stopPortalProtocol()
|
|
await node3.stopPortalProtocol()
|
|
|
|
asyncTest "Valid Bootstrap Node":
|
|
let
|
|
node1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
|
|
node2 = initPortalProtocol(
|
|
rng,
|
|
PrivateKey.random(rng[]),
|
|
localAddress(20303),
|
|
bootstrapRecords = [node1.localNode.record],
|
|
)
|
|
|
|
node1.start()
|
|
node2.start()
|
|
|
|
check node2.neighbours(node2.localNode.id).len == 1
|
|
|
|
await node1.stopPortalProtocol()
|
|
await node2.stopPortalProtocol()
|
|
|
|
asyncTest "Invalid Bootstrap Node":
|
|
let
|
|
node1 = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(20302))
|
|
node2 = initPortalProtocol(
|
|
rng,
|
|
PrivateKey.random(rng[]),
|
|
localAddress(20303),
|
|
bootstrapRecords = [node1.localNode.record],
|
|
)
|
|
|
|
# seedTable to add node1 to the routing table
|
|
node2.seedTable()
|
|
check node2.neighbours(node2.localNode.id).len == 1
|
|
|
|
# This should fail and drop node1 from the routing table
|
|
await node2.revalidateNode(node1.localNode)
|
|
|
|
check node2.neighbours(node2.localNode.id).len == 0
|
|
|
|
await node1.closeWait()
|
|
await node2.stopPortalProtocol()
|
|
|
|
asyncTest "Adjusting radius after hitting full database":
|
|
# TODO: This test is extremely breakable when changing
|
|
# `contentDeletionFraction` and/or the used test values.
|
|
# Need to rework either this test, or the pruning mechanism, or probably
|
|
# both.
|
|
let
|
|
node1 = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(20303))
|
|
|
|
dbLimit = 400_000'u32
|
|
db = ContentDB.new("", dbLimit, inMemory = true)
|
|
m = StreamManager.new(node1)
|
|
q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
|
|
stream = m.registerNewStream(q)
|
|
|
|
proto1 =
|
|
PortalProtocol.new(node1, protocolId, toContentId, createGetHandler(db), stream)
|
|
|
|
proto1.dbPut = createStoreHandler(db, defaultRadiusConfig, proto1)
|
|
|
|
let item = genByteSeq(10_000)
|
|
var distances: seq[UInt256] = @[]
|
|
|
|
for i in 0 ..< 40:
|
|
proto1.storeContent(ByteList.init(@[uint8(i)]), u256(i), item)
|
|
distances.add(u256(i) xor proto1.localNode.id)
|
|
|
|
distances.sort(order = SortOrder.Descending)
|
|
|
|
# With the selected db limit of 100_000 bytes and added elements of 10_000
|
|
# bytes each, the two furthest elements should be prined, i.e index 0 and 1.
|
|
# Index 2 should be still be in database and its distance should be <=
|
|
# updated radius
|
|
check:
|
|
db.get((distances[0] xor proto1.localNode.id)).isNone()
|
|
db.get((distances[1] xor proto1.localNode.id)).isNone()
|
|
db.get((distances[2] xor proto1.localNode.id)).isNone()
|
|
db.get((distances[3] xor proto1.localNode.id)).isSome()
|
|
# The radius has been updated and is lower than the maximum start value.
|
|
proto1.dataRadius < UInt256.high
|
|
# Yet higher than or equal to the furthest non deleted element.
|
|
proto1.dataRadius >= distances[3]
|
|
|
|
proto1.stop()
|
|
await node1.closeWait()
|