diff --git a/.gitmodules b/.gitmodules index 0c36c07c..362a5f68 100644 --- a/.gitmodules +++ b/.gitmodules @@ -118,3 +118,40 @@ url = https://github.com/status-im/stint.git ignore = untracked branch = master +[submodule "vendor/nim-httputils"] + ignore = untracked + branch = master +[submodule "vendor/nim-http-utils"] + path = vendor/nim-http-utils + url = https://github.com/status-im/nim-http-utils.git + ignore = untracked + branch = master +[submodule "vendor/nim-toml-serialization"] + path = vendor/nim-toml-serialization + url = https://github.com/status-im/nim-toml-serialization.git + ignore = untracked + branch = master +[submodule "vendor/unittest2"] + ignore = untracked + branch = master +[submodule "vendor/nim-unittest2"] + path = vendor/nim-unittest2 + url = https://github.com/status-im/nim-unittest2.git + ignore = untracked + branch = master +[submodule "vendor/nameresolver"] + ignore = untracked + branch = master +[submodule "vendor/nim-nameresolver"] + ignore = untracked + branch = master +[submodule "vendor/dnsclient.nim"] + path = vendor/dnsclient.nim + url = https://github.com/ba0f3/dnsclient.nim.git + ignore = untracked + branch = master +[submodule "vendor/nim-websock"] + path = vendor/nim-websock + url = https://github.com/status-im/nim-websock.git + ignore = untracked + branch = master diff --git a/config.nims b/config.nims index 0abfe838..dfcb1f7a 100644 --- a/config.nims +++ b/config.nims @@ -32,6 +32,7 @@ else: # ("-fno-asynchronous-unwind-tables" breaks Nim's exception raising, sometimes) switch("passC", "-mno-avx512vl") +--tlsEmulation:off --threads:on --opt:speed --excessiveStackTrace:on diff --git a/dagger.nim b/dagger.nim index e69de29b..25c4e1fa 100644 --- a/dagger.nim +++ b/dagger.nim @@ -0,0 +1,77 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronicles +import pkg/chronos +import pkg/confutils +import pkg/libp2p + +import ./dagger/conf +import ./dagger/dagger + +export dagger, conf, libp2p, chronos, chronicles + +when isMainModule: + import std/os + + import pkg/confutils/defs + + import ./dagger/utils/fileutils + + when defined(posix): + import system/ansi_c + + let + config = DaggerConf.load() + + case config.cmd: + of StartUpCommand.noCommand: + + if not(checkAndCreateDataDir((config.dataDir).string)): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + quit QuitFailure + + trace "Data dir initialized", dir = config.dataDir + + if not(checkAndCreateDataDir((config.dataDir / "repo").string)): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + quit QuitFailure + + trace "Repo dir initialized", dir = config.dataDir / "repo" + + let server = DaggerServer.new(config) + + ## Ctrl+C handling + proc controlCHandler() {.noconv.} = + when defined(windows): + # workaround for https://github.com/nim-lang/Nim/issues/4057 + try: + setupForeignThreadGc() + except Exception as exc: raiseAssert exc.msg # shouldn't happen + notice "Shutting down after having received SIGINT" + waitFor server.shutdown() + + try: + setControlCHook(controlCHandler) + except Exception as exc: # TODO Exception + warn "Cannot set ctrl-c handler", msg = exc.msg + + # equivalent SIGTERM handler + when defined(posix): + proc SIGTERMHandler(signal: cint) {.noconv.} = + notice "Shutting down after having received SIGTERM" + waitFor server.shutdown() + + c_signal(SIGTERM, SIGTERMHandler) + + waitFor server.run() + of StartUpCommand.initNode: + discard diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index 1ea987a4..e16a8523 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -32,23 +32,30 @@ logScope: topics = "dagger blockexc engine" const - DefaultTimeout* = 5.seconds + DefaultBlockTimeout* = 5.minutes DefaultMaxPeersPerRequest* = 10 + DefaultTaskQueueSize = 100 + DefaultConcurrentTasks = 10 + DefaultMaxRetries = 3 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} BlockExcEngine* = ref object of RootObj - localStore*: BlockStore # where we localStore blocks for this instance - peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with - wantList*: seq[Cid] # local wants list - pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved - peersPerRequest: int # max number of peers to request from - scheduleTask*: TaskScheduler # schedule a new task with the task runner - request*: BlockExcRequest # block exchange network requests - wallet*: WalletRef # nitro wallet for micropayments - pricing*: ?Pricing # optional bandwidth pricing + localStore*: BlockStore # where we localStore blocks for this instance + network*: BlockExcNetwork # network interface + peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with + wantList*: seq[Cid] # local wants list + taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for + concurrentTasks: int # number of concurrent peers we're serving at any given time + maxRetries: int # max number of tries for a failed block + blockexcTasks: seq[Future[void]] # future to control blockexc task + blockexcRunning: bool # indicates if the blockexc task is running + pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved + peersPerRequest: int # max number of peers to request from + wallet*: WalletRef # nitro wallet for micropayments + pricing*: ?Pricing # optional bandwidth pricing Pricing* = object address*: EthAddress @@ -68,28 +75,58 @@ proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx = if peer.len > 0: return peer[0] -proc requestBlocks*( +# attach task scheduler to engine +proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = + b.taskQueue.pushOrUpdateNoWait(task).isOk() + +proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} + +proc start*(b: BlockExcEngine) {.async.} = + ## Start the blockexc task + ## + + trace "blockexc start" + + if b.blockexcRunning: + warn "Starting blockexc twice" + return + + b.blockexcRunning = true + for i in 0.. b.peersPerRequest: stop = b.peersPerRequest + let stop = min(peers.high, b.peersPerRequest) trace "Sending want list requests to remaining peers", count = stop + 1 for p in peers[0..stop]: - sendWants(p) + if cid notin p.peerHave: + # just send wants + b.network.request.sendWantList( + p.id, + @[cid], + wantType = WantType.wantHave) # we only want to know if the peer has the block - return blocks + return blk proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerID, - blocks: seq[BlockPresence]) = + blocks: seq[BlockPresence]) {.async.} = ## Handle block presence ## @@ -181,7 +211,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, blocks: seq[bt.Block]) = - let sendPayment = engine.request.sendPayment + let sendPayment = engine.network.request.sendPayment if sendPayment.isNil: return @@ -192,14 +222,17 @@ proc payForBlocks(engine: BlockExcEngine, proc blocksHandler*( b: BlockExcEngine, peer: PeerID, - blocks: seq[bt.Block]) = + blocks: seq[bt.Block]) {.async.} = ## handle incoming blocks ## trace "Got blocks from peer", peer, len = blocks.len - b.localStore.putBlocks(blocks) - b.resolveBlocks(blocks) + for blk in blocks: + if not (await b.localStore.putBlock(blk)): + trace "Unable to store block", cid = blk.cid + continue + b.resolveBlocks(blocks) let peerCtx = b.getPeerCtx(peer) if peerCtx != nil: b.payForBlocks(peerCtx, blocks) @@ -207,7 +240,7 @@ proc blocksHandler*( proc wantListHandler*( b: BlockExcEngine, peer: PeerID, - wantList: WantList) = + wantList: WantList) {.async.} = ## Handle incoming want lists ## @@ -234,12 +267,12 @@ proc wantListHandler*( # peer might want to ask for the same cid with # different want params - if e.sendDontHave and not(b.localStore.hasBlock(e.cid)): + if e.sendDontHave and e.cid notin b.localStore: dontHaves.add(e.cid) # send don't have's to remote if dontHaves.len > 0: - b.request.sendPresence( + b.network.request.sendPresence( peer, dontHaves.mapIt( BlockPresence( @@ -249,14 +282,14 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer -proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) = +proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} = let context = engine.getPeerCtx(peer) if context.isNil: return context.account = account.some -proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) = +proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = without context =? engine.getPeerCtx(peer).option and account =? context.account: return @@ -280,10 +313,10 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = # broadcast our want list, the other peer will do the same if b.wantList.len > 0: - b.request.sendWantList(peer, b.wantList, full = true) + b.network.request.sendWantList(peer, b.wantList, full = true) if address =? b.pricing.?address: - b.request.sendAccount(peer, Account(address: address)) + b.network.request.sendAccount(peer, Account(address: address)) proc dropPeer*(b: BlockExcEngine, peer: PeerID) = ## Cleanup disconnected peer @@ -306,13 +339,18 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # TODO: There should be all sorts of accounting of # bytes sent/received here if wantsBlocks.len > 0: - let blocks = await b.localStore.getBlocks( - wantsBlocks.mapIt( - it.cid + let blockFuts = await allFinished(wantsBlocks.mapIt( + b.localStore.getBlock(it.cid) )) + let blocks = blockFuts + .filterIt((not it.failed) and it.read.isOk) + .mapIt(!it.read) + if blocks.len > 0: - b.request.sendBlocks(task.id, blocks) + b.network.request.sendBlocks( + task.id, + blocks) # Remove successfully sent blocks task.peerWants.keepIf( @@ -330,28 +368,76 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = if presence.have and price =? b.pricing.?price: presence.price = price wants.add(BlockPresence.init(presence)) - if wants.len > 0: - b.request.sendPresence(task.id, wants) -func new*( + if wants.len > 0: + b.network.request.sendPresence(task.id, wants) + +proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = + ## process tasks + ## + + while b.blockexcRunning: + let peerCtx = await b.taskQueue.pop() + asyncSpawn b.taskHandler(peerCtx) + + trace "Exiting blockexc task runner" + +proc new*( T: type BlockExcEngine, localStore: BlockStore, wallet: WalletRef, - request: BlockExcRequest = BlockExcRequest(), - scheduleTask: TaskScheduler = nil, + network: BlockExcNetwork, + concurrentTasks = DefaultConcurrentTasks, + maxRetries = DefaultMaxRetries, peersPerRequest = DefaultMaxPeersPerRequest): T = - proc taskScheduler(task: BlockExcPeerCtx): bool = - if not isNil(scheduleTask): - return scheduleTask(task) - - let b = BlockExcEngine( + let engine = BlockExcEngine( localStore: localStore, pendingBlocks: PendingBlocksManager.new(), peersPerRequest: peersPerRequest, - scheduleTask: taskScheduler, - request: request, - wallet: wallet + network: network, + wallet: wallet, + concurrentTasks: concurrentTasks, + maxRetries: maxRetries, + taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize)) + + proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + if event.kind == PeerEventKind.Joined: + engine.setupPeer(peerId) + else: + engine.dropPeer(peerId) + + if not isNil(network.switch): + network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + + proc blockWantListHandler( + peer: PeerID, + wantList: WantList): Future[void] {.gcsafe.} = + engine.wantListHandler(peer, wantList) + + proc blockPresenceHandler( + peer: PeerID, + presence: seq[BlockPresence]): Future[void] {.gcsafe.} = + engine.blockPresenceHandler(peer, presence) + + proc blocksHandler( + peer: PeerID, + blocks: seq[bt.Block]): Future[void] {.gcsafe.} = + engine.blocksHandler(peer, blocks) + + proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = + engine.accountHandler(peer, account) + + proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = + engine.paymentHandler(peer, payment) + + network.handlers = BlockExcHandlers( + onWantList: blockWantListHandler, + onBlocks: blocksHandler, + onPresence: blockPresenceHandler, + onAccount: accountHandler, + onPayment: paymentHandler ) - return b + return engine diff --git a/dagger/blockexchange/network.nim b/dagger/blockexchange/network.nim index 1db4395e..e046e2bd 100644 --- a/dagger/blockexchange/network.nim +++ b/dagger/blockexchange/network.nim @@ -29,11 +29,11 @@ logScope: const Codec* = "/dagger/blockexc/1.0.0" type - WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.} - BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.} - BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} - AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.} - PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} + WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.} + BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.} + BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.} + AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} + PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} BlockExcHandlers* = object onWantList*: WantListHandler @@ -73,7 +73,7 @@ type proc handleWantList( b: BlockExcNetwork, peer: NetworkPeer, - list: WantList) = + list: WantList): Future[void] = ## Handle incoming want list ## @@ -126,12 +126,12 @@ proc broadcastWantList*( wantType, full, sendDontHave) - asyncSpawn b.peers[id].send(Message(wantlist: wantList)) + b.peers[id].broadcast(Message(wantlist: wantList)) proc handleBlocks( b: BlockExcNetwork, peer: NetworkPeer, - blocks: seq[auto]) = + blocks: seq[auto]): Future[void] = ## Handle incoming blocks ## @@ -143,11 +143,9 @@ proc handleBlocks( var blks: seq[bt.Block] for blk in blocks: when blk is pb.Block: - if b =? bt.Block.new(Cid.init(blk.prefix).get(), blk.data): - blks.add(b) + blks.add(bt.Block.init(Cid.init(blk.prefix).get(), blk.data)) elif blk is seq[byte]: - if b =? bt.Block.new(Cid.init(blk).get(), blk): - blks.add(b) + blks.add(bt.Block.init(Cid.init(blk).get(), blk)) else: error("Invalid block type") @@ -176,12 +174,12 @@ proc broadcastBlocks*( return trace "Sending blocks to peer", peer = id, len = blocks.len - asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks))) + b.peers[id].broadcast(pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( b: BlockExcNetwork, peer: NetworkPeer, - presence: seq[BlockPresence]) = + presence: seq[BlockPresence]): Future[void] = ## Handle block presence ## @@ -202,11 +200,11 @@ proc broadcastBlockPresence*( return trace "Sending presence to peer", peer = id - asyncSpawn b.peers[id].send(Message(blockPresences: presence)) + b.peers[id].broadcast(Message(blockPresences: presence)) proc handleAccount(network: BlockExcNetwork, peer: NetworkPeer, - account: Account) = + account: Account): Future[void] = if network.handlers.onAccount.isNil: return network.handlers.onAccount(peer.id, account) @@ -218,7 +216,7 @@ proc broadcastAccount*(network: BlockExcNetwork, return let message = Message(account: AccountMessage.init(account)) - asyncSpawn network.peers[id].send(message) + network.peers[id].broadcast(message) proc broadcastPayment*(network: BlockExcNetwork, id: PeerId, @@ -227,11 +225,11 @@ proc broadcastPayment*(network: BlockExcNetwork, return let message = Message(payment: StateChannelUpdate.init(payment)) - asyncSpawn network.peers[id].send(message) + network.peers[id].broadcast(message) proc handlePayment(network: BlockExcNetwork, peer: NetworkPeer, - payment: SignedState) = + payment: SignedState): Future[void] = if network.handlers.onPayment.isNil: return network.handlers.onPayment(peer.id, payment) @@ -239,19 +237,19 @@ proc handlePayment(network: BlockExcNetwork, proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = try: if msg.wantlist.entries.len > 0: - b.handleWantList(peer, msg.wantlist) + await b.handleWantList(peer, msg.wantlist) if msg.payload.len > 0: - b.handleBlocks(peer, msg.payload) + await b.handleBlocks(peer, msg.payload) if msg.blockPresences.len > 0: - b.handleBlockPresence(peer, msg.blockPresences) + await b.handleBlockPresence(peer, msg.blockPresences) if account =? Account.init(msg.account): - b.handleAccount(peer, account) + await b.handleAccount(peer, account) if payment =? SignedState.init(msg.payment): - b.handlePayment(peer, payment) + await b.handlePayment(peer, payment) except CatchableError as exc: trace "Exception in blockexc rpc handler", exc = exc.msg diff --git a/dagger/blockexchange/networkpeer.nim b/dagger/blockexchange/networkpeer.nim index 7e011021..269291cc 100644 --- a/dagger/blockexchange/networkpeer.nim +++ b/dagger/blockexchange/networkpeer.nim @@ -38,9 +38,10 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = try: while not conn.atEof: - let data = await conn.readLp(MaxMessageSize) - let msg: Message = Protobuf.decode(data, Message) - trace "Got message for peer", peer = b.id, msg + let + data = await conn.readLp(MaxMessageSize) + msg: Message = Protobuf.decode(data, Message) + trace "Got message for peer", peer = b.id await b.handler(b, msg) except CatchableError as exc: trace "Exception in blockexc read loop", exc = exc.msg @@ -62,9 +63,18 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = trace "Unable to get send connection for peer message not sent", peer = b.id return - trace "Sending message to remote", peer = b.id, msg = $msg + trace "Sending message to remote", peer = b.id await conn.writeLp(Protobuf.encode(msg)) +proc broadcast*(b: NetworkPeer, msg: Message) = + proc sendAwaiter() {.async.} = + try: + await b.send(msg) + except CatchableError as exc: + trace "Exception broadcasting message to peer", peer = b.id, exc = exc.msg + + asyncSpawn sendAwaiter() + func new*( T: type NetworkPeer, peer: PeerId, diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index b783c0d1..3b426bb8 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -9,6 +9,7 @@ import std/tables +import pkg/questionable import pkg/chronicles import pkg/chronos import pkg/libp2p @@ -24,8 +25,7 @@ type proc addOrAwait*( p: PendingBlocksManager, - cid: Cid): - Future[Block] {.async.} = + cid: Cid): Future[Block] {.async.} = ## Add an event for a block ## @@ -33,9 +33,8 @@ proc addOrAwait*( p.blocks[cid] = newFuture[Block]() trace "Adding pending future for block", cid - let blk = p.blocks[cid] try: - return await blk + return await p.blocks[cid] except CancelledError as exc: trace "Blocks cancelled", exc = exc.msg, cid raise exc diff --git a/dagger/blockset.nim b/dagger/blockset.nim deleted file mode 100644 index 2f323c1f..00000000 --- a/dagger/blockset.nim +++ /dev/null @@ -1,62 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -{.push raises: [Defect].} - -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results - -import ./blockstream -export blockstream - -type - BlockSetRef* = ref object of BlockStreamRef - stream*: BlockStreamRef - hcodec*: MultiCodec - -proc hashBytes*(mh: MultiHash): seq[byte] = - mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] - -proc hashBytes*(b: Block): seq[byte] = - if mh =? b.cid.mhash: - return mh.hashBytes() - -method nextBlock*(d: BlockSetRef): ?!Block = - d.stream.nextBlock() - -proc treeHash*(d: BlockSetRef): ?!MultiHash = - var - stack: seq[seq[byte]] - - while true: - let (blk1, blk2) = (d.nextBlock().option, d.nextBlock().option) - if blk1.isNone and blk2.isNone and stack.len == 1: - let res = MultiHash.digest($d.hcodec, stack[0]) - if mh =? res: - return success mh - - return failure($res.error) - - if blk1.isSome: stack.add((!blk1).hashBytes()) - if blk2.isSome: stack.add((!blk2).hashBytes()) - - while stack.len > 1: - let (b1, b2) = (stack.pop(), stack.pop()) - let res = MultiHash.digest($d.hcodec, b1 & b2) - if mh =? res: - stack.add(mh.hashBytes()) - else: - return failure($res.error) - -func new*( - T: type BlockSetRef, - stream: BlockStreamRef, - hcodec: MultiCodec = multiCodec("sha2-256")): T = - T(stream: stream, hcodec: hcodec) diff --git a/dagger/blockstream.nim b/dagger/blockstream.nim deleted file mode 100644 index cb7601ce..00000000 --- a/dagger/blockstream.nim +++ /dev/null @@ -1,12 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import ./blockstream/[blockstream, chunkedblockstream] - -export blockstream, chunkedblockstream diff --git a/dagger/blockstream/blockstream.nim b/dagger/blockstream/blockstream.nim deleted file mode 100644 index fd342c9c..00000000 --- a/dagger/blockstream/blockstream.nim +++ /dev/null @@ -1,29 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -{.push raises: [Defect].} - -import pkg/questionable -import pkg/questionable/results -import ../blocktype - -export blocktype - -type - BlockStreamRef* = ref object of RootObj - -method nextBlock*(b: BlockStreamRef): ?!Block {.base.} = - doAssert(false, "Not implemented!") - -iterator items*(b: BlockStreamRef): Block = - while true: - without blk =? b.nextBlock(): - break - - yield blk diff --git a/dagger/blockstream/chunkedblockstream.nim b/dagger/blockstream/chunkedblockstream.nim deleted file mode 100644 index beddd77e..00000000 --- a/dagger/blockstream/chunkedblockstream.nim +++ /dev/null @@ -1,28 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -{.push raises: [Defect].} - -import pkg/questionable -import pkg/questionable/results - -import ./blockstream -import ../chunker - -type - ChunkedBlockStreamRef* = ref object of BlockStreamRef - chunker*: Chunker - -method nextBlock*(c: ChunkedBlockStreamRef): ?!Block = - let data: seq[byte] = c.chunker.getBytes() - if data.len > 0: - return Block.new(data) - -func new*(T: type ChunkedBlockStreamRef, chunker: Chunker): T = - T(chunker: chunker) diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim index c4e65afd..1d53e967 100644 --- a/dagger/blocktype.nim +++ b/dagger/blocktype.nim @@ -10,8 +10,6 @@ {.push raises: [Defect].} import pkg/libp2p -import pkg/questionable -import pkg/questionable/results import pkg/stew/byteutils type @@ -23,31 +21,22 @@ proc `$`*(b: Block): string = result &= "cid: " & $b.cid result &= "\ndata: " & string.fromBytes(b.data) -func new*( +func init*( T: type Block, data: openArray[byte] = [], version = CIDv1, hcodec = multiCodec("sha2-256"), - codec = multiCodec("raw")): ?!T = + codec = multiCodec("raw")): T = let hash = MultiHash.digest($hcodec, data).get() - success Block( + Block( cid: Cid.init(version, codec, hash).get(), data: @data) -func new*( +func init*( T: type Block, cid: Cid, data: openArray[byte] = [], - verify: bool = false): ?!T = - let res = Block.new( - data, - cid.cidver, - cid.mhash.get().mcodec, - cid.mcodec - ) - - if b =? res: - if verify and cid != b.cid: - return failure("The suplied Cid doesn't match the data!") - - res + verify: bool = false): T = + Block( + cid: cid, + data: @data) diff --git a/dagger/chunker.nim b/dagger/chunker.nim index c1922f10..65c8fbb4 100644 --- a/dagger/chunker.nim +++ b/dagger/chunker.nim @@ -11,12 +11,12 @@ {.push raises: [Defect].} -import std/sequtils - +import pkg/chronicles import pkg/questionable import pkg/questionable/results +import pkg/chronos +import pkg/libp2p except shuffle -import ./rng import ./blocktype export blocktype @@ -26,118 +26,116 @@ const type # default reader type + ChunkBuffer* = ptr UncheckedArray[byte] Reader* = - proc(data: var openArray[byte], offset: Natural = 0): int - {.gcsafe, closure, raises: [Defect].} + proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].} ChunkerType* {.pure.} = enum - SizedChunker + FixedChunker RabinChunker - Chunker* = ref object of RootObj + Chunker* = ref object reader*: Reader - size*: Natural - pos*: Natural case kind*: ChunkerType: - of SizedChunker: + of FixedChunker: chunkSize*: Natural pad*: bool # pad last block if less than size of RabinChunker: discard -proc getBytes*(c: Chunker): seq[byte] = + FileChunker* = Chunker + LPStreamChunker* = Chunker + +proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} = ## returns a chunk of bytes from ## the instantiated chunker ## - if c.pos >= c.size: - return + var buff = newSeq[byte](c.chunkSize) + let read = await c.reader(cast[ChunkBuffer](addr buff[0]), buff.len) - var bytes = newSeq[byte](c.chunkSize) - let read = c.reader(bytes, c.pos) - c.pos += read + if read <= 0: + return @[] - if not c.pad and bytes.len != read: - bytes.setLen(read) + if not c.pad and buff.len > read: + buff.setLen(read) - return bytes - -iterator items*(c: Chunker): seq[byte] = - while true: - let chunk = c.getBytes() - if chunk.len <= 0: - break - - yield chunk + return buff func new*( T: type Chunker, - kind = ChunkerType.SizedChunker, + kind = ChunkerType.FixedChunker, reader: Reader, - size: Natural, chunkSize = DefaultChunkSize, pad = false): T = var chunker = Chunker( kind: kind, - reader: reader, - size: size) + reader: reader) - if kind == ChunkerType.SizedChunker: + if kind == ChunkerType.FixedChunker: chunker.pad = pad chunker.chunkSize = chunkSize return chunker -proc newRandomChunker*( - rng: Rng, - size: int64, - kind = ChunkerType.SizedChunker, +proc new*( + T: type LPStreamChunker, + stream: LPStream, + kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, - pad = false): Chunker = - ## create a chunker that produces - ## random data - ## - - proc reader(data: var openArray[byte], offset: Natural = 0): int = - var alpha = toSeq(byte('A')..byte('z')) - - var read = 0 - while read <= data.high: - rng.shuffle(alpha) - for a in alpha: - if read > data.high: - break - - data[read] = a - read.inc - - return read - - Chunker.new( - kind = ChunkerType.SizedChunker, - reader = reader, - size = size, - pad = pad, - chunkSize = chunkSize) - -proc newFileChunker*( - file: File, - kind = ChunkerType.SizedChunker, - chunkSize = DefaultChunkSize, - pad = false): Chunker = + pad = false): T = ## create the default File chunker ## - proc reader(data: var openArray[byte], offset: Natural = 0): int = + proc reader(data: ChunkBuffer, len: int): Future[int] + {.gcsafe, async, raises: [Defect].} = + var res = 0 try: - return file.readBytes(data, 0, data.len) - except IOError as exc: - # TODO: revisit error handling - should this be fatal? + while res < len: + res += await stream.readOnce(addr data[res], len - res) + except LPStreamEOFError as exc: + trace "LPStreamChunker stream Eof", exc = exc.msg + except CatchableError as exc: + trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) + return res + Chunker.new( - kind = ChunkerType.SizedChunker, + kind = ChunkerType.FixedChunker, + reader = reader, + pad = pad, + chunkSize = chunkSize) + +proc new*( + T: type FileChunker, + file: File, + kind = ChunkerType.FixedChunker, + chunkSize = DefaultChunkSize, + pad = false): T = + ## create the default File chunker + ## + + proc reader(data: ChunkBuffer, len: int): Future[int] + {.gcsafe, async, raises: [Defect].} = + var total = 0 + try: + while total < len: + let res = file.readBuffer(addr data[total], len - total) + if res <= 0: + break + + total += res + except IOError as exc: + trace "Exception reading file", exc = exc.msg + except CatchableError as exc: + trace "CatchableError exception", exc = exc.msg + raise newException(Defect, exc.msg) + + return total + + Chunker.new( + kind = ChunkerType.FixedChunker, reader = reader, - size = try: file.getFileSize() except: 0, # TODO: should do something smarter abou this pad = pad, chunkSize = chunkSize) diff --git a/dagger/conf.nim b/dagger/conf.nim new file mode 100644 index 00000000..77586c0d --- /dev/null +++ b/dagger/conf.nim @@ -0,0 +1,88 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import std/os +import std/options + +import pkg/chronicles +import pkg/confutils/defs +import pkg/libp2p + +const + DefaultTcpListenMultiAddr = "/ip4/0.0.0.0/tcp/0" + +type + StartUpCommand* {.pure.} = enum + noCommand, + initNode + + DaggerConf* = object + logLevel* {. + defaultValue: LogLevel.INFO + desc: "Sets the log level" }: LogLevel + + dataDir* {. + desc: "The directory where dagger will store configuration and data." + defaultValue: defaultDataDir() + defaultValueDesc: "" + abbr: "d" + name: "data-dir" }: OutDir + + case cmd* {. + command + defaultValue: noCommand }: StartUpCommand + + of noCommand: + listenAddrs* {. + desc: "Specifies one or more listening multiaddrs for the node to listen on." + defaultValue: @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + defaultValueDesc: "/ip4/0.0.0.0/tcp/0" + abbr: "a" + name: "listen-addrs" }: seq[MultiAddress] + + bootstrapNodes* {. + desc: "Specifies one or more bootstrap nodes to use when connecting to the network." + abbr: "b" + name: "bootstrap-nodes" }: seq[MultiAddress] + + maxPeers* {. + desc: "The maximum number of peers to connect to" + defaultValue: 160 + name: "max-peers" }: int + + agentString* {. + defaultValue: "Dagger" + desc: "Node agent string which is used as identifier in network" + name: "agent-string" }: string + + apiPort* {. + desc: "The REST Api port", + defaultValue: 8080 + defaultValueDesc: "8080" + name: "api-port" + abbr: "p" }: int + + of initNode: + discard + +proc defaultDataDir*(): string = + let dataDir = when defined(windows): + "AppData" / "Roaming" / "Dagger" + elif defined(macosx): + "Library" / "Application Support" / "Dagger" + else: + ".cache" / "dagger" + + getHomeDir() / dataDir + +func parseCmdArg*(T: type MultiAddress, input: TaintedString): T + {.raises: [ValueError, LPError, Defect].} = + MultiAddress.init($input).tryGet() diff --git a/dagger/dagger.nim b/dagger/dagger.nim index ec0ba0b2..3ba62ca9 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -7,3 +7,77 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/sequtils +import std/os + +import pkg/chronicles +import pkg/chronos +import pkg/presto +import pkg/libp2p +import pkg/confutils +import pkg/confutils/defs +import pkg/nitro +import pkg/stew/io2 + +import ./node +import ./conf +import ./rng +import ./rest/api +import ./stores/fsstore +import ./stores/networkstore +import ./blockexchange +import ./utils/fileutils + +type + DaggerServer* = ref object + runHandle: Future[void] + config: DaggerConf + restServer: RestServerRef + daggerNode: DaggerNodeRef + +proc run*(s: DaggerServer) {.async.} = + s.restServer.start() + await s.daggerNode.start() + + s.runHandle = newFuture[void]() + await s.runHandle + +proc shutdown*(s: DaggerServer) {.async.} = + await allFuturesThrowing( + s.restServer.stop(), s.daggerNode.stop()) + + s.runHandle.complete() + +proc new*(T: type DaggerServer, config: DaggerConf): T = + + let + switch = SwitchBuilder + .new() + .withAddresses(config.listenAddrs) + .withRng(Rng.instance()) + .withNoise() + .withMplex(5.minutes, 5.minutes) + .withMaxConnections(config.maxPeers) + .withAgentVersion(config.agentString) + .withTcpTransport({ServerFlags.ReuseAddr}) + .build() + + let + wallet = WalletRef.new(EthPrivateKey.random()) + network = BlockExcNetwork.new(switch) + localStore = FSStore.new(config.dataDir / "repo") + engine = BlockExcEngine.new(localStore, wallet, network) + store = NetworkStore.new(engine, localStore) + daggerNode = DaggerNodeRef.new(switch, store, engine) + restServer = RestServerRef.new( + daggerNode.initRestApi(), + initTAddress("127.0.0.1" , config.apiPort), + bufferSize = (1024 * 64), + maxRequestBodySize = int.high) + .tryGet() + + switch.mount(network) + T( + config: config, + daggerNode: daggerNode, + restServer: restServer) diff --git a/dagger/errors.nim b/dagger/errors.nim new file mode 100644 index 00000000..847db46b --- /dev/null +++ b/dagger/errors.nim @@ -0,0 +1,22 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/stew/results + +type + DaggerError* = object of CatchableError # base dagger error + DaggerResult*[T] = Result[T, ref DaggerError] + +template mapFailure*( + exp: untyped, + exc: typed = type DaggerError): untyped = + ## Convert `Result[T, E]` to `Result[E, ref CatchableError]` + ## + + ((exp.mapErr do (e: auto) -> ref CatchableError: (ref exc)(msg: $e))) diff --git a/dagger/manifest.nim b/dagger/manifest.nim new file mode 100644 index 00000000..749cb063 --- /dev/null +++ b/dagger/manifest.nim @@ -0,0 +1,178 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/libp2p +import pkg/libp2p/protobuf/minprotobuf +import pkg/questionable +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos + +import ./blocktype +import ./errors + +const + ManifestCodec* = multiCodec("dag-pb") + +var + emptyDigest {.threadvar.}: array[CidVersion, MultiHash] + +type + BlocksManifest* = object + blocks: seq[Cid] + htree: ?Cid + version*: CidVersion + hcodec*: MultiCodec + codec*: MultiCodec + +proc len*(b: BlocksManifest): int = b.blocks.len + +iterator items*(b: BlocksManifest): Cid = + for b in b.blocks: + yield b + +proc hashBytes(mh: MultiHash): seq[byte] = + mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] + +proc cid*(b: var BlocksManifest): ?!Cid = + if htree =? b.htree: + return htree.success + + var + stack: seq[MultiHash] + + if stack.len == 1: + stack.add(emptyDigest[b.version]) + + for cid in b.blocks: + stack.add(? cid.mhash.mapFailure) + + while stack.len > 1: + let + (b1, b2) = (stack.pop(), stack.pop()) + mh = ? MultiHash.digest( + $b.hcodec, + (b1.hashBytes() & b2.hashBytes())) + .mapFailure + stack.add(mh) + + if stack.len == 1: + let cid = ? Cid.init(b.version, b.codec, stack[0]).mapFailure + b.htree = cid.some + return cid.success + +proc put*(b: var BlocksManifest, cid: Cid) = + b.htree = Cid.none + trace "Adding cid to manifest", cid + b.blocks.add(cid) + +proc contains*(b: BlocksManifest, cid: Cid): bool = + cid in b.blocks + +proc encode*(b: var BlocksManifest): ?!seq[byte] = + ## Encode the manifest into a ``ManifestCodec`` + ## multicodec container (Dag-pb) for now + var pbNode = initProtoBuffer() + + for c in b.blocks: + var pbLink = initProtoBuffer() + pbLink.write(1, c.data.buffer) # write Cid links + pbLink.finish() + pbNode.write(2, pbLink) + + let cid = ? b.cid + pbNode.write(1, cid.data.buffer) # set the treeHash Cid as the data field + pbNode.finish() + + return pbNode.buffer.success + +proc decode*(_: type BlocksManifest, data: seq[byte]): ?!(Cid, seq[Cid]) = + ## Decode a manifest from a byte seq + ## + var + pbNode = initProtoBuffer(data) + cidBuf: seq[byte] + blocks: seq[Cid] + + if pbNode.getField(1, cidBuf).isOk: + let cid = ? Cid.init(cidBuf).mapFailure + var linksBuf: seq[seq[byte]] + if pbNode.getRepeatedField(2, linksBuf).isOk: + for pbLinkBuf in linksBuf: + var + blocksBuf: seq[seq[byte]] + blockBuf: seq[byte] + pbLink = initProtoBuffer(pbLinkBuf) + + if pbLink.getField(1, blockBuf).isOk: + let cidRes = Cid.init(blockBuf) + if cidRes.isOk: + blocks.add(cidRes.get()) + + return (cid, blocks).success + +proc init*( + T: type BlocksManifest, + blocks: openArray[Cid] = [], + version = CIDv1, + hcodec = multiCodec("sha2-256"), + codec = multiCodec("raw")): ?!T = + ## Create a manifest using array of `Cid`s + ## + + # Only gets initialized once + once: + # TODO: The CIDs should be initialized at compile time, + # but the VM fails due to a `memmove` being invoked somewhere + + for v in [CIDv0, CIDv1]: + let + cid = if v == CIDv1: + ? Cid.init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku").mapFailure + else: + ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure + + mhash = ? cid.mhash.mapFailure + digest = ? MultiHash.digest( + $hcodec, + mhash.hashBytes()).mapFailure + + emptyDigest[v] = digest + + T( + blocks: @blocks, + version: version, + codec: codec, + hcodec: hcodec, + ).success + +proc init*( + T: type BlocksManifest, + blk: Block): ?!T = + ## Create manifest from a raw manifest block + ## (in dag-pb for for now) + ## + + let + (cid, blocks) = ? BlocksManifest.decode(blk.data) + mhash = ? cid.mhash.mapFailure + + var + manifest = ? BlocksManifest.init( + blocks, + cid.version, + mhash.mcodec, + cid.mcodec) + + if cid != (? manifest.cid): + return failure("Content hashes don't match!") + + return manifest.success diff --git a/dagger/node.nim b/dagger/node.nim new file mode 100644 index 00000000..2877de65 --- /dev/null +++ b/dagger/node.nim @@ -0,0 +1,181 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/options + +import pkg/questionable +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos +import pkg/libp2p + +# TODO: remove once exported by libp2p +import pkg/libp2p/routing_record +import pkg/libp2p/signed_envelope + +import ./chunker +import ./blocktype as bt +import ./manifest +import ./stores/blockstore +import ./blockexchange + +logScope: + topics = "dagger node" + +const + FileChunkSize* = 4096 # file chunk read size + +type + DaggerError = object of CatchableError + + DaggerNodeRef* = ref object + switch*: Switch + networkId*: PeerID + blockStore*: BlockStore + engine*: BlockExcEngine + +proc start*(node: DaggerNodeRef) {.async.} = + await node.switch.start() + await node.engine.start() + node.networkId = node.switch.peerInfo.peerId + notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs + +proc stop*(node: DaggerNodeRef) {.async.} = + await node.engine.stop() + await node.switch.stop() + +proc findPeer*( + node: DaggerNodeRef, + peerId: PeerID): Future[?!PeerRecord] {.async.} = + discard + +proc connect*( + node: DaggerNodeRef, + peerId: PeerID, + addrs: seq[MultiAddress]): Future[void] = + node.switch.connect(peerId, addrs) + +proc streamBlocks*( + node: DaggerNodeRef, + stream: BufferStream, + blockManifest: BlocksManifest) {.async.} = + + try: + # TODO: Read sequentially for now + # to prevent slurping the entire dataset + # since disk IO is blocking + for c in blockManifest: + without blk =? (await node.blockStore.getBlock(c)): + trace "Couldn't retrieve block", cid = c + continue + + trace "Streaming block data", cid = blk.cid, bytes = blk.data.len + await stream.pushData(blk.data) + except CatchableError as exc: + trace "Exception retrieving blocks", exc = exc.msg + finally: + await stream.pushEof() + +proc retrieve*( + node: DaggerNodeRef, + stream: BufferStream, + cid: Cid): Future[?!void] {.async.} = + + trace "Received retrieval request", cid + without blk =? await node.blockStore.getBlock(cid): + return failure( + newException(DaggerError, "Couldn't retrieve block for Cid!")) + + without mc =? blk.cid.contentType(): + return failure( + newException(DaggerError, "Couldn't identify Cid!")) + + if mc == ManifestCodec: + trace "Retrieving data set", cid, mc + + let res = BlocksManifest.init(blk) + if (res.isErr): + return failure(res.error.msg) + + asyncSpawn node.streamBlocks(stream, res.get()) + else: + asyncSpawn (proc(): Future[void] {.async.} = + try: + await stream.pushData(blk.data) + except CatchableError as exc: + trace "Unable to send block", cid + discard + finally: + await stream.pushEof())() + + return success() + +proc store*( + node: DaggerNodeRef, + stream: LPStream): Future[?!Cid] {.async.} = + trace "Storing data" + + without var blockManifest =? BlocksManifest.init(): + return failure("Unable to create Block Set") + + let + chunker = LPStreamChunker.new(stream) + + try: + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + trace "Got data from stream", len = chunk.len + let + blk = bt.Block.init(chunk) + + blockManifest.put(blk.cid) + if not (await node.blockStore.putBlock(blk)): + # trace "Unable to store block", cid = blk.cid + return failure("Unable to store block " & $blk.cid) + + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + finally: + await stream.close() + + # Generate manifest + without data =? blockManifest.encode(): + return failure( + newException(DaggerError, "Could not generate dataset manifest!")) + + # Store as a dag-pb block + let manifest = bt.Block.init(data = data, codec = ManifestCodec) + if not (await node.blockStore.putBlock(manifest)): + trace "Unable to store manifest", cid = manifest.cid + return failure("Unable to store manifest " & $manifest.cid) + + var cid: ?!Cid + if (cid = blockManifest.cid; cid.isErr): + trace "Unable to generate manifest Cid!", exc = cid.error.msg + return failure(cid.error.msg) + + trace "Stored data", manifestCid = manifest.cid, + contentCid = !cid, + blocks = blockManifest.len + + return manifest.cid.success + +proc new*( + T: type DaggerNodeRef, + switch: Switch, + store: BlockStore, + engine: BlockExcEngine): T = + T( + switch: switch, + blockStore: store, + engine: engine) diff --git a/dagger/rest/api.nim b/dagger/rest/api.nim new file mode 100644 index 00000000..5381056f --- /dev/null +++ b/dagger/rest/api.nim @@ -0,0 +1,190 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import std/sequtils + +import pkg/questionable +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos +import pkg/presto +import pkg/libp2p + +import pkg/libp2p/routing_record + +import ../node + +proc validate( + pattern: string, + value: string): int + {.gcsafe, raises: [Defect].} = + 0 + +proc encodeString(cid: type Cid): Result[string, cstring] = + ok($cid) + +proc decodeString(T: type Cid, value: string): Result[Cid, cstring] = + Cid.init(value) + .mapErr do(e: CidError) -> cstring: + case e + of CidError.Incorrect: "Incorrect Cid" + of CidError.Unsupported: "Unsupported Cid" + of CidError.Overrun: "Overrun Cid" + else: "Error parsing Cid" + +proc encodeString(peerId: PeerID): Result[string, cstring] = + ok($peerId) + +proc decodeString(T: type PeerID, value: string): Result[PeerID, cstring] = + PeerID.init(value) + +proc encodeString(address: MultiAddress): Result[string, cstring] = + ok($address) + +proc decodeString(T: type MultiAddress, value: string): Result[MultiAddress, cstring] = + MultiAddress + .init(value) + .mapErr do(e: string) -> cstring: cstring(e) + +proc initRestApi*(node: DaggerNodeRef): RestRouter = + var router = RestRouter.init(validate) + router.api( + MethodGet, + "/api/dagger/v1/connect/{peerId}") do ( + peerId: PeerID, + addrs: seq[MultiAddress]) -> RestApiResponse: + if peerId.isErr: + return RestApiResponse.error( + Http400, + $peerId.error()) + + let addresses = if addrs.isOk and addrs.get().len > 0: + addrs.get() + else: + let peerRecord = await node.findPeer(peerId.get()) + if peerRecord.isErr: + return RestApiResponse.error( + Http400, + "Unable to find Peer!") + + peerRecord.get().addresses.mapIt( + it.address + ) + + await node.connect(peerId.get(), addresses) + return RestApiResponse.response("") + + router.api( + MethodGet, + "/api/dagger/v1/download/{id}") do ( + id: Cid, resp: HttpResponseRef) -> RestApiResponse: + if id.isErr: + return RestApiResponse.error( + Http400, + $id.error()) + + let + stream = BufferStream.new() + + var bytes = 0 + try: + if ( + let retr = await node.retrieve(stream, id.get()); + retr.isErr): + return RestApiResponse.error(Http400, retr.error.msg) + + await resp.prepareChunked() + while not stream.atEof: + var + buff = newSeqUninitialized[byte](FileChunkSize) + len = await stream.readOnce(addr buff[0], buff.len) + + buff.setLen(len) + if buff.len <= 0: + break + + bytes += buff.len + trace "Sending cunk", size = buff.len + await resp.sendChunk(addr buff[0], buff.len) + except CatchableError as exc: + trace "Excepting streaming blocks", exc = exc.msg + return RestApiResponse.error(Http500) + finally: + trace "Sent bytes", cid = id.get(), bytes + await stream.close() + await resp.finish() + + router.rawApi( + MethodPost, + "/api/dagger/v1/upload") do ( + ) -> RestApiResponse: + trace "Handling file upload" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500) + + # Attempt to handle `Expect` header + # some clients (curl), waits 1000ms + # before giving up + # + await request.handleExpect() + + let + reader = bodyReader.get() + stream = BufferStream.new() + storeFut = node.store(stream) + + var bytes = 0 + try: + while not reader.atEof: + var + buff = newSeqUninitialized[byte](FileChunkSize) + len = await reader.readOnce(addr buff[0], buff.len) + + buff.setLen(len) + if len <= 0: + break + + trace "Got chunk from endpoint", len = buff.len + await stream.pushData(buff) + bytes += len + + await stream.pushEof() + without cid =? (await storeFut): + return RestApiResponse.error(Http500) + + trace "Uploaded file", bytes, cid = $cid + return RestApiResponse.response($cid) + except CancelledError as exc: + await reader.closeWait() + return RestApiResponse.error(Http500) + except AsyncStreamError: + await reader.closeWait() + return RestApiResponse.error(Http500) + finally: + await stream.close() + await reader.closeWait() + + # if we got here something went wrong? + return RestApiResponse.error(Http500) + + router.api( + MethodGet, + "/api/dagger/v1/info") do () -> RestApiResponse: + var addrs: string + for a in node.switch.peerInfo.addrs: + addrs &= "- " & $a & "\n" + + return RestApiResponse.response( + "Id: " & $node.switch.peerInfo.peerId & + "\nAddrs: \n" & addrs & "\n") + + return router diff --git a/dagger/stores.nim b/dagger/stores.nim index bc0e604a..6b060c46 100644 --- a/dagger/stores.nim +++ b/dagger/stores.nim @@ -1,3 +1,7 @@ -import ./stores/[memorystore, blockstore, blockexchange] +import ./stores/[ + memorystore, + blockstore, + networkstore, + fsstore] -export memorystore, blockstore, blockexchange +export memorystore, blockstore, networkstore, fsstore diff --git a/dagger/stores/blockexchange.nim b/dagger/stores/blockexchange.nim deleted file mode 100644 index 450074fd..00000000 --- a/dagger/stores/blockexchange.nim +++ /dev/null @@ -1,174 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import std/sequtils - -import pkg/chronicles -import pkg/chronos -import pkg/libp2p -import pkg/libp2p/errors - -import ../blocktype as bt -import ../utils/asyncheapqueue - -import ./blockstore -import ../blockexchange/network -import ../blockexchange/engine -import ../blockexchange/peercontext -import ../blockexchange/protobuf/blockexc as pb - -export blockstore, network, engine, asyncheapqueue - -logScope: - topics = "dagger blockexc" - -const - DefaultTaskQueueSize = 100 - DefaultConcurrentTasks = 10 - DefaultMaxRetries = 3 - -type - BlockExc* = ref object of BlockStore - engine*: BlockExcEngine # blockexc decision engine - taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for - blockexcTasks: seq[Future[void]] # future to control blockexc task - blockexcRunning: bool # indicates if the blockexc task is running - concurrentTasks: int # number of concurrent peers we're serving at any given time - maxRetries: int # max number of tries for a failed block - taskHandler: TaskHandler # handler provided by the engine called by the runner - -proc blockexcTaskRunner(b: BlockExc) {.async.} = - ## process tasks - ## - - while b.blockexcRunning: - let peerCtx = await b.taskQueue.pop() - asyncSpawn b.taskHandler(peerCtx) - - trace "Exiting blockexc task runner" - -proc start*(b: BlockExc) {.async.} = - ## Start the blockexc task - ## - - trace "blockexc start" - - if b.blockexcTasks.len > 0: - warn "Starting blockexc twice" - return - - b.blockexcRunning = true - for i in 0.. 0 + s.blocks.anyIt( it.cid == cid ) -method putBlocks*(s: MemoryStore, blocks: seq[Block]) = +method putBlock*( + s: MemoryStore, + blk: Block): Future[bool] {.async.} = ## Put a block to the blockstore ## - s.blocks.add(blocks) - procCall BlockStore(s).putBlocks(blocks) + trace "Putting block", cid = blk.cid + s.blocks.add(blk) -method delBlocks*(s: MemoryStore, cids: seq[Cid]) = + return blk.cid in s + +method delBlock*( + s: MemoryStore, + cid: Cid): Future[bool] {.async.} = ## delete a block/s from the block store ## - for c in cids: - s.blocks.keepItIf( it.cid != c ) + s.blocks.keepItIf( it.cid != cid ) + return cid notin s - procCall BlockStore(s).delBlocks(cids) - -func new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore = +func new*(_: type MemoryStore, blocks: openArray[Block] = []): MemoryStore = MemoryStore( blocks: @blocks ) diff --git a/dagger/stores/networkstore.nim b/dagger/stores/networkstore.nim new file mode 100644 index 00000000..bec41f4e --- /dev/null +++ b/dagger/stores/networkstore.nim @@ -0,0 +1,91 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/chronicles +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/errors + +import ../blocktype as bt +import ../utils/asyncheapqueue + +import ./blockstore +import ../blockexchange/network +import ../blockexchange/engine +import ../blockexchange/peercontext + +export blockstore, network, engine, asyncheapqueue + +logScope: + topics = "dagger networkstore" + +type + NetworkStore* = ref object of BlockStore + engine*: BlockExcEngine # blockexc decision engine + localStore*: BlockStore # local block store + +method getBlock*( + self: NetworkStore, + cid: Cid): Future[?!bt.Block] {.async.} = + ## Get a block from a remote peer + ## + + trace "Getting block", cid + without var blk =? (await self.localStore.getBlock(cid)): + trace "Couldn't get from local store", cid + blk = try: + await self.engine.requestBlock(cid) + except CatchableError as exc: + trace "Exception requestig block", cid, exc = exc.msg + return failure(exc.msg) + + trace "Retrieved block from local store", cid + return blk.success + +method putBlock*( + self: NetworkStore, + blk: bt.Block): Future[bool] {.async.} = + trace "Puting block", cid = blk.cid + + if not (await self.localStore.putBlock(blk)): + return false + + self.engine.resolveBlocks(@[blk]) + return true + +method delBlock*( + self: NetworkStore, + cid: Cid): Future[bool] = + ## Delete a block/s from the block store + ## + + self.localStore.delBlock(cid) + +{.pop.} + +method hasBlock*( + self: NetworkStore, + cid: Cid): bool = + ## Check if the block exists in the blockstore + ## + + self.localStore.hasBlock(cid) + +proc new*( + T: type NetworkStore, + engine: BlockExcEngine, + localStore: BlockStore): T = + + let b = NetworkStore( + localStore: localStore, + engine: engine) + + return b diff --git a/dagger/utils/fileutils.nim b/dagger/utils/fileutils.nim new file mode 100644 index 00000000..53b8b46e --- /dev/null +++ b/dagger/utils/fileutils.nim @@ -0,0 +1,106 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +## Partially taken from nim beacon chain + +{.push raises: [Defect].} + +import std/strutils + +import pkg/chronicles +import stew/io2 + +export io2 + +when defined(windows): + import stew/[windows/acl] + +proc secureCreatePath*(path: string): IoResult[void] = + when defined(windows): + let sres = createFoldersUserOnlySecurityDescriptor() + if sres.isErr(): + error "Could not allocate security descriptor", path = path, + errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error + err(sres.error) + else: + var sd = sres.get() + createPath(path, 0o700, secDescriptor = sd.getDescriptor()) + else: + createPath(path, 0o700) + +proc secureWriteFile*[T: byte|char](path: string, + data: openArray[T]): IoResult[void] = + when defined(windows): + let sres = createFilesUserOnlySecurityDescriptor() + if sres.isErr(): + error "Could not allocate security descriptor", path = path, + errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error + err(sres.error) + else: + var sd = sres.get() + writeFile(path, data, 0o600, secDescriptor = sd.getDescriptor()) + else: + writeFile(path, data, 0o600) + +proc checkAndCreateDataDir*(dataDir: string): bool = + when defined(posix): + let requiredPerms = 0o700 + if isDir(dataDir): + let currPermsRes = getPermissions(dataDir) + if currPermsRes.isErr(): + fatal "Could not check data directory permissions", + data_dir = dataDir, errorCode = $currPermsRes.error, + errorMsg = ioErrorMsg(currPermsRes.error) + return false + else: + let currPerms = currPermsRes.get() + if currPerms != requiredPerms: + warn "Data directory has insecure permissions. Correcting them.", + data_dir = dataDir, + current_permissions = currPerms.toOct(4), + required_permissions = requiredPerms.toOct(4) + let newPermsRes = setPermissions(dataDir, requiredPerms) + if newPermsRes.isErr(): + fatal "Could not set data directory permissions", + data_dir = dataDir, + errorCode = $newPermsRes.error, + errorMsg = ioErrorMsg(newPermsRes.error), + old_permissions = currPerms.toOct(4), + new_permissions = requiredPerms.toOct(4) + return false + else: + let res = secureCreatePath(dataDir) + if res.isErr(): + fatal "Could not create data directory", data_dir = dataDir, + errorMsg = ioErrorMsg(res.error), errorCode = $res.error + return false + elif defined(windows): + let amask = {AccessFlags.Read, AccessFlags.Write, AccessFlags.Execute} + if fileAccessible(dataDir, amask): + let cres = checkCurrentUserOnlyACL(dataDir) + if cres.isErr(): + fatal "Could not check data folder's ACL", + data_dir = dataDir, errorCode = $cres.error, + errorMsg = ioErrorMsg(cres.error) + return false + else: + if cres.get() == false: + fatal "Data folder has insecure ACL", data_dir = dataDir + return false + else: + let res = secureCreatePath(dataDir) + if res.isErr(): + fatal "Could not create data folder", data_dir = dataDir, + errorMsg = ioErrorMsg(res.error), errorCode = $res.error + return false + else: + fatal "Unsupported operation system" + return false + + return true diff --git a/tests/dagger/stores/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim similarity index 56% rename from tests/dagger/stores/testblockexc.nim rename to tests/dagger/blockexc/testblockexc.nim index 9d5ebb32..855dbb6b 100644 --- a/tests/dagger/stores/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -14,31 +14,41 @@ import pkg/dagger/blockexchange import pkg/dagger/chunker import pkg/dagger/blocktype as bt -import ./utils import ../helpers import ../examples -suite "BlockExc engine - 2 nodes": +suite "NetworkStore engine - 2 nodes": let - chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks1 = chunker1.mapIt( !bt.Block.new(it) ) - chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks2 = chunker2.mapIt( !bt.Block.new(it) ) + chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) + chunker2 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) var switch1, switch2: Switch wallet1, wallet2: WalletRef pricing1, pricing2: Pricing network1, network2: BlockExcNetwork - blockexc1, blockexc2: BlockExc - awaiters: seq[Future[void]] + blockexc1, blockexc2: NetworkStore peerId1, peerId2: PeerID peerCtx1, peerCtx2: BlockExcPeerCtx - done: Future[void] + blocks1, blocks2: seq[bt.Block] + engine1, engine2: BlockExcEngine + localStore1, localStore2: BlockStore setup: - done = newFuture[void]() + while true: + let chunk = await chunker1.getBytes() + if chunk.len <= 0: + break + + blocks1.add(bt.Block.init(chunk)) + + while true: + let chunk = await chunker2.getBytes() + if chunk.len <= 0: + break + + blocks2.add(bt.Block.init(chunk)) switch1 = newStandardSwitch() switch2 = newStandardSwitch() @@ -46,23 +56,27 @@ suite "BlockExc engine - 2 nodes": wallet2 = WalletRef.example pricing1 = Pricing.example pricing2 = Pricing.example - awaiters.add(await switch1.start()) - awaiters.add(await switch2.start()) + await switch1.start() + await switch2.start() peerId1 = switch1.peerInfo.peerId peerId2 = switch2.peerInfo.peerId + localStore1 = MemoryStore.new(blocks1.mapIt( it )) network1 = BlockExcNetwork.new(switch = switch1) - blockexc1 = BlockExc.new(MemoryStore.new(blocks1), wallet1, network1) + engine1 = BlockExcEngine.new(localStore1, wallet1, network1) + blockexc1 = NetworkStore.new(engine1, localStore1) switch1.mount(network1) + localStore2 = MemoryStore.new(blocks2.mapIt( it )) network2 = BlockExcNetwork.new(switch = switch2) - blockexc2 = BlockExc.new(MemoryStore.new(blocks2), wallet2, network2) + engine2 = BlockExcEngine.new(localStore2, wallet2, network2) + blockexc2 = NetworkStore.new(engine2, localStore2) switch2.mount(network2) await allFuturesThrowing( - blockexc1.start(), - blockexc2.start(), + engine1.start(), + engine2.start(), ) # initialize our want lists @@ -84,13 +98,11 @@ suite "BlockExc engine - 2 nodes": teardown: await allFuturesThrowing( - blockexc1.stop(), - blockexc2.stop(), + engine1.stop(), + engine2.stop(), switch1.stop(), switch2.stop()) - await allFuturesThrowing(awaiters) - test "should exchange want lists on connect": check not isNil(peerCtx1) check not isNil(peerCtx2) @@ -107,8 +119,8 @@ suite "BlockExc engine - 2 nodes": check peerCtx2.account.?address == pricing2.address.some test "should send want-have for block": - let blk = !bt.Block.new("Block 1".toBytes) - blockexc2.engine.localStore.putBlocks(@[blk]) + let blk = bt.Block.init("Block 1".toBytes) + check await blockexc2.engine.localStore.putBlock(blk) let entry = Entry( `block`: blk.cid.data.buffer, @@ -118,69 +130,77 @@ suite "BlockExc engine - 2 nodes": sendDontHave: false) peerCtx1.peerWants.add(entry) - check blockexc2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk + check blockexc2 + .engine + .taskQueue + .pushOrUpdateNoWait(peerCtx1).isOk await sleepAsync(100.millis) check blockexc1.engine.localStore.hasBlock(blk.cid) test "should get blocks from remote": - let blocks = await blockexc1.getBlocks(blocks2.mapIt( it.cid )) - check blocks == blocks2 + let blocks = await allFinished( + blocks2.mapIt( blockexc1.getBlock(it.cid) )) + check blocks.mapIt( !it.read ) == blocks2 test "remote should send blocks when available": - let blk = !bt.Block.new("Block 1".toBytes) + let blk = bt.Block.init("Block 1".toBytes) # should fail retrieving block from remote - check not await blockexc1.getBlocks(@[blk.cid]) + check not await blockexc1.getBlock(blk.cid) .withTimeout(100.millis) # should expire - proc onBlocks(evt: BlockStoreChangeEvt) = - check evt.cids == @[blk.cid] - done.complete() - - blockexc1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added) - # first put the required block in the local store - blockexc2.engine.localStore.putBlocks(@[blk]) + check await blockexc2.engine.localStore.putBlock(blk) # second trigger blockexc to resolve any pending requests # for the block - blockexc2.putBlocks(@[blk]) + check await blockexc2.putBlock(blk) - await done + # should succeed retrieving block from remote + check await blockexc1.getBlock(blk.cid) + .withTimeout(100.millis) # should succede test "receives payments for blocks that were sent": - let blocks = await blockexc1.getBlocks(blocks2.mapIt(it.cid)) + let blocks = await allFinished( + blocks2.mapIt( blockexc1.getBlock(it.cid) )) await sleepAsync(100.millis) let channel = !peerCtx1.paymentChannel check wallet2.balance(channel, Asset) > 0 -suite "BlockExc - multiple nodes": +suite "NetworkStore - multiple nodes": let - chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] - blockexc: seq[BlockExc] - awaiters: seq[Future[void]] + blockexc: seq[NetworkStore] + blocks: seq[bt.Block] setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + for e in generateNodes(5): switch.add(e.switch) blockexc.add(e.blockexc) - await e.blockexc.start() + await e.blockexc.engine.start() - awaiters = switch.mapIt( - (await it.start()) - ).concat() + await allFuturesThrowing( + switch.mapIt( it.start() ) + ) teardown: await allFuturesThrowing( switch.mapIt( it.stop() ) ) - await allFuturesThrowing(awaiters) + switch = @[] + blockexc = @[] test "should receive haves for own want list": let @@ -191,18 +211,22 @@ suite "BlockExc - multiple nodes": engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) - blockexc[0].engine.localStore.putBlocks(blocks[0..3]) - blockexc[1].engine.localStore.putBlocks(blocks[4..7]) - blockexc[2].engine.localStore.putBlocks(blocks[8..11]) - blockexc[3].engine.localStore.putBlocks(blocks[12..15]) + await allFutures( + blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + await allFutures( + blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + await allFutures( + blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + await allFutures( + blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) await connectNodes(switch) - await sleepAsync(1.seconds) check: engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) == blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) + engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) == blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) @@ -215,11 +239,18 @@ suite "BlockExc - multiple nodes": engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) - blockexc[0].engine.localStore.putBlocks(blocks[0..3]) - blockexc[1].engine.localStore.putBlocks(blocks[4..7]) - blockexc[2].engine.localStore.putBlocks(blocks[8..11]) - blockexc[3].engine.localStore.putBlocks(blocks[12..15]) + await allFutures( + blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + await allFutures( + blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + await allFutures( + blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + await allFutures( + blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) await connectNodes(switch) - let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid )) - check wantListBlocks == blocks[0..3] + await sleepAsync(1.seconds) + + let wantListBlocks = await allFinished( + blocks[0..3].mapIt( downloader.getBlock(it.cid) )) + check wantListBlocks.mapIt( !it.read ) == blocks[0..3] diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index 2852636a..1f743d52 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -17,19 +17,26 @@ import pkg/dagger/utils/asyncheapqueue import ../helpers import ../examples -suite "BlockExc engine basic": +suite "NetworkStore engine basic": let rng = Rng.instance() seckey = PrivateKey.random(rng[]).tryGet() peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() - chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) wallet = WalletRef.example var + blocks: seq[bt.Block] done: Future[void] setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + done = newFuture[void]() test "should send want list to new peers": @@ -45,11 +52,15 @@ suite "BlockExc engine basic": done.complete() - let request = BlockExcRequest( - sendWantList: sendWantList, - ) + let + network = BlockExcNetwork(request: BlockExcRequest( + sendWantList: sendWantList, + )) - let engine = BlockExcEngine.new(MemoryStore.new(blocks), wallet, request) + engine = BlockExcEngine.new( + MemoryStore.new(blocks.mapIt( it )), + wallet, + network) engine.wantList = blocks.mapIt( it.cid ) engine.setupPeer(peerId) @@ -62,30 +73,41 @@ suite "BlockExc engine basic": check account.address == pricing.address done.complete() - let request = BlockExcRequest(sendAccount: sendAccount) - let engine = BlockExcEngine.new(MemoryStore.new, wallet, request) - engine.pricing = pricing.some + let + network = BlockExcNetwork(request: BlockExcRequest( + sendAccount: sendAccount, + )) + engine = BlockExcEngine.new(MemoryStore.new, wallet, network) + + engine.pricing = pricing.some engine.setupPeer(peerId) await done.wait(100.millis) -suite "BlockExc engine handlers": +suite "NetworkStore engine handlers": let rng = Rng.instance() seckey = PrivateKey.random(rng[]).tryGet() peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() - chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) wallet = WalletRef.example var engine: BlockExcEngine peerCtx: BlockExcPeerCtx done: Future[void] + blocks: seq[bt.Block] setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet) + engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) peerCtx = BlockExcPeerCtx( id: peerId ) @@ -93,15 +115,13 @@ suite "BlockExc engine handlers": test "should handle want list": let wantList = makeWantList(blocks.mapIt( it.cid )) - proc taskScheduler(ctx: BlockExcPeerCtx): bool = + proc handler() {.async.} = + let ctx = await engine.taskQueue.pop() check ctx.id == peerId check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid ) - done.complete() - - engine.scheduleTask = taskScheduler - engine.wantListHandler(peerId, wantList) - + let done = handler() + await engine.wantListHandler(peerId, wantList) await done test "should handle want list - `dont-have`": @@ -114,11 +134,11 @@ suite "BlockExc engine handlers": done.complete() - engine.request = BlockExcRequest( - sendPresence: sendPresence - ) + engine.network = BlockExcNetwork(request: BlockExcRequest( + sendPresence: sendPresence + )) - engine.wantListHandler(peerId, wantList) + await engine.wantListHandler(peerId, wantList) await done @@ -132,9 +152,13 @@ suite "BlockExc engine handlers": done.complete() - engine.request = BlockExcRequest(sendPresence: sendPresence) - engine.localStore.putBlocks(@[blocks[0], blocks[1]]) - engine.wantListHandler(peerId, wantList) + engine.network = BlockExcNetwork(request: BlockExcRequest( + sendPresence: sendPresence + )) + + check await engine.localStore.putBlock(blocks[0]) + check await engine.localStore.putBlock(blocks[1]) + await engine.wantListHandler(peerId, wantList) await done @@ -143,7 +167,7 @@ suite "BlockExc engine handlers": engine.pendingBlocks.addOrAwait( it.cid ) ) - engine.blocksHandler(peerId, blocks) + await engine.blocksHandler(peerId, blocks) let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks for b in blocks: @@ -155,20 +179,22 @@ suite "BlockExc engine handlers": peerContext.account = account.some peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable - engine.request.sendPayment = proc(receiver: PeerID, payment: SignedState) = - let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) - let balances = !payment.state.outcome.balances(Asset) - check receiver == peerId - check balances[account.address.toDestination] == amount - done.complete() + engine.network = BlockExcNetwork(request: BlockExcRequest( + sendPayment: proc(receiver: PeerID, payment: SignedState) = + let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) + let balances = !payment.state.outcome.balances(Asset) + check receiver == peerId + check balances[account.address.toDestination] == amount + done.complete() + )) - engine.blocksHandler(peerId, blocks) + await engine.blocksHandler(peerId, blocks) await done.wait(100.millis) test "should handle block presence": let price = UInt256.example - engine.blockPresenceHandler( + await engine.blockPresenceHandler( peerId, blocks.mapIt( PresenceMessage.init( @@ -186,8 +212,7 @@ suite "Task Handler": let rng = Rng.instance() - chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256) wallet = WalletRef.example var @@ -195,10 +220,18 @@ suite "Task Handler": peersCtx: seq[BlockExcPeerCtx] peers: seq[PeerID] done: Future[void] + blocks: seq[bt.Block] setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet) + engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) peersCtx = @[] for i in 0..3: @@ -221,8 +254,9 @@ suite "Task Handler": blks[1].cid == blocks[0].cid blks[0].cid == blocks[1].cid - engine.localStore.putBlocks(blocks) - engine.request.sendBlocks = sendBlocks + for blk in blocks: + check await engine.localStore.putBlock(blk) + engine.network.request.sendBlocks = sendBlocks # second block to send by priority peersCtx[0].peerWants.add( @@ -248,7 +282,7 @@ suite "Task Handler": test "Should send presence": let present = blocks - let missing = @[!bt.Block.new("missing".toBytes)] + let missing = @[bt.Block.init("missing".toBytes)] let price = (!engine.pricing).price proc sendPresence(id: PeerID, presence: seq[BlockPresence]) = @@ -258,8 +292,9 @@ suite "Task Handler": Presence(cid: missing[0].cid, have: false) ] - engine.localStore.putBlocks(blocks) - engine.request.sendPresence = sendPresence + for blk in blocks: + check await engine.localStore.putBlock(blk) + engine.network.request.sendPresence = sendPresence # have block peersCtx[0].peerWants.add( diff --git a/tests/dagger/blockexc/testnetwork.nim b/tests/dagger/blockexc/testnetwork.nim index 79341915..6a2e9cbc 100644 --- a/tests/dagger/blockexc/testnetwork.nim +++ b/tests/dagger/blockexc/testnetwork.nim @@ -17,24 +17,31 @@ import pkg/dagger/blockexchange import ../helpers import ../examples -suite "BlockExc network": +suite "NetworkStore network": let rng = Rng.instance() seckey = PrivateKey.random(rng[]).tryGet() peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() - chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) var network: BlockExcNetwork networkPeer: NetworkPeer buffer: BufferStream + blocks: seq[bt.Block] done: Future[void] proc getConn(): Future[Connection] {.async.} = return Connection(buffer) setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + done = newFuture[void]() buffer = BufferStream.new() network = BlockExcNetwork.new( @@ -45,7 +52,7 @@ suite "BlockExc network": discard await networkPeer.connect() test "Want List handler": - proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} = + proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} = # check that we got the correct amount of entries check wantList.entries.len == 4 @@ -72,7 +79,7 @@ suite "BlockExc network": await done.wait(500.millis) test "Blocks Handler": - proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} = + proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} = check blks == blocks done.complete() @@ -84,7 +91,9 @@ suite "BlockExc network": await done.wait(500.millis) test "Presence Handler": - proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} = + proc presenceHandler( + peer: PeerID, + precense: seq[BlockPresence]) {.gcsafe, async.} = for b in blocks: check: b.cid in precense @@ -106,7 +115,7 @@ suite "BlockExc network": test "handles account messages": let account = Account(address: EthAddress.example) - proc handleAccount(peer: PeerID, received: Account) = + proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = check received == account done.complete() @@ -120,7 +129,7 @@ suite "BlockExc network": test "handles payment messages": let payment = SignedState.example - proc handlePayment(peer: PeerID, received: SignedState) = + proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = check received == payment done.complete() @@ -131,23 +140,29 @@ suite "BlockExc network": await done.wait(100.millis) -suite "BlockExc Network - e2e": +suite "NetworkStore Network - e2e": let - chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - blocks = chunker.mapIt( !bt.Block.new(it) ) + chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) var switch1, switch2: Switch network1, network2: BlockExcNetwork - awaiters: seq[Future[void]] + blocks: seq[bt.Block] done: Future[void] setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.init(chunk)) + done = newFuture[void]() switch1 = newStandardSwitch() switch2 = newStandardSwitch() - awaiters.add(await switch1.start()) - awaiters.add(await switch2.start()) + await switch1.start() + await switch2.start() network1 = BlockExcNetwork.new( switch = switch1) @@ -166,10 +181,8 @@ suite "BlockExc Network - e2e": switch1.stop(), switch2.stop()) - await allFuturesThrowing(awaiters) - test "broadcast want list": - proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} = + proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} = # check that we got the correct amount of entries check wantList.entries.len == 4 @@ -193,7 +206,7 @@ suite "BlockExc Network - e2e": await done.wait(500.millis) test "broadcast blocks": - proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} = + proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} = check blks == blocks done.complete() @@ -205,7 +218,9 @@ suite "BlockExc Network - e2e": await done.wait(500.millis) test "broadcast precense": - proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} = + proc presenceHandler( + peer: PeerID, + precense: seq[BlockPresence]) {.gcsafe, async.} = for b in blocks: check: b.cid in precense @@ -227,7 +242,7 @@ suite "BlockExc Network - e2e": test "broadcasts account": let account = Account(address: EthAddress.example) - proc handleAccount(peer: PeerID, received: Account) = + proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = check received == account done.complete() @@ -240,7 +255,7 @@ suite "BlockExc Network - e2e": test "broadcasts payment": let payment = SignedState.example - proc handlePayment(peer: PeerID, received: SignedState) = + proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = check received == payment done.complete() diff --git a/tests/dagger/examples.nim b/tests/dagger/examples.nim index 178911c4..ef647b20 100644 --- a/tests/dagger/examples.nim +++ b/tests/dagger/examples.nim @@ -42,7 +42,7 @@ proc example*(_: type Pricing): Pricing = proc example*(_: type Block): Block = let length = rand(4096) let bytes = newSeqWith(length, rand(uint8)) - !Block.new(bytes) + Block.init(bytes) proc example*(_: type PeerId): PeerID = let key = PrivateKey.random(Rng.instance[]).get diff --git a/tests/dagger/helpers.nim b/tests/dagger/helpers.nim index 831b4500..1dc180ca 100644 --- a/tests/dagger/helpers.nim +++ b/tests/dagger/helpers.nim @@ -1,22 +1,14 @@ import pkg/libp2p/varint -import pkg/dagger/chunker import pkg/dagger/blocktype -import pkg/dagger/blockstream import pkg/questionable import pkg/questionable/results -export chunker +import ./helpers/nodeutils +import ./helpers/randomchunker -type - TestStreamProc* = proc(): ?!Block {.raises: [Defect].} - - TestStream* = ref object of BlockStreamRef - handler*: TestStreamProc - -method nextBlock*(b: TestStream): ?!Block = - b.handler() +export randomchunker, nodeutils proc lenPrefix*(msg: openArray[byte]): seq[byte] = ## Write `msg` with a varint-encoded length prefix diff --git a/tests/dagger/stores/utils.nim b/tests/dagger/helpers/nodeutils.nim similarity index 70% rename from tests/dagger/stores/utils.nim rename to tests/dagger/helpers/nodeutils.nim index b6b3309c..8e8f586e 100644 --- a/tests/dagger/stores/utils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -13,20 +13,22 @@ proc generateNodes*( blocks: openArray[bt.Block] = [], secureManagers: openarray[SecureProtocol] = [ SecureProtocol.Noise, - ]): seq[tuple[switch: Switch, blockexc: BlockExc]] = + ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] = for i in 0..= size: + return 0 + + var read = 0 + while read < len: + rng.shuffle(alpha) + for a in alpha: + if read >= len: + break + + data[read] = a + read.inc + + consumed += read + return read + + Chunker.new( + kind = ChunkerType.FixedChunker, + reader = reader, + pad = pad, + chunkSize = chunkSize) diff --git a/tests/dagger/stores/testblockstore.nim b/tests/dagger/stores/testblockstore.nim deleted file mode 100644 index 54a0bb04..00000000 --- a/tests/dagger/stores/testblockstore.nim +++ /dev/null @@ -1,84 +0,0 @@ -import std/sequtils - -import pkg/chronos -import pkg/asynctest -import pkg/libp2p -import pkg/stew/byteutils -import pkg/questionable -import pkg/questionable/results -import pkg/dagger/rng -import pkg/dagger/stores/memorystore -import pkg/dagger/chunker - -import ../helpers - -suite "Memory Store": - - var store: MemoryStore - var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - var blocks = chunker.mapIt( !Block.new(it) ) - - setup: - store = MemoryStore.new(blocks) - - test "getBlocks single": - let blk = await store.getBlocks(@[blocks[0].cid]) - check blk[0] == blocks[0] - - test "getBlocks multiple": - let blk = await store.getBlocks(blocks[0..2].mapIt( it.cid )) - check blk == blocks[0..2] - - test "hasBlock": - check store.hasBlock(blocks[0].cid) - - test "delBlocks single": - let blks = blocks[1..3].mapIt( it.cid ) - store.delBlocks(blks) - - check not store.hasBlock(blks[0]) - check not store.hasBlock(blks[1]) - check not store.hasBlock(blks[2]) - - test "add blocks change handler": - let blocks = @[ - !Block.new("Block 1".toBytes), - !Block.new("Block 2".toBytes), - !Block.new("Block 3".toBytes), - ] - - var triggered = false - store.addChangeHandler( - proc(evt: BlockStoreChangeEvt) = - check evt.kind == ChangeType.Added - check evt.cids == blocks.mapIt( it.cid ) - triggered = true - , ChangeType.Added - ) - - store.putBlocks(blocks) - check triggered - - test "add blocks change handler": - let blocks = @[ - !Block.new("Block 1".toBytes), - !Block.new("Block 2".toBytes), - !Block.new("Block 3".toBytes), - ] - - var triggered = false - store.addChangeHandler( - proc(evt: BlockStoreChangeEvt) = - check evt.kind == ChangeType.Removed - check evt.cids == blocks.mapIt( it.cid ) - triggered = true - , ChangeType.Removed - ) - - store.putBlocks(blocks) - check store.hasBlock(blocks[0].cid) - check store.hasBlock(blocks[1].cid) - check store.hasBlock(blocks[2].cid) - - store.delBlocks(blocks.mapIt( it.cid )) - check triggered diff --git a/tests/dagger/stores/testfsstore.nim b/tests/dagger/stores/testfsstore.nim new file mode 100644 index 00000000..9bcc3f5e --- /dev/null +++ b/tests/dagger/stores/testfsstore.nim @@ -0,0 +1,65 @@ +import std/sequtils +import std/os + +import pkg/questionable +import pkg/questionable/results + +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils + +import pkg/dagger/rng +import pkg/dagger/stores/memorystore +import pkg/dagger/chunker +import pkg/dagger/stores + +import ../helpers + +suite "FS Store": + let + (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name + + var + store: FSStore + repoDir: string + newBlock = Block.init("New Block".toBytes()) + + setup: + repoDir = path.parentDir / "repo" + createDir(repoDir) + store = FSStore.new(repoDir) + + teardown: + removeDir(repoDir) + + test "putBlock": + check await store.putBlock(newBlock) + check fileExists(store.blockPath(newBlock.cid)) + check newBlock.cid in store + + test "getBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + let blk = await store.getBlock(newBlock.cid) + check blk.option == newBlock.some + + test "fail getBlock": + let blk = await store.getBlock(newBlock.cid) + check blk.isErr + + test "hasBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + + check store.hasBlock(newBlock.cid) + + test "fail hasBlock": + check not store.hasBlock(newBlock.cid) + + test "delBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + + check await store.delBlock(newBlock.cid) + check not fileExists(store.blockPath(newBlock.cid)) diff --git a/tests/dagger/stores/testmemorystore.nim b/tests/dagger/stores/testmemorystore.nim new file mode 100644 index 00000000..d84aa802 --- /dev/null +++ b/tests/dagger/stores/testmemorystore.nim @@ -0,0 +1,61 @@ +import std/sequtils + +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils +import pkg/questionable +import pkg/questionable/results +import pkg/dagger/rng +import pkg/dagger/stores/memorystore +import pkg/dagger/chunker + +import ../helpers + +suite "Memory Store tests": + test "putBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new() + + check await store.putBlock(newBlock) + check newBlock.cid in store + + test "getBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new(@[newBlock]) + + let blk = await store.getBlock(newBlock.cid) + check blk.isOk + check blk == newBlock.success + + test "fail getBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new(@[]) + + let blk = await store.getBlock(newBlock.cid) + check blk.isErr + + test "hasBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new(@[newBlock]) + + check store.hasBlock(newBlock.cid) + + test "fail hasBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new(@[]) + + check not store.hasBlock(newBlock.cid) + + test "delBlock": + let + newBlock = Block.init("New Block".toBytes()) + store = MemoryStore.new(@[newBlock]) + + check await store.delBlock(newBlock.cid) + check newBlock.cid notin store diff --git a/tests/dagger/testblockexc.nim b/tests/dagger/testblockexc.nim index 8868c1b9..8ba2e05b 100644 --- a/tests/dagger/testblockexc.nim +++ b/tests/dagger/testblockexc.nim @@ -3,5 +3,6 @@ import ./blockexc/testnetwork import ./blockexc/protobuf/testpayments as testprotobufpayments import ./blockexc/protobuf/testpresence import ./blockexc/engine/testpayments as testenginepayments +import ./blockexc/testblockexc {.warning[UnusedImport]: off.} diff --git a/tests/dagger/testblockset.nim b/tests/dagger/testblockset.nim deleted file mode 100644 index a6c01cd7..00000000 --- a/tests/dagger/testblockset.nim +++ /dev/null @@ -1,50 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/asynctest -import pkg/libp2p -import pkg/stew/byteutils as stew - -import pkg/dagger/chunker -import pkg/dagger/blocktype as bt -import pkg/dagger/blockstream -import pkg/dagger/blockset - -import ./helpers - -suite "BlockSet": - test "Should produce valid tree hash checksum": - let - blocks = @[ - !Block.new("Block 1".toBytes), - !Block.new("Block 2".toBytes), - !Block.new("Block 3".toBytes), - !Block.new("Block 4".toBytes), - !Block.new("Block 5".toBytes), - !Block.new("Block 6".toBytes), - !Block.new("Block 7".toBytes), - ] - - checksum = @[byte(43), 2, 105, 202, 45, 227, - 178, 211, 83, 246, 56, 250, 210, - 160, 210, 98, 123, 87, 139, 157, - 188, 221, 252, 255, 17, 11, 79, - 85, 220, 161, 238, 108] - - var idx = 0 - proc nextBlockHandler(): ?!Block = - let blk = if idx < blocks.len: blocks[idx] else: return - idx.inc() - return success blk - - let - blockStream = TestStream(handler: nextBlockHandler) - blockSet = BlockSetRef.new(stream = blockStream) - - let res = blockSet.treeHash() - check res.isOK - if h =? res: - check h.hashBytes() == checksum - return - - check false diff --git a/tests/dagger/testchunking.nim b/tests/dagger/testchunking.nim index 767a9c4c..ad759d95 100644 --- a/tests/dagger/testchunking.nim +++ b/tests/dagger/testchunking.nim @@ -1,36 +1,68 @@ -import std/unittest +import pkg/asynctest import pkg/stew/byteutils import pkg/dagger/chunker +import pkg/chronicles +import pkg/chronos +import pkg/libp2p suite "Chunking": test "should return proper size chunks": - proc reader(data: var openArray[byte], offset: Natural = 0): int - {.gcsafe, closure, raises: [Defect].} = - let contents = "1234567890".toBytes - copyMem(addr data[0], unsafeAddr contents[offset], data.len) - return data.len + var offset = 0 + let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0] + proc reader(data: ChunkBuffer, len: int): Future[int] + {.gcsafe, async, raises: [Defect].} = + if offset >= contents.len: + return 0 + + copyMem(data, unsafeAddr contents[offset], len) + offset += 2 + return len let chunker = Chunker.new( reader = reader, - size = 10, chunkSize = 2) - check chunker.getBytes() == "12".toBytes - check chunker.getBytes() == "34".toBytes - check chunker.getBytes() == "56".toBytes - check chunker.getBytes() == "78".toBytes - check chunker.getBytes() == "90".toBytes - check chunker.getBytes() == "".toBytes + check: + (await chunker.getBytes()) == [1.byte, 2] + (await chunker.getBytes()) == [3.byte, 4] + (await chunker.getBytes()) == [5.byte, 6] + (await chunker.getBytes()) == [7.byte, 8] + (await chunker.getBytes()) == [9.byte, 0] + (await chunker.getBytes()) == [] + + test "should chunk LPStream": + var offset = 0 + let stream = BufferStream.new() + let chunker = LPStreamChunker.new( + stream = stream, + chunkSize = 2) + + proc writer() {.async.} = + for d in [@[1.byte, 2, 3, 4], @[5.byte, 6, 7, 8], @[9.byte, 0]]: + await stream.pushData(d) + await stream.pushEof() + await stream.close() + + let writerFut = writer() + check: + (await chunker.getBytes()) == [1.byte, 2] + (await chunker.getBytes()) == [3.byte, 4] + (await chunker.getBytes()) == [5.byte, 6] + (await chunker.getBytes()) == [7.byte, 8] + (await chunker.getBytes()) == [9.byte, 0] + (await chunker.getBytes()) == [] + + await writerFut test "should chunk file": - let (fileName, _, _) = instantiationInfo() # get this file's name - let path = "tests/dagger/" & filename - let file = open(path) - let fileChunker = newFileChunker(file = file) + let + (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name + file = open(path) + fileChunker = FileChunker.new(file = file, chunkSize = 256) var data: seq[byte] while true: - let buff = fileChunker.getBytes() + let buff = await fileChunker.getBytes() if buff.len <= 0: break diff --git a/tests/dagger/testmanifest.nim b/tests/dagger/testmanifest.nim new file mode 100644 index 00000000..b8735d82 --- /dev/null +++ b/tests/dagger/testmanifest.nim @@ -0,0 +1,54 @@ +import std/sequtils + +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils + +import pkg/dagger/chunker +import pkg/dagger/blocktype as bt +import pkg/dagger/manifest + +import ./helpers + +suite "Manifest": + test "Should produce valid tree hash checksum": + without var manifest =? BlocksManifest.init( + blocks = @[ + Block.init("Block 1".toBytes).cid, + Block.init("Block 2".toBytes).cid, + Block.init("Block 3".toBytes).cid, + Block.init("Block 4".toBytes).cid, + Block.init("Block 5".toBytes).cid, + Block.init("Block 6".toBytes).cid, + Block.init("Block 7".toBytes).cid, + ]): + fail() + + let + checksum = @[18.byte, 32, 14, 78, 178, 161, + 50, 175, 26, 57, 68, 6, 163, 128, + 19, 131, 212, 203, 93, 98, 219, + 34, 243, 217, 132, 191, 86, 255, + 171, 160, 77, 167, 91, 145] + + var mh: MultiHash + check MultiHash.decode(checksum, mh).get() > 0 + + let checkSumCid = Cid.init(manifest.version, manifest.codec, mh).get() + check checkSumCid == !(manifest.cid) + + test "Should encode/decode to/from manifest": + let + blocks = (0..<1000).mapIt( Block.init(("Block " & $it).toBytes).cid ) + + var + manifest = BlocksManifest.init(blocks).get() + + let + e = manifest.encode().get() + (cid, decoded) = BlocksManifest.decode(e).get() + + check decoded == blocks diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim new file mode 100644 index 00000000..eb3e40e7 --- /dev/null +++ b/tests/dagger/testnode.nim @@ -0,0 +1,138 @@ +import std/os +import std/options + +import pkg/asynctest +import pkg/chronos +import pkg/stew/byteutils + +import pkg/nitro +import pkg/libp2p +import pkg/libp2p/errors + +import pkg/dagger/stores +import pkg/dagger/blockexchange +import pkg/dagger/chunker +import pkg/dagger/node +import pkg/dagger/manifest +import pkg/dagger/blocktype as bt + +import ./helpers + +suite "Test Node": + let + (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name + + var + file: File + chunker: Chunker + switch: Switch + wallet: WalletRef + network: BlockExcNetwork + localStore: MemoryStore + engine: BlockExcEngine + store: NetworkStore + node: DaggerNodeRef + + setup: + file = open(path.splitFile().dir /../ "fixtures" / "test.jpg") + chunker = FileChunker.new(file = file) + switch = newStandardSwitch() + wallet = WalletRef.new(EthPrivateKey.random()) + network = BlockExcNetwork.new(switch) + localStore = MemoryStore.new() + engine = BlockExcEngine.new(localStore, wallet, network) + store = NetworkStore.new(engine, localStore) + node = DaggerNodeRef.new(switch, store, engine) + + await node.start() + + teardown: + close(file) + await node.stop() + + test "Store Data Stream": + let + stream = BufferStream.new() + storeFut = node.store(stream) + + var + manifest = BlocksManifest.init().tryGet() + + try: + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + await stream.pushData(chunk) + manifest.put(bt.Block.init(chunk).cid) + finally: + await stream.pushEof() + await stream.close() + + let + manifestCid = (await storeFut).tryGet() + + check: + manifestCid in localStore + + var + manifestBlock = (await localStore.getBlock(manifestCid)).get() + localManifest = BlocksManifest.init(manifestBlock).tryGet() + + check: + manifest.len == localManifest.len + manifest.cid == localManifest.cid + + test "Retrieve Data Stream": + var + manifest = BlocksManifest.init().tryGet() + original: seq[byte] + + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + let + blk = bt.Block.init(chunk) + + original &= chunk + check await localStore.putBlock(blk) + manifest.put(blk.cid) + + let + manifestBlock = bt.Block.init( + manifest.encode().tryGet(), + codec = ManifestCodec) + + check await localStore.putBlock(manifestBlock) + + let stream = BufferStream.new() + check (await node.retrieve(stream, manifestBlock.cid)).isOk + + var data: seq[byte] + while true: + var + buf = newSeq[byte](FileChunkSize) + res = await stream.readOnce(addr buf[0], buf.len) + + if res <= 0: + break + + buf.setLen(res) + data &= buf + + check data == original + + test "Retrieve One Block": + let + testString = "Block 1" + blk = bt.Block.init(testString.toBytes) + + var + stream = BufferStream.new() + + check (await localStore.putBlock(blk)) + check (await node.retrieve(stream, blk.cid)).isOk + + var data = newSeq[byte](testString.len) + await stream.readExactly(addr data[0], data.len) + check string.fromBytes(data) == testString diff --git a/tests/dagger/teststores.nim b/tests/dagger/teststores.nim index 689f2fb3..fe3da5a3 100644 --- a/tests/dagger/teststores.nim +++ b/tests/dagger/teststores.nim @@ -1,4 +1,4 @@ -import ./stores/testblockexc -import ./stores/testblockstore +import ./stores/testfsstore +import ./stores/testmemorystore {.warning[UnusedImport]: off.} diff --git a/tests/fixtures/test.jpg b/tests/fixtures/test.jpg new file mode 100644 index 00000000..dc6ee598 Binary files /dev/null and b/tests/fixtures/test.jpg differ diff --git a/tests/testAll.nim b/tests/testAll.nim index fbf20fa4..f9e7167e 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -2,6 +2,7 @@ import ./dagger/teststores import ./dagger/testblockexc import ./dagger/testasyncheapqueue import ./dagger/testchunking -import ./dagger/testblockset +import ./dagger/testmanifest +import ./dagger/testnode {.warning[UnusedImport]: off.} diff --git a/vendor/dnsclient.nim b/vendor/dnsclient.nim new file mode 160000 index 00000000..536cc6b7 --- /dev/null +++ b/vendor/dnsclient.nim @@ -0,0 +1 @@ +Subproject commit 536cc6b7933e5f86590bb27083c0ffeab31255f9 diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 37c62af5..7dc58d42 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 37c62af57951b4afb9c653d4d8f86ed2bdee10f0 +Subproject commit 7dc58d42b6905a7fd7531875fa76060f8f744e4e diff --git a/vendor/nim-confutils b/vendor/nim-confutils new file mode 160000 index 00000000..ab4ba1cb --- /dev/null +++ b/vendor/nim-confutils @@ -0,0 +1 @@ +Subproject commit ab4ba1cbfdccdb8c0398894ffc25169bc61faeed diff --git a/vendor/nim-http-utils b/vendor/nim-http-utils new file mode 160000 index 00000000..689da19e --- /dev/null +++ b/vendor/nim-http-utils @@ -0,0 +1 @@ +Subproject commit 689da19e9e9cfff4ced85e2b25c6b2b5598ed079 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 6f779c47..58f383e6 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 6f779c47c83f8be9d98958a08c4b49508fb05767 +Subproject commit 58f383e661521314df314e7096c24db5a7490372 diff --git a/vendor/nim-presto b/vendor/nim-presto index bfcbeceb..c41bc8ae 160000 --- a/vendor/nim-presto +++ b/vendor/nim-presto @@ -1 +1 @@ -Subproject commit bfcbeceb65d90b47af9adc94dcb671d7dd827443 +Subproject commit c41bc8aefc7e5342eb927f874140b80d0e989a95 diff --git a/vendor/nim-toml-serialization b/vendor/nim-toml-serialization new file mode 160000 index 00000000..4e15e00e --- /dev/null +++ b/vendor/nim-toml-serialization @@ -0,0 +1 @@ +Subproject commit 4e15e00ed9e27a8d28b40b69ef06c6a4a388ae93 diff --git a/vendor/nim-unittest2 b/vendor/nim-unittest2 new file mode 160000 index 00000000..02c49b8a --- /dev/null +++ b/vendor/nim-unittest2 @@ -0,0 +1 @@ +Subproject commit 02c49b8a994dd3f9eddfaab45262f9b8fa507f8e diff --git a/vendor/nim-websock b/vendor/nim-websock new file mode 160000 index 00000000..a697e358 --- /dev/null +++ b/vendor/nim-websock @@ -0,0 +1 @@ +Subproject commit a697e3585d583ab6b91a159ea7d023461002c927 diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 3b83e229..25a4c270 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 3b83e229432979636702e506a7634325eccb3bb0 +Subproject commit 25a4c270330026442e09f6cddfb7a944da7cfa4b