Merge remote-tracking branch 'origin/master' into node-wire-prover

This commit is contained in:
Mark Spanbroek 2024-02-28 16:36:55 +01:00
commit 5e9307204a
No known key found for this signature in database
GPG Key ID: FBE3E9548D427C00
9 changed files with 219 additions and 83 deletions

View File

@ -22,6 +22,10 @@ Instructions below correspond roughly to environmental setups in nim-codex's [CI
Other approaches may be viable. On macOS, some users may prefer [MacPorts](https://www.macports.org/) to [Homebrew](https://brew.sh/). On Windows, rather than use MSYS2, some users may prefer to install developer tools with [winget](https://docs.microsoft.com/en-us/windows/package-manager/winget/), [Scoop](https://scoop.sh/), or [Chocolatey](https://chocolatey.org/), or download installers for e.g. Make and CMake while otherwise relying on official Windows developer tools. Community contributions to these docs and our build system are welcome!
### Rust
The current implementation of Codex's zero-knowledge proving circuit requires the installation of rust v1.76.0 or greater. Be sure to install it for your OS and add it to your terminal's path such that the command `cargo --version` gives a compatible version.
### Linux
*Package manager commands may require `sudo` depending on OS setup.*
@ -69,7 +73,8 @@ Launch an MSYS2 [environment](https://www.msys2.org/docs/environments/). UCRT64
Assuming a UCRT64 environment, in Bash run
```text
$ pacman -S base-devel git unzip mingw-w64-ucrt-x86_64-toolchain mingw-w64-ucrt-x86_64-cmake
$ pacman -Suy
$ pacman -S base-devel git unzip mingw-w64-ucrt-x86_64-toolchain mingw-w64-ucrt-x86_64-cmake mingw-w64-ucrt-x86_64-rust
```
<!-- #### Headless Windows container -->
@ -101,7 +106,6 @@ File: `C:/Users/<username>/AppData/Roaming/Code/User/settings.json`
}
```
### Other
It is possible that nim-codex can be built and run on other platforms supported by the [Nim](https://nim-lang.org/) language: BSD family, older versions of Windows, etc. There has not been sufficient experimentation with nim-codex on such platforms, so instructions are not provided. Community contributions to these docs and our build system are welcome!

View File

@ -144,16 +144,6 @@ proc sendWantBlock(
@[address],
wantType = WantType.WantBlock) # we want this remote to send us a block
proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx =
if cheapestPeers.len <= 0:
trace "No cheapest peers, selecting first in list"
let
peers = toSeq(b.peers) # Get any peer
if peers.len <= 0:
return none(BlockExcPeerCtx)
return some(peers[0])
return some(cheapestPeers[0]) # get cheapest
proc monitorBlockHandle(
b: BlockExcEngine,
handle: Future[Block],
@ -236,7 +226,7 @@ proc blockPresenceHandler*(
have = presence.have
price = presence.price
trace "Updating precense"
trace "Updating presence"
peerCtx.setPresence(presence)
let
@ -285,6 +275,20 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
break # do next peer
proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
## Tells neighboring peers that we're no longer interested in a block.
trace "Sending block request cancellations to peers", addrs = addrs.len
let failed = (await allFinished(
b.peers.mapIt(
b.network.request.sendWantCancellations(
peer = it.id,
addresses = addrs))))
.filterIt(it.failed)
if failed.len > 0:
trace "Failed to send block request cancellations to peers", peers = failed.len
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Resolving blocks", blocks = blocksDelivery.len
@ -295,6 +299,8 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy
cids.incl(bd.blk.cid)
if bd.address.leaf:
cids.incl(bd.address.treeCid)
await b.cancelBlocks(blocksDelivery.mapIt(it.address))
b.discovery.queueProvideBlocksReq(cids.toSeq)
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
@ -405,7 +411,7 @@ proc wantListHandler*(
for e in wantList.entries:
let
idx = peerCtx.peerWants.find(e)
idx = peerCtx.peerWants.findIt(it.address == e.address)
logScope:
peer = peerCtx.id

View File

@ -39,14 +39,6 @@ type
BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.}
WantListSender* = proc(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.}
BlockExcHandlers* = object
onWantList*: WantListHandler
@ -55,6 +47,15 @@ type
onAccount*: AccountHandler
onPayment*: PaymentHandler
WantListSender* = proc(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.}
WantCancellationSender* = proc(peer: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.}
BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.}
PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
@ -62,6 +63,7 @@ type
BlockExcRequest* = object
sendWantList*: WantListSender
sendWantCancellations*: WantCancellationSender
sendBlocksDelivery*: BlocksDeliverySender
sendPresence*: PresenceSender
sendAccount*: AccountSender
@ -139,6 +141,17 @@ proc sendWantList*(
b.send(id, Message(wantlist: msg))
proc sendWantCancellations*(
b: BlockExcNetwork,
id: PeerId,
addresses: seq[BlockAddress]): Future[void] {.async.} =
## Informs a remote peer that we're no longer interested in a set of blocks
##
trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id
await b.sendWantList(id = id, addresses = addresses, cancel = true)
proc handleBlocksDelivery(
b: BlockExcNetwork,
peer: NetworkPeer,
@ -340,6 +353,9 @@ proc new*(
id, cids, priority, cancel,
wantType, full, sendDontHave)
proc sendWantCancellations(id: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.} =
self.sendWantCancellations(id, addresses)
proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} =
self.sendBlocksDelivery(id, blocksDelivery)
@ -354,6 +370,7 @@ proc new*(
self.request = BlockExcRequest(
sendWantList: sendWantList,
sendWantCancellations: sendWantCancellations,
sendBlocksDelivery: sendBlocksDelivery,
sendPresence: sendPresence,
sendAccount: sendAccount,

View File

@ -8,6 +8,7 @@
## those terms.
##
import std/enumerate
import std/parseutils
import std/options
@ -17,7 +18,7 @@ import ./utils/asyncheapqueue
import ./utils/fileutils
import ./utils/asynciter
export asyncheapqueue, fileutils, asynciter
export asyncheapqueue, fileutils, asynciter, chronos
func divUp*[T: SomeInteger](a, b : T): T =
@ -35,6 +36,24 @@ proc orElse*[A](a, b: Option[A]): Option[A] =
else:
b
template findIt*(s, pred: untyped): untyped =
## Returns the index of the first object matching a predicate, or -1 if no
## object matches it.
runnableExamples:
type MyType = object
att: int
var s = @[MyType(att: 1), MyType(att: 2), MyType(att: 3)]
doAssert s.findIt(it.att == 2) == 1
doAssert s.findIt(it.att == 4) == -1
var index = -1
for i, it {.inject.} in enumerate(items(s)):
if pred:
index = i
break
index
when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}
@ -75,18 +94,3 @@ when not declared(parseDuration): # Odd code formatting to minimize diff v. main
result = start #..is no unit to the end of `s`.
var sizeF = number * scale + 0.5 # Saturate to int64.high when too big
size = seconds(int(sizeF))
when isMainModule:
import unittest2
suite "time parse":
test "parseDuration":
var res: Duration # caller must still know if 'b' refers to bytes|bits
check parseDuration("10Hr", res) == 3
check res == hours(10)
check parseDuration("64min", res) == 3
check res == minutes(64)
check parseDuration("7m/block", res) == 2 # '/' stops parse
check res == minutes(7) # 1 shl 30, forced binary metric
check parseDuration("3d", res) == 2 # '/' stops parse
check res == days(3) # 1 shl 30, forced binary metric

View File

@ -16,10 +16,6 @@ import ../../examples
import ../../helpers
asyncchecksuite "NetworkStore engine - 2 nodes":
let
chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
var
nodeCmps1, nodeCmps2: NodesComponents
peerCtx1, peerCtx2: BlockExcPeerCtx
@ -28,20 +24,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]]
setup:
while true:
let chunk = await chunker1.getBytes()
if chunk.len <= 0:
break
blocks1.add(bt.Block.new(chunk).tryGet())
while true:
let chunk = await chunker2.getBytes()
if chunk.len <= 0:
break
blocks2.add(bt.Block.new(chunk).tryGet())
blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
blocks2 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
nodeCmps1 = generateNodes(1, blocks1)[0]
nodeCmps2 = generateNodes(1, blocks2)[0]
@ -180,42 +164,30 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
check eventually wallet.balance(channel, Asset) > 0
asyncchecksuite "NetworkStore - multiple nodes":
let
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256'nb)
var
switch: seq[Switch]
networkStore: seq[NetworkStore]
nodes: seq[NodesComponents]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for e in generateNodes(5):
switch.add(e.switch)
networkStore.add(e.networkStore)
blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 256'nb)
nodes = generateNodes(5)
for e in nodes:
await e.engine.start()
await allFuturesThrowing(
switch.mapIt( it.start() )
nodes.mapIt( it.switch.start() )
)
teardown:
await allFuturesThrowing(
switch.mapIt( it.stop() )
nodes.mapIt( it.switch.stop() )
)
switch = @[]
networkStore = @[]
nodes = @[]
test "Should receive blocks for own want list":
let
downloader = networkStore[4]
downloader = nodes[4].networkStore
engine = downloader.engine
# Add blocks from 1st peer to want list
@ -233,9 +205,9 @@ asyncchecksuite "NetworkStore - multiple nodes":
)
for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()
(await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch)
await connectNodes(nodes)
await sleepAsync(1.seconds)
await allFuturesThrowing(
@ -251,7 +223,7 @@ asyncchecksuite "NetworkStore - multiple nodes":
test "Should exchange blocks with multiple nodes":
let
downloader = networkStore[4]
downloader = nodes[4].networkStore
engine = downloader.engine
# Add blocks from 1st peer to want list
@ -264,9 +236,9 @@ asyncchecksuite "NetworkStore - multiple nodes":
)
for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()
(await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch)
await connectNodes(nodes)
await sleepAsync(1.seconds)
await allFuturesThrowing(
@ -275,3 +247,47 @@ asyncchecksuite "NetworkStore - multiple nodes":
check pendingBlocks1.mapIt( it.read ) == blocks[0..3]
check pendingBlocks2.mapIt( it.read ) == blocks[12..15]
test "Should actively cancel want-haves if block received from elsewhere":
let
# Peer wanting to download blocks
downloader = nodes[4]
# Bystander peer - gets block request but can't satisfy them
bystander = nodes[3]
# Holder of actual blocks
blockHolder = nodes[1]
let aBlock = blocks[0]
(await blockHolder.engine.localStore.putBlock(aBlock)).tryGet()
await connectNodes(@[downloader, bystander])
# Downloader asks for block...
let blockRequest = downloader.engine.requestBlock(aBlock.cid)
# ... and bystander learns that downloader wants it, but can't provide it.
check eventually(
bystander
.engine
.peers
.get(downloader.switch.peerInfo.peerId)
.peerWants
.filterIt( it.address == aBlock.address )
.len == 1
)
# As soon as we connect the downloader to the blockHolder, the block should
# propagate to the downloader...
await connectNodes(@[downloader, blockHolder])
check (await blockRequest).cid == aBlock.cid
check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet()
# ... and the bystander should have cancelled the want-have
check eventually(
bystander
.engine
.peers
.get(downloader.switch.peerInfo.peerId)
.peerWants
.filterIt( it.address == aBlock.address )
.len == 0
)

View File

@ -142,6 +142,11 @@ asyncchecksuite "NetworkStore engine handlers":
localStore: BlockStore
blocks: seq[Block]
const NopSendWantCancellationsProc = proc(
id: PeerId,
addresses: seq[BlockAddress]
) {.gcsafe, async.} = discard
setup:
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
@ -275,6 +280,10 @@ asyncchecksuite "NetworkStore engine handlers":
let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
# Install NOP for want list cancellations so they don't cause a crash
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
await engine.blocksDeliveryHandler(peerId, blocksDelivery)
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
@ -306,10 +315,14 @@ asyncchecksuite "NetworkStore engine handlers":
check receiver == peerId
check balances[account.address.toDestination] == amount
done.complete()
done.complete(),
# Install NOP for want list cancellations so they don't cause a crash
sendWantCancellations: NopSendWantCancellationsProc
))
await engine.blocksDeliveryHandler(peerId, blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
await engine.blocksDeliveryHandler(peerId, blocks.mapIt(
BlockDelivery(blk: it, address: it.address)))
await done.wait(100.millis)
test "Should handle block presence":
@ -352,6 +365,30 @@ asyncchecksuite "NetworkStore engine handlers":
check a in peerCtx.peerHave
check peerCtx.blocks[a].price == price
test "Should send cancellations for received blocks":
let
pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
cancellations = newTable(
blocks.mapIt((it.address, newFuture[void]())).toSeq
)
proc sendWantCancellations(
id: PeerId,
addresses: seq[BlockAddress]
) {.gcsafe, async.} =
for address in addresses:
cancellations[address].complete()
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendWantCancellations: sendWantCancellations
))
await engine.blocksDeliveryHandler(peerId, blocksDelivery)
discard await allFinished(pending)
await allFuturesThrowing(cancellations.values().toSeq)
asyncchecksuite "Task Handler":
var
rng: Rng

View File

@ -106,6 +106,19 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest
return manifest
proc makeRandomBlocks*(
datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} =
var chunker = RandomChunker.new(Rng.instance(), size = datasetSize,
chunkSize = blockSize)
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
result.add(Block.new(chunk).tryGet())
proc corruptBlocks*(
store: BlockStore,
manifest: Manifest,

View File

@ -62,3 +62,6 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} =
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo.peerId, node.peerInfo.addrs)
proc connectNodes*(nodes: seq[NodesComponents]) {.async.} =
await connectNodes(nodes.mapIt( it.switch ))

View File

@ -0,0 +1,36 @@
import std/unittest
import pkg/codex/utils
suite "findIt":
setup:
type AnObject = object
attribute1*: int
var objList = @[
AnObject(attribute1: 1),
AnObject(attribute1: 3),
AnObject(attribute1: 5),
AnObject(attribute1: 3),
]
test "should retur index of first object matching predicate":
assert objList.findIt(it.attribute1 == 3) == 1
test "should return -1 when no object matches predicate":
assert objList.findIt(it.attribute1 == 15) == -1
suite "parseDuration":
test "should parse durations":
var res: Duration # caller must still know if 'b' refers to bytes|bits
check parseDuration("10Hr", res) == 3
check res == hours(10)
check parseDuration("64min", res) == 3
check res == minutes(64)
check parseDuration("7m/block", res) == 2 # '/' stops parse
check res == minutes(7) # 1 shl 30, forced binary metric
check parseDuration("3d", res) == 2 # '/' stops parse
check res == days(3) # 1 shl 30, forced binary metric