Node setup (#32)

* setup basic nim node

* adding http utils

* adding confutils

* rough rest api proto

* adding missing deps

* turn tls emulation off

* adding toml serialization

* wip

* adding missing deps

* make sure to clean old state in teardown

* adding file upload rest endpoint

* renaming blockexchange to networkstore

* updating nim-presto

* updating libp2p

* wip adding streaming upload

* reworked chunking

* bump to latest unstable

* adding asyncfutures stream

* make streamable

* deleting unused files

* reworking stores api

* use new stores api

* rework blockset and remove blockstream

* don't return option from constructor

* rework chunker

* wip implement upload

* fix tests

* move unrelated logic to engine

* don't print entire message

* logging

* basic encode/decode to/from dag-pb

* add basic upload/download support

* fix tests

* renaming blockset to manifest

* don't pass config to node

* remove config and use new manifest

* wip: make endpoints more reliable

* wip: adding node tests

* include correct manifest test

* removing asyncfutures

* proper chunking of files

* simplify stream reading

* test with encoding/decoding with many blocks

* add block storing tests

* adding retrieval test

* add logging

* tidy up chunker

* tidy up manifest and node

* use default chunk size

* fix tests

* fix tests

* make sure Eof is set properly

* wip

* minor cleanup

* add file utils

* cleanup config

* split out DaggerServer and "main"

* remove events since they are not used

* add broadcast method to network peer

* add and wire localstore

* use localstore in the node

* wip

* logging

* move file utils

* use the constant

* updating deps

* fix memstore

* use latest libp2p unstable

* fix tests

* rework block streaming

* don't fail storing if the block already exists

* add helper info endpoint

* correct comment

* rename localstore to fsstore

* fix tests

* remove unused tests

* add test to retrieve one block

* move some test files around

* consolidate setup

* Update dagger/blockexchange/engine.nim

Co-authored-by: Tanguy <tanguy@status.im>

* typo

* better block path handling

* don't inherit rootobj

* remove useless template

* Update tests/dagger/blockexc/testblockexc.nim

Co-authored-by: markspanbroek <mark@spanbroek.net>

* use isMainModule

* use proper flag for started/stopped

* cleanup optional use

* wrap in isMainModule

* use `cancelAndAwait`

* remove unused imports

* wip

* don't use optional

* use functional error api

* rework store tests and add fs tests

* Block.new() to Block.init()

* don't use optional for engine blocks

* use result instead of optional for getBlock

* remove unused imports

* move stopping servers to `shutdown`

* use result instead of optional

* rework with results

* fix tests

* use waitFor in signal handlers

* error helper

* use `?` and mapFailure where possible

* remove unnecessary `=?`

* improve empty cid digest initialization

Co-authored-by: Tanguy <tanguy@status.im>
Co-authored-by: markspanbroek <mark@spanbroek.net>
This commit is contained in:
Dmitriy Ryajov 2022-01-10 09:32:56 -06:00 committed by GitHub
parent a7628e204a
commit fbe161a073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 2152 additions and 917 deletions

37
.gitmodules vendored
View File

@ -118,3 +118,40 @@
url = https://github.com/status-im/stint.git
ignore = untracked
branch = master
[submodule "vendor/nim-httputils"]
ignore = untracked
branch = master
[submodule "vendor/nim-http-utils"]
path = vendor/nim-http-utils
url = https://github.com/status-im/nim-http-utils.git
ignore = untracked
branch = master
[submodule "vendor/nim-toml-serialization"]
path = vendor/nim-toml-serialization
url = https://github.com/status-im/nim-toml-serialization.git
ignore = untracked
branch = master
[submodule "vendor/unittest2"]
ignore = untracked
branch = master
[submodule "vendor/nim-unittest2"]
path = vendor/nim-unittest2
url = https://github.com/status-im/nim-unittest2.git
ignore = untracked
branch = master
[submodule "vendor/nameresolver"]
ignore = untracked
branch = master
[submodule "vendor/nim-nameresolver"]
ignore = untracked
branch = master
[submodule "vendor/dnsclient.nim"]
path = vendor/dnsclient.nim
url = https://github.com/ba0f3/dnsclient.nim.git
ignore = untracked
branch = master
[submodule "vendor/nim-websock"]
path = vendor/nim-websock
url = https://github.com/status-im/nim-websock.git
ignore = untracked
branch = master

View File

@ -32,6 +32,7 @@ else:
# ("-fno-asynchronous-unwind-tables" breaks Nim's exception raising, sometimes)
switch("passC", "-mno-avx512vl")
--tlsEmulation:off
--threads:on
--opt:speed
--excessiveStackTrace:on

View File

@ -0,0 +1,77 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronicles
import pkg/chronos
import pkg/confutils
import pkg/libp2p
import ./dagger/conf
import ./dagger/dagger
export dagger, conf, libp2p, chronos, chronicles
when isMainModule:
import std/os
import pkg/confutils/defs
import ./dagger/utils/fileutils
when defined(posix):
import system/ansi_c
let
config = DaggerConf.load()
case config.cmd:
of StartUpCommand.noCommand:
if not(checkAndCreateDataDir((config.dataDir).string)):
# We are unable to access/create data folder or data folder's
# permissions are insecure.
quit QuitFailure
trace "Data dir initialized", dir = config.dataDir
if not(checkAndCreateDataDir((config.dataDir / "repo").string)):
# We are unable to access/create data folder or data folder's
# permissions are insecure.
quit QuitFailure
trace "Repo dir initialized", dir = config.dataDir / "repo"
let server = DaggerServer.new(config)
## Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
try:
setupForeignThreadGc()
except Exception as exc: raiseAssert exc.msg # shouldn't happen
notice "Shutting down after having received SIGINT"
waitFor server.shutdown()
try:
setControlCHook(controlCHandler)
except Exception as exc: # TODO Exception
warn "Cannot set ctrl-c handler", msg = exc.msg
# equivalent SIGTERM handler
when defined(posix):
proc SIGTERMHandler(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM"
waitFor server.shutdown()
c_signal(SIGTERM, SIGTERMHandler)
waitFor server.run()
of StartUpCommand.initNode:
discard

View File

@ -32,8 +32,11 @@ logScope:
topics = "dagger blockexc engine"
const
DefaultTimeout* = 5.seconds
DefaultBlockTimeout* = 5.minutes
DefaultMaxPeersPerRequest* = 10
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
DefaultMaxRetries = 3
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
@ -41,12 +44,16 @@ type
BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
network*: BlockExcNetwork # network interface
peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
wantList*: seq[Cid] # local wants list
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
blockexcTasks: seq[Future[void]] # future to control blockexc task
blockexcRunning: bool # indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
peersPerRequest: int # max number of peers to request from
scheduleTask*: TaskScheduler # schedule a new task with the task runner
request*: BlockExcRequest # block exchange network requests
wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing
@ -68,28 +75,58 @@ proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx =
if peer.len > 0:
return peer[0]
proc requestBlocks*(
# attach task scheduler to engine
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
##
trace "blockexc start"
if b.blockexcRunning:
warn "Starting blockexc twice"
return
b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
##
trace "NetworkStore stop"
if not b.blockexcRunning:
warn "Stopping blockexc without starting it"
return
b.blockexcRunning = false
for t in b.blockexcTasks:
if not t.finished:
trace "Awaiting task to stop"
await t.cancelAndWait()
trace "Task stopped"
trace "NetworkStore stopped"
proc requestBlock*(
b: BlockExcEngine,
cids: seq[Cid],
timeout = DefaultTimeout): seq[Future[bt.Block]] =
cid: Cid,
timeout = DefaultBlockTimeout): Future[bt.Block] =
## Request a block from remotes
##
# no Cids to request
if cids.len == 0:
return
let
blk = b.pendingBlocks.addOrAwait(cid).wait(timeout)
if b.peers.len <= 0:
warn "No peers to request blocks from"
# TODO: run discovery here to get peers for the block
return
var blocks: seq[Future[bt.Block]]
for c in cids:
if c notin b.pendingBlocks:
# install events to await blocks incoming from different sources
blocks.add(
b.pendingBlocks.addOrAwait(c).wait(timeout))
return blk
var peers = b.peers
@ -97,11 +134,7 @@ proc requestBlocks*(
# matching cid
var blockPeer: BlockExcPeerCtx
for i, p in peers:
let has = cids.anyIt(
it in p.peerHave
)
if has:
if cid in p.peerHave:
blockPeer = p
break
@ -114,36 +147,33 @@ proc requestBlocks*(
it != blockPeer
)
trace "Requesting blocks from peer", peer = blockPeer.id, len = cids.len
trace "Requesting block from peer", peer = blockPeer.id, cid
# request block
b.request.sendWantList(
b.network.request.sendWantList(
blockPeer.id,
cids,
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
if peers.len == 0:
return blocks # no peers to send wants to
template sendWants(ctx: BlockExcPeerCtx) =
# just send wants
b.request.sendWantList(
ctx.id,
cids.filterIt( it notin ctx.peerHave ), # filter out those that we already know about
wantType = WantType.wantHave) # we only want to know if the peer has the block
return blk # no peers to send wants to
# filter out the peer we've already requested from
var stop = peers.high
if stop > b.peersPerRequest: stop = b.peersPerRequest
let stop = min(peers.high, b.peersPerRequest)
trace "Sending want list requests to remaining peers", count = stop + 1
for p in peers[0..stop]:
sendWants(p)
if cid notin p.peerHave:
# just send wants
b.network.request.sendWantList(
p.id,
@[cid],
wantType = WantType.wantHave) # we only want to know if the peer has the block
return blocks
return blk
proc blockPresenceHandler*(
b: BlockExcEngine,
peer: PeerID,
blocks: seq[BlockPresence]) =
blocks: seq[BlockPresence]) {.async.} =
## Handle block presence
##
@ -181,7 +211,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx,
blocks: seq[bt.Block]) =
let sendPayment = engine.request.sendPayment
let sendPayment = engine.network.request.sendPayment
if sendPayment.isNil:
return
@ -192,14 +222,17 @@ proc payForBlocks(engine: BlockExcEngine,
proc blocksHandler*(
b: BlockExcEngine,
peer: PeerID,
blocks: seq[bt.Block]) =
blocks: seq[bt.Block]) {.async.} =
## handle incoming blocks
##
trace "Got blocks from peer", peer, len = blocks.len
b.localStore.putBlocks(blocks)
b.resolveBlocks(blocks)
for blk in blocks:
if not (await b.localStore.putBlock(blk)):
trace "Unable to store block", cid = blk.cid
continue
b.resolveBlocks(blocks)
let peerCtx = b.getPeerCtx(peer)
if peerCtx != nil:
b.payForBlocks(peerCtx, blocks)
@ -207,7 +240,7 @@ proc blocksHandler*(
proc wantListHandler*(
b: BlockExcEngine,
peer: PeerID,
wantList: WantList) =
wantList: WantList) {.async.} =
## Handle incoming want lists
##
@ -234,12 +267,12 @@ proc wantListHandler*(
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave and not(b.localStore.hasBlock(e.cid)):
if e.sendDontHave and e.cid notin b.localStore:
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
b.request.sendPresence(
b.network.request.sendPresence(
peer,
dontHaves.mapIt(
BlockPresence(
@ -249,14 +282,14 @@ proc wantListHandler*(
if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer
proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) =
proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} =
let context = engine.getPeerCtx(peer)
if context.isNil:
return
context.account = account.some
proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) =
proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} =
without context =? engine.getPeerCtx(peer).option and
account =? context.account:
return
@ -280,10 +313,10 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
# broadcast our want list, the other peer will do the same
if b.wantList.len > 0:
b.request.sendWantList(peer, b.wantList, full = true)
b.network.request.sendWantList(peer, b.wantList, full = true)
if address =? b.pricing.?address:
b.request.sendAccount(peer, Account(address: address))
b.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
## Cleanup disconnected peer
@ -306,13 +339,18 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
# TODO: There should be all sorts of accounting of
# bytes sent/received here
if wantsBlocks.len > 0:
let blocks = await b.localStore.getBlocks(
wantsBlocks.mapIt(
it.cid
let blockFuts = await allFinished(wantsBlocks.mapIt(
b.localStore.getBlock(it.cid)
))
let blocks = blockFuts
.filterIt((not it.failed) and it.read.isOk)
.mapIt(!it.read)
if blocks.len > 0:
b.request.sendBlocks(task.id, blocks)
b.network.request.sendBlocks(
task.id,
blocks)
# Remove successfully sent blocks
task.peerWants.keepIf(
@ -330,28 +368,76 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
if presence.have and price =? b.pricing.?price:
presence.price = price
wants.add(BlockPresence.init(presence))
if wants.len > 0:
b.request.sendPresence(task.id, wants)
func new*(
if wants.len > 0:
b.network.request.sendPresence(task.id, wants)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks
##
while b.blockexcRunning:
let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx)
trace "Exiting blockexc task runner"
proc new*(
T: type BlockExcEngine,
localStore: BlockStore,
wallet: WalletRef,
request: BlockExcRequest = BlockExcRequest(),
scheduleTask: TaskScheduler = nil,
network: BlockExcNetwork,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =
proc taskScheduler(task: BlockExcPeerCtx): bool =
if not isNil(scheduleTask):
return scheduleTask(task)
let b = BlockExcEngine(
let engine = BlockExcEngine(
localStore: localStore,
pendingBlocks: PendingBlocksManager.new(),
peersPerRequest: peersPerRequest,
scheduleTask: taskScheduler,
request: request,
wallet: wallet
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
maxRetries: maxRetries,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize))
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
engine.setupPeer(peerId)
else:
engine.dropPeer(peerId)
if not isNil(network.switch):
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(
peer: PeerID,
wantList: WantList): Future[void] {.gcsafe.} =
engine.wantListHandler(peer, wantList)
proc blockPresenceHandler(
peer: PeerID,
presence: seq[BlockPresence]): Future[void] {.gcsafe.} =
engine.blockPresenceHandler(peer, presence)
proc blocksHandler(
peer: PeerID,
blocks: seq[bt.Block]): Future[void] {.gcsafe.} =
engine.blocksHandler(peer, blocks)
proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} =
engine.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
engine.paymentHandler(peer, payment)
network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler,
onBlocks: blocksHandler,
onPresence: blockPresenceHandler,
onAccount: accountHandler,
onPayment: paymentHandler
)
return b
return engine

View File

@ -29,11 +29,11 @@ logScope:
const Codec* = "/dagger/blockexc/1.0.0"
type
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.}
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.}
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.}
BlockExcHandlers* = object
onWantList*: WantListHandler
@ -73,7 +73,7 @@ type
proc handleWantList(
b: BlockExcNetwork,
peer: NetworkPeer,
list: WantList) =
list: WantList): Future[void] =
## Handle incoming want list
##
@ -126,12 +126,12 @@ proc broadcastWantList*(
wantType,
full,
sendDontHave)
asyncSpawn b.peers[id].send(Message(wantlist: wantList))
b.peers[id].broadcast(Message(wantlist: wantList))
proc handleBlocks(
b: BlockExcNetwork,
peer: NetworkPeer,
blocks: seq[auto]) =
blocks: seq[auto]): Future[void] =
## Handle incoming blocks
##
@ -143,11 +143,9 @@ proc handleBlocks(
var blks: seq[bt.Block]
for blk in blocks:
when blk is pb.Block:
if b =? bt.Block.new(Cid.init(blk.prefix).get(), blk.data):
blks.add(b)
blks.add(bt.Block.init(Cid.init(blk.prefix).get(), blk.data))
elif blk is seq[byte]:
if b =? bt.Block.new(Cid.init(blk).get(), blk):
blks.add(b)
blks.add(bt.Block.init(Cid.init(blk).get(), blk))
else:
error("Invalid block type")
@ -176,12 +174,12 @@ proc broadcastBlocks*(
return
trace "Sending blocks to peer", peer = id, len = blocks.len
asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks)))
b.peers[id].broadcast(pb.Message(payload: makeBlocks(blocks)))
proc handleBlockPresence(
b: BlockExcNetwork,
peer: NetworkPeer,
presence: seq[BlockPresence]) =
presence: seq[BlockPresence]): Future[void] =
## Handle block presence
##
@ -202,11 +200,11 @@ proc broadcastBlockPresence*(
return
trace "Sending presence to peer", peer = id
asyncSpawn b.peers[id].send(Message(blockPresences: presence))
b.peers[id].broadcast(Message(blockPresences: presence))
proc handleAccount(network: BlockExcNetwork,
peer: NetworkPeer,
account: Account) =
account: Account): Future[void] =
if network.handlers.onAccount.isNil:
return
network.handlers.onAccount(peer.id, account)
@ -218,7 +216,7 @@ proc broadcastAccount*(network: BlockExcNetwork,
return
let message = Message(account: AccountMessage.init(account))
asyncSpawn network.peers[id].send(message)
network.peers[id].broadcast(message)
proc broadcastPayment*(network: BlockExcNetwork,
id: PeerId,
@ -227,11 +225,11 @@ proc broadcastPayment*(network: BlockExcNetwork,
return
let message = Message(payment: StateChannelUpdate.init(payment))
asyncSpawn network.peers[id].send(message)
network.peers[id].broadcast(message)
proc handlePayment(network: BlockExcNetwork,
peer: NetworkPeer,
payment: SignedState) =
payment: SignedState): Future[void] =
if network.handlers.onPayment.isNil:
return
network.handlers.onPayment(peer.id, payment)
@ -239,19 +237,19 @@ proc handlePayment(network: BlockExcNetwork,
proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try:
if msg.wantlist.entries.len > 0:
b.handleWantList(peer, msg.wantlist)
await b.handleWantList(peer, msg.wantlist)
if msg.payload.len > 0:
b.handleBlocks(peer, msg.payload)
await b.handleBlocks(peer, msg.payload)
if msg.blockPresences.len > 0:
b.handleBlockPresence(peer, msg.blockPresences)
await b.handleBlockPresence(peer, msg.blockPresences)
if account =? Account.init(msg.account):
b.handleAccount(peer, account)
await b.handleAccount(peer, account)
if payment =? SignedState.init(msg.payment):
b.handlePayment(peer, payment)
await b.handlePayment(peer, payment)
except CatchableError as exc:
trace "Exception in blockexc rpc handler", exc = exc.msg

View File

@ -38,9 +38,10 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
try:
while not conn.atEof:
let data = await conn.readLp(MaxMessageSize)
let msg: Message = Protobuf.decode(data, Message)
trace "Got message for peer", peer = b.id, msg
let
data = await conn.readLp(MaxMessageSize)
msg: Message = Protobuf.decode(data, Message)
trace "Got message for peer", peer = b.id
await b.handler(b, msg)
except CatchableError as exc:
trace "Exception in blockexc read loop", exc = exc.msg
@ -62,9 +63,18 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
trace "Unable to get send connection for peer message not sent", peer = b.id
return
trace "Sending message to remote", peer = b.id, msg = $msg
trace "Sending message to remote", peer = b.id
await conn.writeLp(Protobuf.encode(msg))
proc broadcast*(b: NetworkPeer, msg: Message) =
proc sendAwaiter() {.async.} =
try:
await b.send(msg)
except CatchableError as exc:
trace "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
asyncSpawn sendAwaiter()
func new*(
T: type NetworkPeer,
peer: PeerId,

View File

@ -9,6 +9,7 @@
import std/tables
import pkg/questionable
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
@ -24,8 +25,7 @@ type
proc addOrAwait*(
p: PendingBlocksManager,
cid: Cid):
Future[Block] {.async.} =
cid: Cid): Future[Block] {.async.} =
## Add an event for a block
##
@ -33,9 +33,8 @@ proc addOrAwait*(
p.blocks[cid] = newFuture[Block]()
trace "Adding pending future for block", cid
let blk = p.blocks[cid]
try:
return await blk
return await p.blocks[cid]
except CancelledError as exc:
trace "Blocks cancelled", exc = exc.msg, cid
raise exc

View File

@ -1,62 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ./blockstream
export blockstream
type
BlockSetRef* = ref object of BlockStreamRef
stream*: BlockStreamRef
hcodec*: MultiCodec
proc hashBytes*(mh: MultiHash): seq[byte] =
mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)]
proc hashBytes*(b: Block): seq[byte] =
if mh =? b.cid.mhash:
return mh.hashBytes()
method nextBlock*(d: BlockSetRef): ?!Block =
d.stream.nextBlock()
proc treeHash*(d: BlockSetRef): ?!MultiHash =
var
stack: seq[seq[byte]]
while true:
let (blk1, blk2) = (d.nextBlock().option, d.nextBlock().option)
if blk1.isNone and blk2.isNone and stack.len == 1:
let res = MultiHash.digest($d.hcodec, stack[0])
if mh =? res:
return success mh
return failure($res.error)
if blk1.isSome: stack.add((!blk1).hashBytes())
if blk2.isSome: stack.add((!blk2).hashBytes())
while stack.len > 1:
let (b1, b2) = (stack.pop(), stack.pop())
let res = MultiHash.digest($d.hcodec, b1 & b2)
if mh =? res:
stack.add(mh.hashBytes())
else:
return failure($res.error)
func new*(
T: type BlockSetRef,
stream: BlockStreamRef,
hcodec: MultiCodec = multiCodec("sha2-256")): T =
T(stream: stream, hcodec: hcodec)

View File

@ -1,12 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import ./blockstream/[blockstream, chunkedblockstream]
export blockstream, chunkedblockstream

View File

@ -1,29 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/questionable
import pkg/questionable/results
import ../blocktype
export blocktype
type
BlockStreamRef* = ref object of RootObj
method nextBlock*(b: BlockStreamRef): ?!Block {.base.} =
doAssert(false, "Not implemented!")
iterator items*(b: BlockStreamRef): Block =
while true:
without blk =? b.nextBlock():
break
yield blk

View File

@ -1,28 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/questionable
import pkg/questionable/results
import ./blockstream
import ../chunker
type
ChunkedBlockStreamRef* = ref object of BlockStreamRef
chunker*: Chunker
method nextBlock*(c: ChunkedBlockStreamRef): ?!Block =
let data: seq[byte] = c.chunker.getBytes()
if data.len > 0:
return Block.new(data)
func new*(T: type ChunkedBlockStreamRef, chunker: Chunker): T =
T(chunker: chunker)

View File

@ -10,8 +10,6 @@
{.push raises: [Defect].}
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/byteutils
type
@ -23,31 +21,22 @@ proc `$`*(b: Block): string =
result &= "cid: " & $b.cid
result &= "\ndata: " & string.fromBytes(b.data)
func new*(
func init*(
T: type Block,
data: openArray[byte] = [],
version = CIDv1,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("raw")): ?!T =
codec = multiCodec("raw")): T =
let hash = MultiHash.digest($hcodec, data).get()
success Block(
Block(
cid: Cid.init(version, codec, hash).get(),
data: @data)
func new*(
func init*(
T: type Block,
cid: Cid,
data: openArray[byte] = [],
verify: bool = false): ?!T =
let res = Block.new(
data,
cid.cidver,
cid.mhash.get().mcodec,
cid.mcodec
)
if b =? res:
if verify and cid != b.cid:
return failure("The suplied Cid doesn't match the data!")
res
verify: bool = false): T =
Block(
cid: cid,
data: @data)

View File

@ -11,12 +11,12 @@
{.push raises: [Defect].}
import std/sequtils
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p except shuffle
import ./rng
import ./blocktype
export blocktype
@ -26,118 +26,116 @@ const
type
# default reader type
ChunkBuffer* = ptr UncheckedArray[byte]
Reader* =
proc(data: var openArray[byte], offset: Natural = 0): int
{.gcsafe, closure, raises: [Defect].}
proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
ChunkerType* {.pure.} = enum
SizedChunker
FixedChunker
RabinChunker
Chunker* = ref object of RootObj
Chunker* = ref object
reader*: Reader
size*: Natural
pos*: Natural
case kind*: ChunkerType:
of SizedChunker:
of FixedChunker:
chunkSize*: Natural
pad*: bool # pad last block if less than size
of RabinChunker:
discard
proc getBytes*(c: Chunker): seq[byte] =
FileChunker* = Chunker
LPStreamChunker* = Chunker
proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
## returns a chunk of bytes from
## the instantiated chunker
##
if c.pos >= c.size:
return
var buff = newSeq[byte](c.chunkSize)
let read = await c.reader(cast[ChunkBuffer](addr buff[0]), buff.len)
var bytes = newSeq[byte](c.chunkSize)
let read = c.reader(bytes, c.pos)
c.pos += read
if read <= 0:
return @[]
if not c.pad and bytes.len != read:
bytes.setLen(read)
if not c.pad and buff.len > read:
buff.setLen(read)
return bytes
iterator items*(c: Chunker): seq[byte] =
while true:
let chunk = c.getBytes()
if chunk.len <= 0:
break
yield chunk
return buff
func new*(
T: type Chunker,
kind = ChunkerType.SizedChunker,
kind = ChunkerType.FixedChunker,
reader: Reader,
size: Natural,
chunkSize = DefaultChunkSize,
pad = false): T =
var chunker = Chunker(
kind: kind,
reader: reader,
size: size)
reader: reader)
if kind == ChunkerType.SizedChunker:
if kind == ChunkerType.FixedChunker:
chunker.pad = pad
chunker.chunkSize = chunkSize
return chunker
proc newRandomChunker*(
rng: Rng,
size: int64,
kind = ChunkerType.SizedChunker,
proc new*(
T: type LPStreamChunker,
stream: LPStream,
kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize,
pad = false): Chunker =
## create a chunker that produces
## random data
##
proc reader(data: var openArray[byte], offset: Natural = 0): int =
var alpha = toSeq(byte('A')..byte('z'))
var read = 0
while read <= data.high:
rng.shuffle(alpha)
for a in alpha:
if read > data.high:
break
data[read] = a
read.inc
return read
Chunker.new(
kind = ChunkerType.SizedChunker,
reader = reader,
size = size,
pad = pad,
chunkSize = chunkSize)
proc newFileChunker*(
file: File,
kind = ChunkerType.SizedChunker,
chunkSize = DefaultChunkSize,
pad = false): Chunker =
pad = false): T =
## create the default File chunker
##
proc reader(data: var openArray[byte], offset: Natural = 0): int =
proc reader(data: ChunkBuffer, len: int): Future[int]
{.gcsafe, async, raises: [Defect].} =
var res = 0
try:
return file.readBytes(data, 0, data.len)
except IOError as exc:
# TODO: revisit error handling - should this be fatal?
while res < len:
res += await stream.readOnce(addr data[res], len - res)
except LPStreamEOFError as exc:
trace "LPStreamChunker stream Eof", exc = exc.msg
except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg)
return res
Chunker.new(
kind = ChunkerType.SizedChunker,
kind = ChunkerType.FixedChunker,
reader = reader,
pad = pad,
chunkSize = chunkSize)
proc new*(
T: type FileChunker,
file: File,
kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize,
pad = false): T =
## create the default File chunker
##
proc reader(data: ChunkBuffer, len: int): Future[int]
{.gcsafe, async, raises: [Defect].} =
var total = 0
try:
while total < len:
let res = file.readBuffer(addr data[total], len - total)
if res <= 0:
break
total += res
except IOError as exc:
trace "Exception reading file", exc = exc.msg
except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg)
return total
Chunker.new(
kind = ChunkerType.FixedChunker,
reader = reader,
size = try: file.getFileSize() except: 0, # TODO: should do something smarter abou this
pad = pad,
chunkSize = chunkSize)

88
dagger/conf.nim Normal file
View File

@ -0,0 +1,88 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/os
import std/options
import pkg/chronicles
import pkg/confutils/defs
import pkg/libp2p
const
  DefaultTcpListenMultiAddr = "/ip4/0.0.0.0/tcp/0"

type
  # CLI subcommands
  StartUpCommand* {.pure.} = enum
    noCommand,
    initNode

  # Top-level node configuration, parsed by confutils from the
  # per-field pragmas below.
  DaggerConf* = object
    logLevel* {.
      defaultValue: LogLevel.INFO
      desc: "Sets the log level" }: LogLevel

    dataDir* {.
      desc: "The directory where dagger will store configuration and data."
      defaultValue: defaultDataDir()
      defaultValueDesc: ""
      abbr: "d"
      name: "data-dir" }: OutDir

    case cmd* {.
      command
      defaultValue: noCommand }: StartUpCommand

    of noCommand:
      listenAddrs* {.
        desc: "Specifies one or more listening multiaddrs for the node to listen on."
        # use the shared constant so the default value and its
        # description can't drift apart
        defaultValue: @[MultiAddress.init(DefaultTcpListenMultiAddr).tryGet()]
        defaultValueDesc: DefaultTcpListenMultiAddr
        abbr: "a"
        name: "listen-addrs" }: seq[MultiAddress]

      bootstrapNodes* {.
        desc: "Specifies one or more bootstrap nodes to use when connecting to the network."
        abbr: "b"
        name: "bootstrap-nodes" }: seq[MultiAddress]

      maxPeers* {.
        desc: "The maximum number of peers to connect to"
        defaultValue: 160
        name: "max-peers" }: int

      agentString* {.
        defaultValue: "Dagger"
        desc: "Node agent string which is used as identifier in network"
        name: "agent-string" }: string

      apiPort* {.
        desc: "The REST Api port"
        defaultValue: 8080
        defaultValueDesc: "8080"
        name: "api-port"
        abbr: "p" }: int

    of initNode:
      discard
proc defaultDataDir*(): string =
  ## Platform-specific default location for dagger's configuration
  ## and data, rooted in the current user's home directory.
  when defined(windows):
    getHomeDir() / "AppData" / "Roaming" / "Dagger"
  elif defined(macosx):
    getHomeDir() / "Library" / "Application Support" / "Dagger"
  else:
    getHomeDir() / ".cache" / "dagger"
func parseCmdArg*(T: type MultiAddress, input: TaintedString): T
    {.raises: [ValueError, LPError, Defect].} =
  ## confutils hook: parse a CLI argument into a MultiAddress,
  ## raising if the input is not a valid multiaddress.
  let parsed = MultiAddress.init($input)
  parsed.tryGet()

View File

@ -7,3 +7,77 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import std/os
import pkg/chronicles
import pkg/chronos
import pkg/presto
import pkg/libp2p
import pkg/confutils
import pkg/confutils/defs
import pkg/nitro
import pkg/stew/io2
import ./node
import ./conf
import ./rng
import ./rest/api
import ./stores/fsstore
import ./stores/networkstore
import ./blockexchange
import ./utils/fileutils
type
  # Ties together the node, its REST frontend and the run loop.
  DaggerServer* = ref object
    runHandle: Future[void]    # completed by `shutdown` to release `run`
    config: DaggerConf         # parsed configuration
    restServer: RestServerRef  # presto REST server
    daggerNode: DaggerNodeRef  # the underlying dagger node
proc run*(s: DaggerServer) {.async.} =
  ## Start the REST server and the node, then block until
  ## `shutdown` completes `runHandle`.
  s.restServer.start()
  await s.daggerNode.start()

  s.runHandle = newFuture[void]()
  await s.runHandle
proc shutdown*(s: DaggerServer) {.async.} =
  ## Stop the REST server and the node, then release `run`.
  await allFuturesThrowing(
    s.restServer.stop(), s.daggerNode.stop())

  # `run` may never have been called (runHandle is nil) or may
  # already be done — completing it then would raise
  if not s.runHandle.isNil and not s.runHandle.finished:
    s.runHandle.complete()
proc new*(T: type DaggerServer, config: DaggerConf): T =
  ## Assemble a dagger server from `config`: libp2p switch,
  ## block exchange engine, local (fs) store, network store
  ## and the REST api server.
  let
    switch = SwitchBuilder
      .new()
      .withAddresses(config.listenAddrs)
      .withRng(Rng.instance())
      .withNoise()
      .withMplex(5.minutes, 5.minutes)
      .withMaxConnections(config.maxPeers)
      .withAgentVersion(config.agentString)
      .withTcpTransport({ServerFlags.ReuseAddr})
      .build()

  let
    wallet = WalletRef.new(EthPrivateKey.random())
    network = BlockExcNetwork.new(switch)
    localStore = FSStore.new(config.dataDir / "repo")
    engine = BlockExcEngine.new(localStore, wallet, network)
    store = NetworkStore.new(engine, localStore)
    daggerNode = DaggerNodeRef.new(switch, store, engine)
    # REST server binds to loopback only; body size unbounded for uploads
    restServer = RestServerRef.new(
      daggerNode.initRestApi(),
      initTAddress("127.0.0.1" , config.apiPort),
      bufferSize = (1024 * 64),
      maxRequestBodySize = int.high)
      .tryGet()

  # register the block exchange protocol handler on the switch
  switch.mount(network)

  T(
    config: config,
    daggerNode: daggerNode,
    restServer: restServer)

22
dagger/errors.nim Normal file
View File

@ -0,0 +1,22 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/stew/results
type
  DaggerError* = object of CatchableError # base dagger error
  DaggerResult*[T] = Result[T, ref DaggerError] # Result specialized to dagger errors
template mapFailure*(
  exp: untyped,
  exc: typed = type DaggerError): untyped =
  ## Convert `Result[T, E]` to `Result[T, ref CatchableError]`,
  ## wrapping the original error value's string form in `exc`
  ## (default: `DaggerError`).
  ##
  ((exp.mapErr do (e: auto) -> ref CatchableError: (ref exc)(msg: $e)))

178
dagger/manifest.nim Normal file
View File

@ -0,0 +1,178 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/libp2p/protobuf/minprotobuf
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import ./blocktype
import ./errors
const
  ManifestCodec* = multiCodec("dag-pb") # manifests are stored as dag-pb blocks

var
  # per-Cid-version digest of the canonical "empty" cid, used when
  # hashing an empty manifest; filled lazily in `BlocksManifest.init`
  # (threadvar: each thread initializes its own copy)
  emptyDigest {.threadvar.}: array[CidVersion, MultiHash]

type
  # An ordered set of block Cids plus the parameters needed to
  # compute the manifest's tree-hash cid.
  BlocksManifest* = object
    blocks: seq[Cid]      # cids of the dataset's blocks, in order
    htree: ?Cid           # cached tree-hash cid (invalidated by `put`)
    version*: CidVersion  # cid version used for the tree hash
    hcodec*: MultiCodec   # multihash codec used for tree hashing
    codec*: MultiCodec    # content codec of the tree-hash cid
proc len*(b: BlocksManifest): int =
  ## Number of blocks tracked by the manifest.
  result = b.blocks.len
iterator items*(b: BlocksManifest): Cid =
  ## Yield each block Cid in insertion order.
  for blockCid in b.blocks:
    yield blockCid
proc hashBytes(mh: MultiHash): seq[byte] =
  ## Extract the raw digest bytes (without the multihash prefix).
  mh.data.buffer[mh.dpos ..< mh.dpos + mh.size]
proc cid*(b: var BlocksManifest): ?!Cid =
  ## Compute (and cache) the manifest's content id: a binary merkle
  ## tree folded over the blocks' multihashes.
  ##
  if htree =? b.htree:
    return htree.success

  var
    stack: seq[MultiHash]

  for cid in b.blocks:
    stack.add(? cid.mhash.mapFailure)

  # Original checked `stack.len == 1` BEFORE filling the stack, which
  # on a fresh empty seq is dead code, and an empty manifest then fell
  # through without any explicit return. Pad the empty case here.
  if stack.len == 0:
    stack.add(emptyDigest[b.version])

  while stack.len > 1:
    let
      (b1, b2) = (stack.pop(), stack.pop())
      mh = ? MultiHash.digest(
        $b.hcodec,
        (b1.hashBytes() & b2.hashBytes()))
        .mapFailure
    stack.add(mh)

  # exactly one digest remains — the tree root
  let cid = ? Cid.init(b.version, b.codec, stack[0]).mapFailure
  b.htree = cid.some
  return cid.success
proc put*(b: var BlocksManifest, cid: Cid) =
  ## Append a block's Cid, invalidating the cached tree hash.
  trace "Adding cid to manifest", cid
  b.blocks.add(cid)
  b.htree = Cid.none
proc contains*(b: BlocksManifest, cid: Cid): bool =
  ## True if `cid` is already part of the manifest.
  result = b.blocks.contains(cid)
proc encode*(b: var BlocksManifest): ?!seq[byte] =
  ## Encode the manifest into a ``ManifestCodec``
  ## multicodec container (Dag-pb) for now
  var pbNode = initProtoBuffer()

  # every block Cid becomes one dag-pb link message (node field 2)
  for c in b.blocks:
    var pbLink = initProtoBuffer()
    pbLink.write(1, c.data.buffer) # write Cid links
    pbLink.finish()
    pbNode.write(2, pbLink)

  let cid = ? b.cid
  pbNode.write(1, cid.data.buffer) # set the treeHash Cid as the data field
  pbNode.finish()

  return pbNode.buffer.success
proc decode*(_: type BlocksManifest, data: seq[byte]): ?!(Cid, seq[Cid]) =
  ## Decode a manifest from a byte seq: returns the tree-hash Cid
  ## (dag-pb data field) and the linked block Cids.
  ##
  var
    pbNode = initProtoBuffer(data)
    cidBuf: seq[byte]
    blocks: seq[Cid]

  # the original fell through with an implicit default result when
  # the header field was missing — fail explicitly instead
  if pbNode.getField(1, cidBuf).isErr:
    return failure("Unable to decode manifest: missing tree-hash Cid!")

  let cid = ? Cid.init(cidBuf).mapFailure

  var linksBuf: seq[seq[byte]]
  if pbNode.getRepeatedField(2, linksBuf).isOk:
    for pbLinkBuf in linksBuf:
      var
        blockBuf: seq[byte]   # removed unused `blocksBuf` local
        pbLink = initProtoBuffer(pbLinkBuf)

      if pbLink.getField(1, blockBuf).isOk:
        let cidRes = Cid.init(blockBuf)
        if cidRes.isOk:
          blocks.add(cidRes.get())

  return (cid, blocks).success
proc init*(
  T: type BlocksManifest,
  blocks: openArray[Cid] = [],
  version = CIDv1,
  hcodec = multiCodec("sha2-256"),
  codec = multiCodec("raw")): ?!T =
  ## Create a manifest using array of `Cid`s
  ##
  # Only gets initialized once
  once:
    # TODO: The CIDs should be initialized at compile time,
    # but the VM fails due to a `memmove` being invoked somewhere
    #
    # Seed `emptyDigest` for both cid versions by re-hashing the
    # digest of the well-known "empty" cids.
    for v in [CIDv0, CIDv1]:
      let
        cid = if v == CIDv1:
          ? Cid.init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku").mapFailure
        else:
          ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure

        mhash = ? cid.mhash.mapFailure
        digest = ? MultiHash.digest(
          $hcodec,
          mhash.hashBytes()).mapFailure

      emptyDigest[v] = digest

  T(
    blocks: @blocks,
    version: version,
    codec: codec,
    hcodec: hcodec,
    ).success
proc init*(
  T: type BlocksManifest,
  blk: Block): ?!T =
  ## Create manifest from a raw manifest block
  ## (in dag-pb for for now), verifying its tree hash.
  ##
  let
    (treeCid, blockCids) = ? BlocksManifest.decode(blk.data)
    treeHash = ? treeCid.mhash.mapFailure

  var manifest = ? BlocksManifest.init(
    blockCids,
    treeCid.version,
    treeHash.mcodec,
    treeCid.mcodec)

  # the recomputed tree hash must match the one carried in the block
  if (? manifest.cid) != treeCid:
    return failure("Content hashes don't match!")

  return manifest.success

181
dagger/node.nim Normal file
View File

@ -0,0 +1,181 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/options
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
# TODO: remove once exported by libp2p
import pkg/libp2p/routing_record
import pkg/libp2p/signed_envelope
import ./chunker
import ./blocktype as bt
import ./manifest
import ./stores/blockstore
import ./blockexchange
logScope:
  topics = "dagger node"

const
  FileChunkSize* = 4096 # file chunk read size

type
  # NOTE(review): shadows `DaggerError` exported from ./errors —
  # consider importing and reusing that type instead
  DaggerError = object of CatchableError

  DaggerNodeRef* = ref object
    switch*: Switch          # libp2p switch (transports/connections)
    networkId*: PeerID       # this node's peer id, set in `start`
    blockStore*: BlockStore  # block storage backend
    engine*: BlockExcEngine  # block exchange decision engine
proc start*(node: DaggerNodeRef) {.async.} =
  ## Start the libp2p switch and the block exchange engine,
  ## then record our own peer id.
  await node.switch.start()
  await node.engine.start()
  node.networkId = node.switch.peerInfo.peerId
  notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
proc stop*(node: DaggerNodeRef) {.async.} =
  ## Stop the engine first, then the switch (reverse of `start`).
  await node.engine.stop()
  await node.switch.stop()
proc findPeer*(
  node: DaggerNodeRef,
  peerId: PeerID): Future[?!PeerRecord] {.async.} =
  ## Look up a peer's signed address record.
  ## NOTE(review): stub — always yields the default (empty) result;
  ## peer discovery is not implemented yet.
  discard
proc connect*(
  node: DaggerNodeRef,
  peerId: PeerID,
  addrs: seq[MultiAddress]): Future[void] =
  ## Dial `peerId` on `addrs`; returns the switch's connect future.
  node.switch.connect(peerId, addrs)
proc streamBlocks*(
  node: DaggerNodeRef,
  stream: BufferStream,
  blockManifest: BlocksManifest) {.async.} =
  ## Push every block listed in `blockManifest` into `stream`, then
  ## signal EOF. Blocks that can't be retrieved are skipped (logged).
  try:
    # TODO: Read sequentially for now
    # to prevent slurping the entire dataset
    # since disk IO is blocking
    for c in blockManifest:
      without blk =? (await node.blockStore.getBlock(c)):
        trace "Couldn't retrieve block", cid = c
        continue

      trace "Streaming block data", cid = blk.cid, bytes = blk.data.len
      await stream.pushData(blk.data)
  except CatchableError as exc:
    trace "Exception retrieving blocks", exc = exc.msg
  finally:
    # always terminate the stream, even on failure
    await stream.pushEof()
proc retrieve*(
  node: DaggerNodeRef,
  stream: BufferStream,
  cid: Cid): Future[?!void] {.async.} =
  ## Stream the content behind `cid` into `stream`: a manifest cid
  ## streams the whole dataset, any other cid streams that single
  ## block. Streaming runs in a detached task; this proc returns
  ## once it has been kicked off.
  trace "Received retrieval request", cid
  without blk =? await node.blockStore.getBlock(cid):
    return failure(
      newException(DaggerError, "Couldn't retrieve block for Cid!"))

  without mc =? blk.cid.contentType():
    return failure(
      newException(DaggerError, "Couldn't identify Cid!"))

  # dag-pb manifest: stream every block in the set
  if mc == ManifestCodec:
    trace "Retrieving data set", cid, mc

    let res = BlocksManifest.init(blk)
    if (res.isErr):
      return failure(res.error.msg)

    asyncSpawn node.streamBlocks(stream, res.get())
  else:
    # single block: push its data and EOF from a detached task
    asyncSpawn (proc(): Future[void] {.async.} =
      try:
        await stream.pushData(blk.data)
      except CatchableError as exc:
        trace "Unable to send block", cid
        discard
      finally:
        await stream.pushEof())()

  return success()
proc store*(
  node: DaggerNodeRef,
  stream: LPStream): Future[?!Cid] {.async.} =
  ## Consume `stream`, chunk it into blocks, store each block plus a
  ## dag-pb manifest describing the set; returns the manifest's Cid.
  trace "Storing data"

  without var blockManifest =? BlocksManifest.init():
    return failure("Unable to create Block Set")

  let
    chunker = LPStreamChunker.new(stream)

  try:
    # read fixed-size chunks until the stream runs dry
    while (
      let chunk = await chunker.getBytes();
      chunk.len > 0):

      trace "Got data from stream", len = chunk.len
      let
        blk = bt.Block.init(chunk)

      blockManifest.put(blk.cid)
      if not (await node.blockStore.putBlock(blk)):
        # trace "Unable to store block", cid = blk.cid
        return failure("Unable to store block " & $blk.cid)

  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    return failure(exc.msg)
  finally:
    # the input stream is always closed, even on error
    await stream.close()

  # Generate manifest
  without data =? blockManifest.encode():
    return failure(
      newException(DaggerError, "Could not generate dataset manifest!"))

  # Store as a dag-pb block
  let manifest = bt.Block.init(data = data, codec = ManifestCodec)
  if not (await node.blockStore.putBlock(manifest)):
    trace "Unable to store manifest", cid = manifest.cid
    return failure("Unable to store manifest " & $manifest.cid)

  var cid: ?!Cid
  if (cid = blockManifest.cid; cid.isErr):
    trace "Unable to generate manifest Cid!", exc = cid.error.msg
    return failure(cid.error.msg)

  trace "Stored data", manifestCid = manifest.cid,
    contentCid = !cid,
    blocks = blockManifest.len

  return manifest.cid.success
proc new*(
  T: type DaggerNodeRef,
  switch: Switch,
  store: BlockStore,
  engine: BlockExcEngine): T =
  ## Assemble a dagger node from its pre-built components.
  result = T(
    switch: switch,
    blockStore: store,
    engine: engine)

190
dagger/rest/api.nim Normal file
View File

@ -0,0 +1,190 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/sequtils
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import pkg/presto
import pkg/libp2p
import pkg/libp2p/routing_record
import ../node
proc validate(
  pattern: string,
  value: string): int
  {.gcsafe, raises: [Defect].} =
  ## presto route-pattern validator; returning 0 means "accept".
  ## NOTE(review): currently accepts everything — no validation done.
  0
proc encodeString(cid: Cid): Result[string, cstring] =
  ## Render a Cid value for use in a REST route.
  ## (was `cid: type Cid` — a typedesc parameter — so `$cid` would
  ## stringify the *type name*, never an actual Cid value)
  ok($cid)
proc decodeString(T: type Cid, value: string): Result[Cid, cstring] =
  ## Parse a Cid route parameter, mapping CidError to a message.
  Cid.init(value)
    .mapErr do(e: CidError) -> cstring:
      case e
      of CidError.Incorrect: "Incorrect Cid"
      of CidError.Unsupported: "Unsupported Cid"
      of CidError.Overrun: "Overrun Cid"
      else: "Error parsing Cid"
proc encodeString(peerId: PeerID): Result[string, cstring] =
  ## Render a peer id for use in a REST route.
  let repr = $peerId
  ok(repr)
proc decodeString(T: type PeerID, value: string): Result[PeerID, cstring] =
  ## Parse a peer id from its string form.
  result = PeerID.init(value)
proc encodeString(address: MultiAddress): Result[string, cstring] =
  ## Render a multiaddress for use in a REST route.
  let repr = $address
  ok(repr)
proc decodeString(T: type MultiAddress, value: string): Result[MultiAddress, cstring] =
  ## Parse a multiaddress, converting the error text to a cstring.
  let parsed = MultiAddress.init(value)
  parsed.mapErr do(e: string) -> cstring: cstring(e)
proc initRestApi*(node: DaggerNodeRef): RestRouter =
  ## Build the REST router exposing the node's HTTP api:
  ## connect, download, upload and info endpoints.
  var router = RestRouter.init(validate)

  router.api(
    MethodGet,
    "/api/dagger/v1/connect/{peerId}") do (
      peerId: PeerID,
      addrs: seq[MultiAddress]) -> RestApiResponse:
      ## Connect to `peerId`, either on the supplied addresses or on
      ## addresses discovered via `findPeer`.
      if peerId.isErr:
        return RestApiResponse.error(
          Http400,
          $peerId.error())

      let addresses = if addrs.isOk and addrs.get().len > 0:
            addrs.get()
          else:
            let peerRecord = await node.findPeer(peerId.get())
            if peerRecord.isErr:
              return RestApiResponse.error(
                Http400,
                "Unable to find Peer!")
            peerRecord.get().addresses.mapIt(
              it.address
            )

      await node.connect(peerId.get(), addresses)
      return RestApiResponse.response("")

  router.api(
    MethodGet,
    "/api/dagger/v1/download/{id}") do (
      id: Cid, resp: HttpResponseRef) -> RestApiResponse:
      ## Stream the content behind `id` back as a chunked response.
      if id.isErr:
        return RestApiResponse.error(
          Http400,
          $id.error())

      let
        stream = BufferStream.new()

      var bytes = 0
      try:
        if (
          let retr = await node.retrieve(stream, id.get());
          retr.isErr):
          return RestApiResponse.error(Http400, retr.error.msg)

        await resp.prepareChunked()
        while not stream.atEof:
          var
            buff = newSeqUninitialized[byte](FileChunkSize)
            len = await stream.readOnce(addr buff[0], buff.len)

          buff.setLen(len)
          if buff.len <= 0:
            break

          bytes += buff.len
          trace "Sending chunk", size = buff.len # fixed typo: "cunk"
          await resp.sendChunk(addr buff[0], buff.len)
      except CatchableError as exc:
        trace "Exception streaming blocks", exc = exc.msg # fixed: "Excepting"
        return RestApiResponse.error(Http500)
      finally:
        trace "Sent bytes", cid = id.get(), bytes
        await stream.close()
        await resp.finish()

  router.rawApi(
    MethodPost,
    "/api/dagger/v1/upload") do (
    ) -> RestApiResponse:
      ## Store the uploaded request body as a dataset; respond with
      ## the resulting manifest Cid.
      trace "Handling file upload"
      var bodyReader = request.getBodyReader()
      if bodyReader.isErr():
        return RestApiResponse.error(Http500)

      # Attempt to handle `Expect` header
      # some clients (curl), waits 1000ms
      # before giving up
      #
      await request.handleExpect()

      let
        reader = bodyReader.get()
        stream = BufferStream.new()
        storeFut = node.store(stream)

      var bytes = 0
      try:
        while not reader.atEof:
          var
            buff = newSeqUninitialized[byte](FileChunkSize)
            len = await reader.readOnce(addr buff[0], buff.len)

          buff.setLen(len)
          if len <= 0:
            break

          trace "Got chunk from endpoint", len = buff.len
          await stream.pushData(buff)
          bytes += len

        await stream.pushEof()
        without cid =? (await storeFut):
          return RestApiResponse.error(Http500)

        trace "Uploaded file", bytes, cid = $cid
        return RestApiResponse.response($cid)
      except CancelledError: # removed unused `as exc` binding
        await reader.closeWait()
        return RestApiResponse.error(Http500)
      except AsyncStreamError:
        await reader.closeWait()
        return RestApiResponse.error(Http500)
      finally:
        await stream.close()
        await reader.closeWait()

      # if we got here something went wrong?
      return RestApiResponse.error(Http500)

  router.api(
    MethodGet,
    "/api/dagger/v1/info") do () -> RestApiResponse:
      ## Report this node's peer id and listen addresses (plain text).
      var addrs: string
      for a in node.switch.peerInfo.addrs:
        addrs &= "- " & $a & "\n"

      return RestApiResponse.response(
        "Id: " & $node.switch.peerInfo.peerId &
        "\nAddrs: \n" & addrs & "\n")

  return router

View File

@ -1,3 +1,7 @@
import ./stores/[memorystore, blockstore, blockexchange]
import ./stores/[
memorystore,
blockstore,
networkstore,
fsstore]
export memorystore, blockstore, blockexchange
export memorystore, blockstore, networkstore, fsstore

View File

@ -1,174 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import ../blocktype as bt
import ../utils/asyncheapqueue
import ./blockstore
import ../blockexchange/network
import ../blockexchange/engine
import ../blockexchange/peercontext
import ../blockexchange/protobuf/blockexc as pb
export blockstore, network, engine, asyncheapqueue
logScope:
topics = "dagger blockexc"
const
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
DefaultMaxRetries = 3
type
BlockExc* = ref object of BlockStore
engine*: BlockExcEngine # blockexc decision engine
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
blockexcTasks: seq[Future[void]] # future to control blockexc task
blockexcRunning: bool # indicates if the blockexc task is running
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
taskHandler: TaskHandler # handler provided by the engine called by the runner
proc blockexcTaskRunner(b: BlockExc) {.async.} =
## process tasks
##
while b.blockexcRunning:
let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx)
trace "Exiting blockexc task runner"
proc start*(b: BlockExc) {.async.} =
## Start the blockexc task
##
trace "blockexc start"
if b.blockexcTasks.len > 0:
warn "Starting blockexc twice"
return
b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(b.blockexcTaskRunner)
proc stop*(b: BlockExc) {.async.} =
## Stop the blockexc blockexc
##
trace "BlockExc stop"
if b.blockexcTasks.len <= 0:
warn "Stopping blockexc without starting it"
return
b.blockexcRunning = false
for t in b.blockexcTasks:
if not t.finished:
trace "Awaiting task to stop"
t.cancel()
trace "Task stopped"
trace "BlockExc stopped"
method getBlocks*(b: BlockExc, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
## Get a block from a remote peer
##
let blocks = await allFinished(b.engine.requestBlocks(cid))
return blocks.filterIt(
not it.failed
).mapIt(
it.read
)
method putBlocks*(b: BlockExc, blocks: seq[bt.Block]) =
b.engine.resolveBlocks(blocks)
procCall BlockStore(b).putBlocks(blocks)
proc new*(
T: type BlockExc,
localStore: BlockStore,
wallet: WalletRef,
network: BlockExcNetwork,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =
let engine = BlockExcEngine.new(
localStore = localStore,
wallet = wallet,
peersPerRequest = peersPerRequest,
request = network.request,
)
let b = BlockExc(
engine: engine,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
concurrentTasks: concurrentTasks,
maxRetries: maxRetries,
)
# attach engine's task handler
b.taskHandler = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} =
engine.taskHandler(task)
# attach task scheduler to engine
engine.scheduleTask = proc(task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
# TODO: temporary until libp2p moves back to PeerID
if event.kind == PeerEventKind.Joined:
b.engine.setupPeer(peerId)
else:
b.engine.dropPeer(peerId)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(
peer: PeerID,
wantList: WantList) {.gcsafe.} =
engine.wantListHandler(peer, wantList)
proc blockPresenceHandler(
peer: PeerID,
presence: seq[BlockPresence]) {.gcsafe.} =
engine.blockPresenceHandler(peer, presence)
proc blocksHandler(
peer: PeerID,
blocks: seq[bt.Block]) {.gcsafe.} =
engine.blocksHandler(peer, blocks)
proc accountHandler(peer: PeerId, account: Account) =
engine.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState) =
engine.paymentHandler(peer, payment)
network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler,
onBlocks: blocksHandler,
onPresence: blockPresenceHandler,
onAccount: accountHandler,
onPayment: paymentHandler
)
return b

View File

@ -7,79 +7,53 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
{.push raises: [Defect].}
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../blocktype
export blocktype, libp2p
type
ChangeType* {.pure.} = enum
Added, Removed
BlockStoreChangeEvt* = object
cids*: seq[Cid]
kind*: ChangeType
BlocksChangeHandler* = proc(evt: BlockStoreChangeEvt) {.gcsafe, closure.}
BlockStore* = ref object of RootObj
changeHandlers: array[ChangeType, seq[BlocksChangeHandler]]
proc addChangeHandler*(
s: BlockStore,
handler: BlocksChangeHandler,
changeType: ChangeType) =
s.changeHandlers[changeType].add(handler)
proc removeChangeHandler*(
s: BlockStore,
handler: BlocksChangeHandler,
changeType: ChangeType) =
s.changeHandlers[changeType].keepItIf( it != handler )
proc triggerChange(
s: BlockStore,
changeType: ChangeType,
cids: seq[Cid]) =
let evt = BlockStoreChangeEvt(
kind: changeType,
cids: cids,
)
for handler in s.changeHandlers[changeType]:
handler(evt)
{.push locks:"unknown".}
method getBlocks*(b: BlockStore, cid: seq[Cid]): Future[seq[Block]] {.base.} =
method getBlock*(
b: BlockStore,
cid: Cid): Future[?!Block] {.base.} =
## Get a block from the stores
##
doAssert(false, "Not implemented!")
method putBlock*(
  s: BlockStore,
  blk: Block): Future[bool] {.base.} =
  ## Put a block to the blockstore
  ## Base method: concrete stores must override; calling the base
  ## is a programming error.
  ##
  doAssert(false, "Not implemented!")
method delBlock*(
  s: BlockStore,
  cid: Cid): Future[bool] {.base.} =
  ## Delete a block/s from the block store
  ## Base method: concrete stores must override; calling the base
  ## is a programming error.
  ##
  doAssert(false, "Not implemented!")
{.pop.}
method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
  ## Check if the block exists in the blockstore
  ## Base method: default says "not present"; overridden by stores.
  ##
  return false
method putBlocks*(s: BlockStore, blocks: seq[Block]) {.base.} =
## Put a block to the blockstore
##
s.triggerChange(ChangeType.Added, blocks.mapIt( it.cid ))
method delBlocks*(s: BlockStore, blocks: seq[Cid]) {.base.} =
## Delete a block/s from the block store
##
s.triggerChange(ChangeType.Removed, blocks)
{.pop.}
proc contains*(s: BlockStore, blk: Cid): bool =
  ## `in` operator sugar delegating to `hasBlock`.
  s.hasBlock(blk)

115
dagger/stores/fsstore.nim Normal file
View File

@ -0,0 +1,115 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/os
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/io2
import ./memorystore
import ./blockstore
import ../blocktype
export blockstore
logScope:
topics = "dagger fsstore"
type
  FSStore* = ref object of BlockStore
    cache: BlockStore # in-memory front cache (not consulted by the methods in view)
    repoDir: string   # root directory of the on-disk repo
    postfixLen*: int  # length of the cid suffix used as the shard dir name

template blockPath*(self: FSStore, cid: Cid): string =
  ## Sharded on-disk path of a block:
  ## `<repoDir>/<last postfixLen chars of $cid>/<$cid>`.
  self.repoDir / ($cid)[^self.postfixLen..^1] / $cid
method getBlock*(
  self: FSStore,
  cid: Cid): Future[?!Block] {.async.} =
  ## Get a block from the stores
  ## Reads the block's bytes from its sharded path on disk.
  ##
  if cid notin self:
    return Block.failure("Couldn't find block in fs store")

  var data: seq[byte]
  let path = self.blockPath(cid)
  if (
    let res = io2.readFile(path, data);
    res.isErr):
    let error = io2.ioErrorMsg(res.error)
    trace "Cannot read file from fs store", path , error
    return Block.failure("Cannot read file from fs store")

  return Block.init(cid, data).success
method putBlock*(
  self: FSStore,
  blk: Block): Future[bool] {.async.} =
  ## Put a block to the blockstore
  ## Writes to the sharded path; an already-present block is a no-op
  ## success (blocks are content-addressed, so contents can't differ).
  ##
  if blk.cid in self:
    return true

  # if directory exists it wont fail
  if io2.createPath(self.blockPath(blk.cid).parentDir).isErr:
    trace "Unable to create block prefix dir", dir = self.blockPath(blk.cid).parentDir
    return false

  let path = self.blockPath(blk.cid)
  if (
    let res = io2.writeFile(path, blk.data);
    res.isErr):
    let error = io2.ioErrorMsg(res.error)
    trace "Unable to store block", path, cid = blk.cid, error
    return false

  return true
method delBlock*(
  self: FSStore,
  cid: Cid): Future[bool] {.async.} =
  ## Delete a block's file from the on-disk store.
  ##
  let
    path = self.blockPath(cid)
    res = io2.removeFile(path)

  if res.isErr:
    let error = io2.ioErrorMsg(res.error)
    trace "Unable to delete block", path, cid, error
    return false

  return true
{.pop.}
method hasBlock*(self: FSStore, cid: Cid): bool =
  ## A block exists iff its sharded path is a file on disk.
  result = isFile(self.blockPath(cid))
proc new*(
  T: type FSStore,
  repoDir: string,
  postfixLen = 2,
  cache: BlockStore = MemoryStore.new()): T =
  ## Create an fs-backed store rooted at `repoDir`; `postfixLen` sets
  ## the length of the cid-suffix shard directories.
  result = T(
    postfixLen: postfixLen,
    repoDir: repoDir,
    cache: cache)

View File

@ -1,45 +0,0 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import ./blockstore
type
BlockStoreManager* = ref object of BlockStore
stores: seq[BlockStore]
proc addProvider*(b: BlockStoreManager, provider: BlockStore) =
b.stores.add(provider)
proc removeProvider*(b: BlockStoreManager, provider: BlockStore) =
b.stores.keepItIf( it != provider )
method addChangeHandler*(
s: BlockStoreManager,
handler: BlocksChangeHandler,
changeType: ChangeType) =
## Add change handler to all registered
## block stores
##
for p in s.stores:
p.addChangeHandler(handler, changeType)
method removeChangeHandler*(
s: BlockStoreManager,
handler: BlocksChangeHandler,
changeType: ChangeType) =
## Remove change handler from all registered
## block stores
##
for p in s.stores:
p.removeChangeHandler(handler, changeType)

View File

@ -7,54 +7,73 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../stores/blockstore
import ./blockstore
import ../blocktype
export blockstore
logScope:
topics = "dagger memstore"
type
MemoryStore* = ref object of BlockStore
blocks: seq[Block] # TODO: Should be an LRU cache
method getBlocks*(
s: MemoryStore,
cids: seq[Cid]): Future[seq[Block]] {.async.} =
method getBlock*(
b: MemoryStore,
cid: Cid): Future[?!Block] {.async.} =
## Get a block from the stores
##
var res: seq[Block]
for c in cids:
res.add(s.blocks.filterIt( it.cid == c ))
trace "Getting block", cid
let found = b.blocks.filterIt(
it.cid == cid
)
return res
if found.len <= 0:
return failure("Couldn't get block")
trace "Retrieved block", cid
return found[0].success
method hasBlock*(s: MemoryStore, cid: Cid): bool =
## check if the block exists
##
s.blocks.filterIt( it.cid == cid ).len > 0
s.blocks.anyIt( it.cid == cid )
method putBlocks*(s: MemoryStore, blocks: seq[Block]) =
method putBlock*(
s: MemoryStore,
blk: Block): Future[bool] {.async.} =
## Put a block to the blockstore
##
s.blocks.add(blocks)
procCall BlockStore(s).putBlocks(blocks)
trace "Putting block", cid = blk.cid
s.blocks.add(blk)
method delBlocks*(s: MemoryStore, cids: seq[Cid]) =
return blk.cid in s
method delBlock*(
s: MemoryStore,
cid: Cid): Future[bool] {.async.} =
## delete a block/s from the block store
##
for c in cids:
s.blocks.keepItIf( it.cid != c )
s.blocks.keepItIf( it.cid != cid )
return cid notin s
procCall BlockStore(s).delBlocks(cids)
func new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
func new*(_: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
  ## Create an in-memory store, optionally pre-seeded with `blocks`.
  result = MemoryStore(
    blocks: @blocks)

View File

@ -0,0 +1,91 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import ../blocktype as bt
import ../utils/asyncheapqueue
import ./blockstore
import ../blockexchange/network
import ../blockexchange/engine
import ../blockexchange/peercontext
export blockstore, network, engine, asyncheapqueue
logScope:
topics = "dagger networkstore"
type
  # BlockStore that answers misses from the network via the
  # block exchange engine, writing through to a local store.
  NetworkStore* = ref object of BlockStore
    engine*: BlockExcEngine # blockexc decision engine
    localStore*: BlockStore # local block store
method getBlock*(
  self: NetworkStore,
  cid: Cid): Future[?!bt.Block] {.async.} =
  ## Get a block: try the local store first, fall back to requesting
  ## it from the network through the exchange engine.
  ##
  trace "Getting block", cid
  without var blk =? (await self.localStore.getBlock(cid)):
    trace "Couldn't get from local store", cid
    blk = try:
        await self.engine.requestBlock(cid)
      except CatchableError as exc:
        trace "Exception requesting block", cid, exc = exc.msg # fixed typo "requestig"
        return failure(exc.msg)

  # was "Retrieved block from local store", which also logged for
  # blocks fetched from the network — misleading
  trace "Retrieved block", cid
  return blk.success
method putBlock*(
  self: NetworkStore,
  blk: bt.Block): Future[bool] {.async.} =
  ## Store the block locally, then announce it via the engine so
  ## interested peers can be served.
  trace "Putting block", cid = blk.cid # fixed typo "Puting"

  if not (await self.localStore.putBlock(blk)):
    return false

  self.engine.resolveBlocks(@[blk])
  return true
method delBlock*(
  self: NetworkStore,
  cid: Cid): Future[bool] =
  ## Deletion is delegated to the local store.
  ##
  result = self.localStore.delBlock(cid)
{.pop.}
method hasBlock*(
  self: NetworkStore,
  cid: Cid): bool =
  ## Presence check is delegated to the local store.
  ##
  result = self.localStore.hasBlock(cid)
proc new*(
  T: type NetworkStore,
  engine: BlockExcEngine,
  localStore: BlockStore): T =
  ## Wire a network store around a local store and exchange engine.
  T(
    localStore: localStore,
    engine: engine)

106
dagger/utils/fileutils.nim Normal file
View File

@ -0,0 +1,106 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
## Partially taken from nim beacon chain
{.push raises: [Defect].}
import std/strutils
import pkg/chronicles
import stew/io2
export io2
when defined(windows):
import stew/[windows/acl]
proc secureCreatePath*(path: string): IoResult[void] =
  ## Create `path` (and parents) accessible by the current user only:
  ## 0o700 on posix, a user-only security descriptor on windows.
  when defined(windows):
    let sres = createFoldersUserOnlySecurityDescriptor()
    if sres.isErr():
      error "Could not allocate security descriptor", path = path,
            errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error
      err(sres.error)
    else:
      var sd = sres.get()
      createPath(path, 0o700, secDescriptor = sd.getDescriptor())
  else:
    createPath(path, 0o700)
proc secureWriteFile*[T: byte|char](path: string,
                                    data: openArray[T]): IoResult[void] =
  ## Write `data` to `path` readable/writable by the current user
  ## only: mode 0o600 on POSIX, an owner-only security descriptor
  ## on Windows.
  when defined(windows):
    let sres = createFilesUserOnlySecurityDescriptor()
    if sres.isErr():
      error "Could not allocate security descriptor", path = path,
            errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error
      # propagate the descriptor-allocation error to the caller
      err(sres.error)
    else:
      var sd = sres.get()
      writeFile(path, data, 0o600, secDescriptor = sd.getDescriptor())
  else:
    writeFile(path, data, 0o600)
proc checkAndCreateDataDir*(dataDir: string): bool =
  ## Ensure `dataDir` exists and is accessible only by the current
  ## user. On POSIX an existing directory with permissions other
  ## than 0o700 is corrected (with a warning); on Windows an
  ## existing directory must already have a current-user-only ACL.
  ## A missing directory is created via `secureCreatePath`.
  ##
  ## Returns false on any failure (and logs the reason).
  when defined(posix):
    let requiredPerms = 0o700
    if isDir(dataDir):
      let currPermsRes = getPermissions(dataDir)
      if currPermsRes.isErr():
        fatal "Could not check data directory permissions",
               data_dir = dataDir, errorCode = $currPermsRes.error,
               errorMsg = ioErrorMsg(currPermsRes.error)
        return false
      else:
        let currPerms = currPermsRes.get()
        if currPerms != requiredPerms:
          # insecure but fixable: tighten to 0o700 in place
          warn "Data directory has insecure permissions. Correcting them.",
                data_dir = dataDir,
                current_permissions = currPerms.toOct(4),
                required_permissions = requiredPerms.toOct(4)
          let newPermsRes = setPermissions(dataDir, requiredPerms)
          if newPermsRes.isErr():
            fatal "Could not set data directory permissions",
                   data_dir = dataDir,
                   errorCode = $newPermsRes.error,
                   errorMsg = ioErrorMsg(newPermsRes.error),
                   old_permissions = currPerms.toOct(4),
                   new_permissions = requiredPerms.toOct(4)
            return false
    else:
      let res = secureCreatePath(dataDir)
      if res.isErr():
        fatal "Could not create data directory", data_dir = dataDir,
               errorMsg = ioErrorMsg(res.error), errorCode = $res.error
        return false
  elif defined(windows):
    let amask = {AccessFlags.Read, AccessFlags.Write, AccessFlags.Execute}
    if fileAccessible(dataDir, amask):
      let cres = checkCurrentUserOnlyACL(dataDir)
      if cres.isErr():
        fatal "Could not check data folder's ACL",
               data_dir = dataDir, errorCode = $cres.error,
               errorMsg = ioErrorMsg(cres.error)
        return false
      else:
        # unlike POSIX, an insecure ACL is NOT corrected here
        if cres.get() == false:
          fatal "Data folder has insecure ACL", data_dir = dataDir
          return false
    else:
      let res = secureCreatePath(dataDir)
      if res.isErr():
        fatal "Could not create data folder", data_dir = dataDir,
               errorMsg = ioErrorMsg(res.error), errorCode = $res.error
        return false
  else:
    # fix: was "Unsupported operation system"
    fatal "Unsupported operating system"
    return false

  return true

View File

@ -14,31 +14,41 @@ import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import ./utils
import ../helpers
import ../examples
suite "BlockExc engine - 2 nodes":
suite "NetworkStore engine - 2 nodes":
let
chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks1 = chunker1.mapIt( !bt.Block.new(it) )
chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks2 = chunker2.mapIt( !bt.Block.new(it) )
chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
chunker2 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
var
switch1, switch2: Switch
wallet1, wallet2: WalletRef
pricing1, pricing2: Pricing
network1, network2: BlockExcNetwork
blockexc1, blockexc2: BlockExc
awaiters: seq[Future[void]]
blockexc1, blockexc2: NetworkStore
peerId1, peerId2: PeerID
peerCtx1, peerCtx2: BlockExcPeerCtx
done: Future[void]
blocks1, blocks2: seq[bt.Block]
engine1, engine2: BlockExcEngine
localStore1, localStore2: BlockStore
setup:
done = newFuture[void]()
while true:
let chunk = await chunker1.getBytes()
if chunk.len <= 0:
break
blocks1.add(bt.Block.init(chunk))
while true:
let chunk = await chunker2.getBytes()
if chunk.len <= 0:
break
blocks2.add(bt.Block.init(chunk))
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
@ -46,23 +56,27 @@ suite "BlockExc engine - 2 nodes":
wallet2 = WalletRef.example
pricing1 = Pricing.example
pricing2 = Pricing.example
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
await switch1.start()
await switch2.start()
peerId1 = switch1.peerInfo.peerId
peerId2 = switch2.peerInfo.peerId
localStore1 = MemoryStore.new(blocks1.mapIt( it ))
network1 = BlockExcNetwork.new(switch = switch1)
blockexc1 = BlockExc.new(MemoryStore.new(blocks1), wallet1, network1)
engine1 = BlockExcEngine.new(localStore1, wallet1, network1)
blockexc1 = NetworkStore.new(engine1, localStore1)
switch1.mount(network1)
localStore2 = MemoryStore.new(blocks2.mapIt( it ))
network2 = BlockExcNetwork.new(switch = switch2)
blockexc2 = BlockExc.new(MemoryStore.new(blocks2), wallet2, network2)
engine2 = BlockExcEngine.new(localStore2, wallet2, network2)
blockexc2 = NetworkStore.new(engine2, localStore2)
switch2.mount(network2)
await allFuturesThrowing(
blockexc1.start(),
blockexc2.start(),
engine1.start(),
engine2.start(),
)
# initialize our want lists
@ -84,13 +98,11 @@ suite "BlockExc engine - 2 nodes":
teardown:
await allFuturesThrowing(
blockexc1.stop(),
blockexc2.stop(),
engine1.stop(),
engine2.stop(),
switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
test "should exchange want lists on connect":
check not isNil(peerCtx1)
check not isNil(peerCtx2)
@ -107,8 +119,8 @@ suite "BlockExc engine - 2 nodes":
check peerCtx2.account.?address == pricing2.address.some
test "should send want-have for block":
let blk = !bt.Block.new("Block 1".toBytes)
blockexc2.engine.localStore.putBlocks(@[blk])
let blk = bt.Block.init("Block 1".toBytes)
check await blockexc2.engine.localStore.putBlock(blk)
let entry = Entry(
`block`: blk.cid.data.buffer,
@ -118,69 +130,77 @@ suite "BlockExc engine - 2 nodes":
sendDontHave: false)
peerCtx1.peerWants.add(entry)
check blockexc2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
check blockexc2
.engine
.taskQueue
.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis)
check blockexc1.engine.localStore.hasBlock(blk.cid)
test "should get blocks from remote":
let blocks = await blockexc1.getBlocks(blocks2.mapIt( it.cid ))
check blocks == blocks2
let blocks = await allFinished(
blocks2.mapIt( blockexc1.getBlock(it.cid) ))
check blocks.mapIt( !it.read ) == blocks2
test "remote should send blocks when available":
let blk = !bt.Block.new("Block 1".toBytes)
let blk = bt.Block.init("Block 1".toBytes)
# should fail retrieving block from remote
check not await blockexc1.getBlocks(@[blk.cid])
check not await blockexc1.getBlock(blk.cid)
.withTimeout(100.millis) # should expire
proc onBlocks(evt: BlockStoreChangeEvt) =
check evt.cids == @[blk.cid]
done.complete()
blockexc1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added)
# first put the required block in the local store
blockexc2.engine.localStore.putBlocks(@[blk])
check await blockexc2.engine.localStore.putBlock(blk)
# second trigger blockexc to resolve any pending requests
# for the block
blockexc2.putBlocks(@[blk])
check await blockexc2.putBlock(blk)
await done
# should succeed retrieving block from remote
check await blockexc1.getBlock(blk.cid)
.withTimeout(100.millis) # should succede
test "receives payments for blocks that were sent":
let blocks = await blockexc1.getBlocks(blocks2.mapIt(it.cid))
let blocks = await allFinished(
blocks2.mapIt( blockexc1.getBlock(it.cid) ))
await sleepAsync(100.millis)
let channel = !peerCtx1.paymentChannel
check wallet2.balance(channel, Asset) > 0
suite "BlockExc - multiple nodes":
suite "NetworkStore - multiple nodes":
let
chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
blockexc: seq[BlockExc]
awaiters: seq[Future[void]]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
for e in generateNodes(5):
switch.add(e.switch)
blockexc.add(e.blockexc)
await e.blockexc.start()
await e.blockexc.engine.start()
awaiters = switch.mapIt(
(await it.start())
).concat()
await allFuturesThrowing(
switch.mapIt( it.start() )
)
teardown:
await allFuturesThrowing(
switch.mapIt( it.stop() )
)
await allFuturesThrowing(awaiters)
switch = @[]
blockexc = @[]
test "should receive haves for own want list":
let
@ -191,18 +211,22 @@ suite "BlockExc - multiple nodes":
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
blockexc[0].engine.localStore.putBlocks(blocks[0..3])
blockexc[1].engine.localStore.putBlocks(blocks[4..7])
blockexc[2].engine.localStore.putBlocks(blocks[8..11])
blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) ))
await connectNodes(switch)
await sleepAsync(1.seconds)
check:
engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
@ -215,11 +239,18 @@ suite "BlockExc - multiple nodes":
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
blockexc[0].engine.localStore.putBlocks(blocks[0..3])
blockexc[1].engine.localStore.putBlocks(blocks[4..7])
blockexc[2].engine.localStore.putBlocks(blocks[8..11])
blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) ))
await connectNodes(switch)
let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid ))
check wantListBlocks == blocks[0..3]
await sleepAsync(1.seconds)
let wantListBlocks = await allFinished(
blocks[0..3].mapIt( downloader.getBlock(it.cid) ))
check wantListBlocks.mapIt( !it.read ) == blocks[0..3]

View File

@ -17,19 +17,26 @@ import pkg/dagger/utils/asyncheapqueue
import ../helpers
import ../examples
suite "BlockExc engine basic":
suite "NetworkStore engine basic":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
var
blocks: seq[bt.Block]
done: Future[void]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
done = newFuture[void]()
test "should send want list to new peers":
@ -45,11 +52,15 @@ suite "BlockExc engine basic":
done.complete()
let request = BlockExcRequest(
let
network = BlockExcNetwork(request: BlockExcRequest(
sendWantList: sendWantList,
)
))
let engine = BlockExcEngine.new(MemoryStore.new(blocks), wallet, request)
engine = BlockExcEngine.new(
MemoryStore.new(blocks.mapIt( it )),
wallet,
network)
engine.wantList = blocks.mapIt( it.cid )
engine.setupPeer(peerId)
@ -62,30 +73,41 @@ suite "BlockExc engine basic":
check account.address == pricing.address
done.complete()
let request = BlockExcRequest(sendAccount: sendAccount)
let engine = BlockExcEngine.new(MemoryStore.new, wallet, request)
engine.pricing = pricing.some
let
network = BlockExcNetwork(request: BlockExcRequest(
sendAccount: sendAccount,
))
engine = BlockExcEngine.new(MemoryStore.new, wallet, network)
engine.pricing = pricing.some
engine.setupPeer(peerId)
await done.wait(100.millis)
suite "BlockExc engine handlers":
suite "NetworkStore engine handlers":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
var
engine: BlockExcEngine
peerCtx: BlockExcPeerCtx
done: Future[void]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
done = newFuture[void]()
engine = BlockExcEngine.new(MemoryStore.new(), wallet)
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
peerCtx = BlockExcPeerCtx(
id: peerId
)
@ -93,15 +115,13 @@ suite "BlockExc engine handlers":
test "should handle want list":
let wantList = makeWantList(blocks.mapIt( it.cid ))
proc taskScheduler(ctx: BlockExcPeerCtx): bool =
proc handler() {.async.} =
let ctx = await engine.taskQueue.pop()
check ctx.id == peerId
check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid )
done.complete()
engine.scheduleTask = taskScheduler
engine.wantListHandler(peerId, wantList)
let done = handler()
await engine.wantListHandler(peerId, wantList)
await done
test "should handle want list - `dont-have`":
@ -114,11 +134,11 @@ suite "BlockExc engine handlers":
done.complete()
engine.request = BlockExcRequest(
engine.network = BlockExcNetwork(request: BlockExcRequest(
sendPresence: sendPresence
)
))
engine.wantListHandler(peerId, wantList)
await engine.wantListHandler(peerId, wantList)
await done
@ -132,9 +152,13 @@ suite "BlockExc engine handlers":
done.complete()
engine.request = BlockExcRequest(sendPresence: sendPresence)
engine.localStore.putBlocks(@[blocks[0], blocks[1]])
engine.wantListHandler(peerId, wantList)
engine.network = BlockExcNetwork(request: BlockExcRequest(
sendPresence: sendPresence
))
check await engine.localStore.putBlock(blocks[0])
check await engine.localStore.putBlock(blocks[1])
await engine.wantListHandler(peerId, wantList)
await done
@ -143,7 +167,7 @@ suite "BlockExc engine handlers":
engine.pendingBlocks.addOrAwait( it.cid )
)
engine.blocksHandler(peerId, blocks)
await engine.blocksHandler(peerId, blocks)
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
for b in blocks:
@ -155,20 +179,22 @@ suite "BlockExc engine handlers":
peerContext.account = account.some
peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable
engine.request.sendPayment = proc(receiver: PeerID, payment: SignedState) =
engine.network = BlockExcNetwork(request: BlockExcRequest(
sendPayment: proc(receiver: PeerID, payment: SignedState) =
let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b)
let balances = !payment.state.outcome.balances(Asset)
check receiver == peerId
check balances[account.address.toDestination] == amount
done.complete()
))
engine.blocksHandler(peerId, blocks)
await engine.blocksHandler(peerId, blocks)
await done.wait(100.millis)
test "should handle block presence":
let price = UInt256.example
engine.blockPresenceHandler(
await engine.blockPresenceHandler(
peerId,
blocks.mapIt(
PresenceMessage.init(
@ -186,8 +212,7 @@ suite "Task Handler":
let
rng = Rng.instance()
chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
wallet = WalletRef.example
var
@ -195,10 +220,18 @@ suite "Task Handler":
peersCtx: seq[BlockExcPeerCtx]
peers: seq[PeerID]
done: Future[void]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
done = newFuture[void]()
engine = BlockExcEngine.new(MemoryStore.new(), wallet)
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
peersCtx = @[]
for i in 0..3:
@ -221,8 +254,9 @@ suite "Task Handler":
blks[1].cid == blocks[0].cid
blks[0].cid == blocks[1].cid
engine.localStore.putBlocks(blocks)
engine.request.sendBlocks = sendBlocks
for blk in blocks:
check await engine.localStore.putBlock(blk)
engine.network.request.sendBlocks = sendBlocks
# second block to send by priority
peersCtx[0].peerWants.add(
@ -248,7 +282,7 @@ suite "Task Handler":
test "Should send presence":
let present = blocks
let missing = @[!bt.Block.new("missing".toBytes)]
let missing = @[bt.Block.init("missing".toBytes)]
let price = (!engine.pricing).price
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) =
@ -258,8 +292,9 @@ suite "Task Handler":
Presence(cid: missing[0].cid, have: false)
]
engine.localStore.putBlocks(blocks)
engine.request.sendPresence = sendPresence
for blk in blocks:
check await engine.localStore.putBlock(blk)
engine.network.request.sendPresence = sendPresence
# have block
peersCtx[0].peerWants.add(

View File

@ -17,24 +17,31 @@ import pkg/dagger/blockexchange
import ../helpers
import ../examples
suite "BlockExc network":
suite "NetworkStore network":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
var
network: BlockExcNetwork
networkPeer: NetworkPeer
buffer: BufferStream
blocks: seq[bt.Block]
done: Future[void]
proc getConn(): Future[Connection] {.async.} =
return Connection(buffer)
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
done = newFuture[void]()
buffer = BufferStream.new()
network = BlockExcNetwork.new(
@ -45,7 +52,7 @@ suite "BlockExc network":
discard await networkPeer.connect()
test "Want List handler":
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
@ -72,7 +79,7 @@ suite "BlockExc network":
await done.wait(500.millis)
test "Blocks Handler":
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} =
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} =
check blks == blocks
done.complete()
@ -84,7 +91,9 @@ suite "BlockExc network":
await done.wait(500.millis)
test "Presence Handler":
proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} =
proc presenceHandler(
peer: PeerID,
precense: seq[BlockPresence]) {.gcsafe, async.} =
for b in blocks:
check:
b.cid in precense
@ -106,7 +115,7 @@ suite "BlockExc network":
test "handles account messages":
let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerID, received: Account) =
proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} =
check received == account
done.complete()
@ -120,7 +129,7 @@ suite "BlockExc network":
test "handles payment messages":
let payment = SignedState.example
proc handlePayment(peer: PeerID, received: SignedState) =
proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} =
check received == payment
done.complete()
@ -131,23 +140,29 @@ suite "BlockExc network":
await done.wait(100.millis)
suite "BlockExc Network - e2e":
suite "NetworkStore Network - e2e":
let
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
var
switch1, switch2: Switch
network1, network2: BlockExcNetwork
awaiters: seq[Future[void]]
blocks: seq[bt.Block]
done: Future[void]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.init(chunk))
done = newFuture[void]()
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
await switch1.start()
await switch2.start()
network1 = BlockExcNetwork.new(
switch = switch1)
@ -166,10 +181,8 @@ suite "BlockExc Network - e2e":
switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
test "broadcast want list":
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
@ -193,7 +206,7 @@ suite "BlockExc Network - e2e":
await done.wait(500.millis)
test "broadcast blocks":
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} =
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} =
check blks == blocks
done.complete()
@ -205,7 +218,9 @@ suite "BlockExc Network - e2e":
await done.wait(500.millis)
test "broadcast precense":
proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} =
proc presenceHandler(
peer: PeerID,
precense: seq[BlockPresence]) {.gcsafe, async.} =
for b in blocks:
check:
b.cid in precense
@ -227,7 +242,7 @@ suite "BlockExc Network - e2e":
test "broadcasts account":
let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerID, received: Account) =
proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} =
check received == account
done.complete()
@ -240,7 +255,7 @@ suite "BlockExc Network - e2e":
test "broadcasts payment":
let payment = SignedState.example
proc handlePayment(peer: PeerID, received: SignedState) =
proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} =
check received == payment
done.complete()

View File

@ -42,7 +42,7 @@ proc example*(_: type Pricing): Pricing =
proc example*(_: type Block): Block =
let length = rand(4096)
let bytes = newSeqWith(length, rand(uint8))
!Block.new(bytes)
Block.init(bytes)
proc example*(_: type PeerId): PeerID =
let key = PrivateKey.random(Rng.instance[]).get

View File

@ -1,22 +1,14 @@
import pkg/libp2p/varint
import pkg/dagger/chunker
import pkg/dagger/blocktype
import pkg/dagger/blockstream
import pkg/questionable
import pkg/questionable/results
export chunker
import ./helpers/nodeutils
import ./helpers/randomchunker
type
TestStreamProc* = proc(): ?!Block {.raises: [Defect].}
TestStream* = ref object of BlockStreamRef
handler*: TestStreamProc
method nextBlock*(b: TestStream): ?!Block =
b.handler()
export randomchunker, nodeutils
proc lenPrefix*(msg: openArray[byte]): seq[byte] =
## Write `msg` with a varint-encoded length prefix

View File

@ -13,20 +13,22 @@ proc generateNodes*(
blocks: openArray[bt.Block] = [],
secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise,
]): seq[tuple[switch: Switch, blockexc: BlockExc]] =
]): seq[tuple[switch: Switch, blockexc: NetworkStore]] =
for i in 0..<num:
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
blockexc = BlockExc.new(MemoryStore.new(blocks), wallet, network)
localStore = MemoryStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network)
networkStore = NetworkStore.new(engine, localStore)
switch.mount(network)
# initialize our want lists
blockexc.engine.wantList = blocks.mapIt( it.cid )
engine.wantList = blocks.mapIt( it.cid )
switch.mount(network)
result.add((switch, blockexc))
result.add((switch, networkStore))
proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes:

View File

@ -0,0 +1,49 @@
import std/sequtils
import pkg/chronos
import pkg/dagger/chunker
import pkg/dagger/rng
export chunker
type
  RandomChunker* = Chunker

proc new*(
  T: type RandomChunker,
  rng: Rng,
  kind = ChunkerType.FixedChunker,
  chunkSize = DefaultChunkSize,
  size: int,
  pad = false): T =
  ## create a chunker that produces
  ## random data
  ##
  ## `size` bounds the total bytes produced; `chunkSize` sets the
  ## size of each chunk; `pad` is forwarded to the underlying
  ## `Chunker`.
  ##
  var consumed = 0 # total bytes handed out so far

  proc reader(data: ChunkBuffer, len: int): Future[int]
    {.async, gcsafe, raises: [Defect].} =
    # shuffled printable-ASCII range used as the random alphabet
    var alpha = toSeq(byte('A')..byte('z'))

    if consumed >= size:
      return 0

    # NOTE(review): each call fills a full `len` bytes, so the last
    # chunk may overshoot `size` by up to `len - 1` — confirm intended
    var read = 0
    while read < len:
      rng.shuffle(alpha)
      for a in alpha:
        if read >= len:
          break

        data[read] = a
        read.inc

    consumed += read
    return read

  Chunker.new(
    kind = kind, # fix: was hard-coded FixedChunker, ignoring the param
    reader = reader,
    pad = pad,
    chunkSize = chunkSize)

View File

@ -1,84 +0,0 @@
import std/sequtils
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/questionable
import pkg/questionable/results
import pkg/dagger/rng
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import ../helpers
suite "Memory Store":
var store: MemoryStore
var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
var blocks = chunker.mapIt( !Block.new(it) )
setup:
store = MemoryStore.new(blocks)
test "getBlocks single":
let blk = await store.getBlocks(@[blocks[0].cid])
check blk[0] == blocks[0]
test "getBlocks multiple":
let blk = await store.getBlocks(blocks[0..2].mapIt( it.cid ))
check blk == blocks[0..2]
test "hasBlock":
check store.hasBlock(blocks[0].cid)
test "delBlocks single":
let blks = blocks[1..3].mapIt( it.cid )
store.delBlocks(blks)
check not store.hasBlock(blks[0])
check not store.hasBlock(blks[1])
check not store.hasBlock(blks[2])
test "add blocks change handler":
let blocks = @[
!Block.new("Block 1".toBytes),
!Block.new("Block 2".toBytes),
!Block.new("Block 3".toBytes),
]
var triggered = false
store.addChangeHandler(
proc(evt: BlockStoreChangeEvt) =
check evt.kind == ChangeType.Added
check evt.cids == blocks.mapIt( it.cid )
triggered = true
, ChangeType.Added
)
store.putBlocks(blocks)
check triggered
test "add blocks change handler":
let blocks = @[
!Block.new("Block 1".toBytes),
!Block.new("Block 2".toBytes),
!Block.new("Block 3".toBytes),
]
var triggered = false
store.addChangeHandler(
proc(evt: BlockStoreChangeEvt) =
check evt.kind == ChangeType.Removed
check evt.cids == blocks.mapIt( it.cid )
triggered = true
, ChangeType.Removed
)
store.putBlocks(blocks)
check store.hasBlock(blocks[0].cid)
check store.hasBlock(blocks[1].cid)
check store.hasBlock(blocks[2].cid)
store.delBlocks(blocks.mapIt( it.cid ))
check triggered

View File

@ -0,0 +1,65 @@
import std/sequtils
import std/os
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/dagger/rng
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import pkg/dagger/stores
import ../helpers
suite "FS Store":
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
var
store: FSStore
repoDir: string
newBlock = Block.init("New Block".toBytes())
setup:
repoDir = path.parentDir / "repo"
createDir(repoDir)
store = FSStore.new(repoDir)
teardown:
removeDir(repoDir)
test "putBlock":
check await store.putBlock(newBlock)
check fileExists(store.blockPath(newBlock.cid))
check newBlock.cid in store
test "getBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
let blk = await store.getBlock(newBlock.cid)
check blk.option == newBlock.some
test "fail getBlock":
let blk = await store.getBlock(newBlock.cid)
check blk.isErr
test "hasBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
check store.hasBlock(newBlock.cid)
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)
test "delBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
check await store.delBlock(newBlock.cid)
check not fileExists(store.blockPath(newBlock.cid))

View File

@ -0,0 +1,61 @@
import std/sequtils
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/questionable
import pkg/questionable/results
import pkg/dagger/rng
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import ../helpers
suite "Memory Store tests":
test "putBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new()
check await store.putBlock(newBlock)
check newBlock.cid in store
test "getBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new(@[newBlock])
let blk = await store.getBlock(newBlock.cid)
check blk.isOk
check blk == newBlock.success
test "fail getBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new(@[])
let blk = await store.getBlock(newBlock.cid)
check blk.isErr
test "hasBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new(@[newBlock])
check store.hasBlock(newBlock.cid)
test "fail hasBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new(@[])
check not store.hasBlock(newBlock.cid)
test "delBlock":
let
newBlock = Block.init("New Block".toBytes())
store = MemoryStore.new(@[newBlock])
check await store.delBlock(newBlock.cid)
check newBlock.cid notin store

View File

@ -3,5 +3,6 @@ import ./blockexc/testnetwork
import ./blockexc/protobuf/testpayments as testprotobufpayments
import ./blockexc/protobuf/testpresence
import ./blockexc/engine/testpayments as testenginepayments
import ./blockexc/testblockexc
{.warning[UnusedImport]: off.}

View File

@ -1,50 +0,0 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils as stew
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import pkg/dagger/blockstream
import pkg/dagger/blockset
import ./helpers
suite "BlockSet":
test "Should produce valid tree hash checksum":
let
blocks = @[
!Block.new("Block 1".toBytes),
!Block.new("Block 2".toBytes),
!Block.new("Block 3".toBytes),
!Block.new("Block 4".toBytes),
!Block.new("Block 5".toBytes),
!Block.new("Block 6".toBytes),
!Block.new("Block 7".toBytes),
]
checksum = @[byte(43), 2, 105, 202, 45, 227,
178, 211, 83, 246, 56, 250, 210,
160, 210, 98, 123, 87, 139, 157,
188, 221, 252, 255, 17, 11, 79,
85, 220, 161, 238, 108]
var idx = 0
proc nextBlockHandler(): ?!Block =
let blk = if idx < blocks.len: blocks[idx] else: return
idx.inc()
return success blk
let
blockStream = TestStream(handler: nextBlockHandler)
blockSet = BlockSetRef.new(stream = blockStream)
let res = blockSet.treeHash()
check res.isOK
if h =? res:
check h.hashBytes() == checksum
return
check false

View File

@ -1,36 +1,68 @@
import std/unittest
import pkg/asynctest
import pkg/stew/byteutils
import pkg/dagger/chunker
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
suite "Chunking":
test "should return proper size chunks":
proc reader(data: var openArray[byte], offset: Natural = 0): int
{.gcsafe, closure, raises: [Defect].} =
let contents = "1234567890".toBytes
copyMem(addr data[0], unsafeAddr contents[offset], data.len)
return data.len
var offset = 0
let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0]
proc reader(data: ChunkBuffer, len: int): Future[int]
{.gcsafe, async, raises: [Defect].} =
if offset >= contents.len:
return 0
copyMem(data, unsafeAddr contents[offset], len)
offset += 2
return len
let chunker = Chunker.new(
reader = reader,
size = 10,
chunkSize = 2)
check chunker.getBytes() == "12".toBytes
check chunker.getBytes() == "34".toBytes
check chunker.getBytes() == "56".toBytes
check chunker.getBytes() == "78".toBytes
check chunker.getBytes() == "90".toBytes
check chunker.getBytes() == "".toBytes
check:
(await chunker.getBytes()) == [1.byte, 2]
(await chunker.getBytes()) == [3.byte, 4]
(await chunker.getBytes()) == [5.byte, 6]
(await chunker.getBytes()) == [7.byte, 8]
(await chunker.getBytes()) == [9.byte, 0]
(await chunker.getBytes()) == []
test "should chunk LPStream":
var offset = 0
let stream = BufferStream.new()
let chunker = LPStreamChunker.new(
stream = stream,
chunkSize = 2)
proc writer() {.async.} =
for d in [@[1.byte, 2, 3, 4], @[5.byte, 6, 7, 8], @[9.byte, 0]]:
await stream.pushData(d)
await stream.pushEof()
await stream.close()
let writerFut = writer()
check:
(await chunker.getBytes()) == [1.byte, 2]
(await chunker.getBytes()) == [3.byte, 4]
(await chunker.getBytes()) == [5.byte, 6]
(await chunker.getBytes()) == [7.byte, 8]
(await chunker.getBytes()) == [9.byte, 0]
(await chunker.getBytes()) == []
await writerFut
test "should chunk file":
let (fileName, _, _) = instantiationInfo() # get this file's name
let path = "tests/dagger/" & filename
let file = open(path)
let fileChunker = newFileChunker(file = file)
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
file = open(path)
fileChunker = FileChunker.new(file = file, chunkSize = 256)
var data: seq[byte]
while true:
let buff = fileChunker.getBytes()
let buff = await fileChunker.getBytes()
if buff.len <= 0:
break

View File

@ -0,0 +1,54 @@
import std/sequtils
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import pkg/dagger/manifest
import ./helpers
suite "Manifest":
test "Should produce valid tree hash checksum":
without var manifest =? BlocksManifest.init(
blocks = @[
Block.init("Block 1".toBytes).cid,
Block.init("Block 2".toBytes).cid,
Block.init("Block 3".toBytes).cid,
Block.init("Block 4".toBytes).cid,
Block.init("Block 5".toBytes).cid,
Block.init("Block 6".toBytes).cid,
Block.init("Block 7".toBytes).cid,
]):
fail()
let
checksum = @[18.byte, 32, 14, 78, 178, 161,
50, 175, 26, 57, 68, 6, 163, 128,
19, 131, 212, 203, 93, 98, 219,
34, 243, 217, 132, 191, 86, 255,
171, 160, 77, 167, 91, 145]
var mh: MultiHash
check MultiHash.decode(checksum, mh).get() > 0
let checkSumCid = Cid.init(manifest.version, manifest.codec, mh).get()
check checkSumCid == !(manifest.cid)
test "Should encode/decode to/from manifest":
let
blocks = (0..<1000).mapIt( Block.init(("Block " & $it).toBytes).cid )
var
manifest = BlocksManifest.init(blocks).get()
let
e = manifest.encode().get()
(cid, decoded) = BlocksManifest.decode(e).get()
check decoded == blocks

138
tests/dagger/testnode.nim Normal file
View File

@ -0,0 +1,138 @@
import std/os
import std/options
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/nitro
import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/stores
import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/node
import pkg/dagger/manifest
import pkg/dagger/blocktype as bt
import ./helpers
suite "Test Node":
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
var
file: File
chunker: Chunker
switch: Switch
wallet: WalletRef
network: BlockExcNetwork
localStore: MemoryStore
engine: BlockExcEngine
store: NetworkStore
node: DaggerNodeRef
setup:
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file)
switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = MemoryStore.new()
engine = BlockExcEngine.new(localStore, wallet, network)
store = NetworkStore.new(engine, localStore)
node = DaggerNodeRef.new(switch, store, engine)
await node.start()
teardown:
close(file)
await node.stop()
test "Store Data Stream":
let
stream = BufferStream.new()
storeFut = node.store(stream)
var
manifest = BlocksManifest.init().tryGet()
try:
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
await stream.pushData(chunk)
manifest.put(bt.Block.init(chunk).cid)
finally:
await stream.pushEof()
await stream.close()
let
manifestCid = (await storeFut).tryGet()
check:
manifestCid in localStore
var
manifestBlock = (await localStore.getBlock(manifestCid)).get()
localManifest = BlocksManifest.init(manifestBlock).tryGet()
check:
manifest.len == localManifest.len
manifest.cid == localManifest.cid
test "Retrieve Data Stream":
var
manifest = BlocksManifest.init().tryGet()
original: seq[byte]
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.init(chunk)
original &= chunk
check await localStore.putBlock(blk)
manifest.put(blk.cid)
let
manifestBlock = bt.Block.init(
manifest.encode().tryGet(),
codec = ManifestCodec)
check await localStore.putBlock(manifestBlock)
let stream = BufferStream.new()
check (await node.retrieve(stream, manifestBlock.cid)).isOk
var data: seq[byte]
while true:
var
buf = newSeq[byte](FileChunkSize)
res = await stream.readOnce(addr buf[0], buf.len)
if res <= 0:
break
buf.setLen(res)
data &= buf
check data == original
test "Retrieve One Block":
let
testString = "Block 1"
blk = bt.Block.init(testString.toBytes)
var
stream = BufferStream.new()
check (await localStore.putBlock(blk))
check (await node.retrieve(stream, blk.cid)).isOk
var data = newSeq[byte](testString.len)
await stream.readExactly(addr data[0], data.len)
check string.fromBytes(data) == testString

View File

@ -1,4 +1,4 @@
import ./stores/testblockexc
import ./stores/testblockstore
import ./stores/testfsstore
import ./stores/testmemorystore
{.warning[UnusedImport]: off.}

BIN
tests/fixtures/test.jpg vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 MiB

View File

@ -2,6 +2,7 @@ import ./dagger/teststores
import ./dagger/testblockexc
import ./dagger/testasyncheapqueue
import ./dagger/testchunking
import ./dagger/testblockset
import ./dagger/testmanifest
import ./dagger/testnode
{.warning[UnusedImport]: off.}

1
vendor/dnsclient.nim vendored Submodule

@ -0,0 +1 @@
Subproject commit 536cc6b7933e5f86590bb27083c0ffeab31255f9

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 37c62af57951b4afb9c653d4d8f86ed2bdee10f0
Subproject commit 7dc58d42b6905a7fd7531875fa76060f8f744e4e

1
vendor/nim-confutils vendored Submodule

@ -0,0 +1 @@
Subproject commit ab4ba1cbfdccdb8c0398894ffc25169bc61faeed

1
vendor/nim-http-utils vendored Submodule

@ -0,0 +1 @@
Subproject commit 689da19e9e9cfff4ced85e2b25c6b2b5598ed079

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 6f779c47c83f8be9d98958a08c4b49508fb05767
Subproject commit 58f383e661521314df314e7096c24db5a7490372

2
vendor/nim-presto vendored

@ -1 +1 @@
Subproject commit bfcbeceb65d90b47af9adc94dcb671d7dd827443
Subproject commit c41bc8aefc7e5342eb927f874140b80d0e989a95

1
vendor/nim-toml-serialization vendored Submodule

@ -0,0 +1 @@
Subproject commit 4e15e00ed9e27a8d28b40b69ef06c6a4a388ae93

1
vendor/nim-unittest2 vendored Submodule

@ -0,0 +1 @@
Subproject commit 02c49b8a994dd3f9eddfaab45262f9b8fa507f8e

1
vendor/nim-websock vendored Submodule

@ -0,0 +1 @@
Subproject commit a697e3585d583ab6b91a159ea7d023461002c927

@ -1 +1 @@
Subproject commit 3b83e229432979636702e506a7634325eccb3bb0
Subproject commit 25a4c270330026442e09f6cddfb7a944da7cfa4b