diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..3eaac2c6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,173 @@ +name: nim-dagger CI +on: [push, pull_request] + +jobs: + build: + strategy: + fail-fast: false + max-parallel: 20 + matrix: + branch: [v1.2.6] + target: + # Unit tests + - os: linux + cpu: amd64 + TEST_KIND: unit-tests + - os: linux + cpu: i386 + TEST_KIND: unit-tests + - os: macos + cpu: amd64 + TEST_KIND: unit-tests + - os: windows + cpu: i386 + TEST_KIND: unit-tests + - os: windows + cpu: amd64 + TEST_KIND: unit-tests + include: + - target: + os: linux + builder: ubuntu-18.04 + - target: + os: macos + builder: macos-10.15 + - target: + os: windows + builder: windows-2019 + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (${{ matrix.branch }})' + runs-on: ${{ matrix.builder }} + steps: + - name: Checkout nim-dagger + uses: actions/checkout@v2 + with: + path: nim-dagger + submodules: true + - name: Install build dependencies (Linux i386) + if: runner.os == 'Linux' && matrix.target.cpu == 'i386' + run: | + sudo dpkg --add-architecture i386 + sudo apt-fast update -qq + sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \ + --no-install-recommends -yq gcc-multilib g++-multilib \ + libssl-dev:i386 + mkdir -p external/bin + cat << EOF > external/bin/gcc + #!/bin/bash + exec $(which gcc) -m32 "\$@" + EOF + cat << EOF > external/bin/g++ + #!/bin/bash + exec $(which g++) -m32 "\$@" + EOF + chmod 755 external/bin/gcc external/bin/g++ + echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH + + - name: Install build dependencies (Windows) + if: runner.os == 'Windows' + shell: bash + run: | + mkdir external + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + arch=64 + else + arch=32 + fi + curl -L "https://nim-lang.org/download/mingw$arch-6.3.0.7z" -o "external/mingw$arch.7z" + curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip + 7z x "external/mingw$arch.7z" -oexternal/ + 7z x external/windeps.zip -oexternal/dlls + echo '${{ github.workspace }}'"/external/mingw$arch/bin" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH + + - name: Setup environment + shell: bash + run: echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH + + - name: Get latest Nim commit hash + id: versions + shell: bash + run: | + getHash() { + git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1 + } + nimHash=$(getHash nim-lang/Nim '${{ matrix.branch }}') + csourcesHash=$(getHash nim-lang/csources) + echo "::set-output name=nim::$nimHash" + echo "::set-output name=csources::$csourcesHash" + + - name: Restore prebuilt Nim from cache + id: nim-cache + uses: actions/cache@v1 + with: + path: nim + key: "nim-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nim }}" + + - name: Restore prebuilt csources from cache + if: steps.nim-cache.outputs.cache-hit != 'true' + id: csources-cache + uses: actions/cache@v1 + with: + path: csources/bin + key: "csources-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.csources }}" + + - name: Checkout Nim csources + if: > + steps.csources-cache.outputs.cache-hit != 'true' && + steps.nim-cache.outputs.cache-hit != 'true' + uses: actions/checkout@v2 + with: + repository: nim-lang/csources + path: csources + ref: ${{ steps.versions.outputs.csources }} + + - name: Checkout Nim + if: steps.nim-cache.outputs.cache-hit != 'true' + uses: actions/checkout@v2 + with: + repository: nim-lang/Nim + 
path: nim + ref: ${{ steps.versions.outputs.nim }} + + - name: Build Nim and associated tools + if: steps.nim-cache.outputs.cache-hit != 'true' + shell: bash + run: | + ncpu= + ext= + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + ext=.exe + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + if [[ ! -e csources/bin/nim$ext ]]; then + make -C csources -j $ncpu CC=gcc ucpu='${{ matrix.target.cpu }}' + else + echo 'Using prebuilt csources' + fi + cp -v csources/bin/nim$ext nim/bin + cd nim + nim c koch + ./koch boot -d:release + ./koch tools -d:release + # clean up to save cache space + rm koch + rm -rf nimcache + rm -rf dist + rm -rf .git + + - name: Run nim-dagger tests + shell: bash + run: | + export UCPU="$cpu" + cd nim-dagger + nimble install -y --depsOnly + nimble test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml deleted file mode 100644 index 1648b34e..00000000 --- a/.github/workflows/test.yml +++ /dev/null @@ -1,15 +0,0 @@ -name: CI - -on: [push, pull_request] - -jobs: - test: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, macOS-latest, windows-latest] - steps: - - uses: actions/checkout@v2 - - uses: iffy/install-nim@v3 - - name: Test - run: nimble test -y diff --git a/.tool-versions b/.tool-versions index 9ed77605..b1180c58 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -nim 1.4.2 +nim 1.2.6 diff --git a/ipfs.nim b/dagger.nim similarity index 100% rename from ipfs.nim rename to dagger.nim diff --git a/dagger.nimble b/dagger.nimble new file mode 100644 index 00000000..30dfcda3 --- /dev/null +++ b/dagger.nimble @@ -0,0 +1,17 @@ +version = "0.1.0" +author = "Dagger Team" +description = "The hardrive for Web3" +license = "MIT" + +requires "nim >= 1.2.6", + "libp2p >= 0.0.2 & < 0.1.0", + "nimcrypto >= 0.4.1", + "bearssl >= 0.1.4", + "chronicles >= 0.7.2", + "chronos >= 2.5.2", + "metrics", + "secp256k1", + "stew#head", + "protobufserialization >= 0.2.0 & < 0.3.0", + "asynctest >= 0.2.1 & < 0.3.0", + "stew" diff --git a/dagger/bitswap.nim b/dagger/bitswap.nim new file mode 100644 index 00000000..0e3d9a4d --- /dev/null +++ b/dagger/bitswap.nim @@ -0,0 +1,183 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
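For orientation, here is a minimal wiring sketch for the pieces this change introduces; it mirrors the two-node setup used in tests/dagger/bitswap/testbitswap.nim further down in this diff. The proc name and the block contents are placeholders; the switch/network/store wiring follows the test code and is illustrative, not part of the module itself.

import pkg/chronos
import pkg/libp2p
import pkg/stew/byteutils
import pkg/dagger/bitswap
import pkg/dagger/stores/memorystore
import pkg/dagger/blocktype as bt

proc example() {.async.} =
  # a libp2p switch provides transports, muxing and peer management
  let switch = newStandardSwitch()
  discard await switch.start()

  # the network layer speaks the bitswap wire protocol on top of the switch
  let network = BitswapNetwork.new(switch = switch)

  # Bitswap is itself a BlockStore, backed here by an in-memory store
  let bs = Bitswap.new(MemoryStore.new(), network)
  switch.mount(network)
  await bs.start()

  # blocks put locally become available to connected peers;
  # getBlocks resolves local cids and requests missing ones from remotes
  let blk = bt.Block.new("hello block".toBytes)
  bs.putBlocks(@[blk])
  let blocks = await bs.getBlocks(@[blk.cid])
  echo "have ", blocks.len, " block(s)"

  await bs.stop()
  await switch.stop()

waitFor example()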
+ +import std/sequtils + +import pkg/chronicles +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/errors + +import ./bitswap/protobuf/bitswap as pb +import ./blocktype as bt +import ./stores/blockstore +import ./utils/asyncheapqueue + +import ./bitswap/network +import ./bitswap/engine + +export network, blockstore, asyncheapqueue, engine + +logScope: + topics = "dagger bitswap" + +const + DefaultTaskQueueSize = 100 + DefaultConcurrentTasks = 10 + DefaultMaxRetries = 3 + +type + Bitswap* = ref object of BlockStore + engine*: BitswapEngine # bitswap decision engine + taskQueue*: AsyncHeapQueue[BitswapPeerCtx] # peers we're currently processing tasks for + bitswapTasks: seq[Future[void]] # future to control bitswap task + bitswapRunning: bool # indicates if the bitswap task is running + concurrentTasks: int # number of concurrent peers we're serving at any given time + maxRetries: int # max number of tries for a failed block + taskHandler: TaskHandler # handler provided by the engine called by the runner + +proc bitswapTaskRunner(b: Bitswap) {.async.} = + ## process tasks in order of least amount of + ## debt ratio + ## + + while b.bitswapRunning: + let peerCtx = await b.taskQueue.pop() + asyncSpawn b.taskHandler(peerCtx) + + trace "Exiting bitswap task runner" + +proc start*(b: Bitswap) {.async.} = + ## Start the bitswap task + ## + + trace "bitswap start" + + if b.bitswapTasks.len > 0: + warn "Starting bitswap twice" + return + + b.bitswapRunning = true + for i in 0.. 0: + return peer[0] + +proc requestBlocks*( + b: BitswapEngine, + cids: seq[Cid], + timeout = DefaultTimeout): seq[Future[bt.Block]] = + ## Request a block from remotes + ## + + # no Cids to request + if cids.len == 0: + return + + if b.peers.len <= 0: + warn "No peers to request blocks from" + # TODO: run discovery here to get peers for the block + return + + var blocks: seq[Future[bt.Block]] + for c in cids: + if c notin b.pendingBlocks: + # install events to await blocks incoming from different sources + blocks.add( + b.pendingBlocks.addOrAwait(c).wait(timeout)) + + proc cmp(a, b: BitswapPeerCtx): int = + if a.debtRatio == b.debtRatio: + 0 + elif a.debtRatio > b.debtRatio: + 1 + else: + -1 + + # sort the peers so that we request + # the blocks from a peer with the lowest + # debt ratio + var sortedPeers = b.peers.sorted( + cmp + ) + + # get the first peer with at least one (any) + # matching cid + var blockPeer: BitswapPeerCtx + for i, p in sortedPeers: + let has = cids.anyIt( + it in p.peerHave + ) + + if has: + blockPeer = p + break + + # didn't find any peer with matching cids + # use the first one in the sorted array + if isNil(blockPeer): + blockPeer = sortedPeers[0] + + sortedPeers.keepItIf( + it != blockPeer + ) + + trace "Requesting blocks from peer", peer = blockPeer.id, len = cids.len + # request block + b.request.sendWantList( + blockPeer.id, + cids, + wantType = WantType.wantBlock) # we want this remote to send us a block + + if sortedPeers.len == 0: + return blocks # no peers to send wants to + + template sendWants(ctx: BitswapPeerCtx) = + # just send wants + b.request.sendWantList( + ctx.id, + cids.filterIt( it notin ctx.peerHave ), # filter out those that we already know about + wantType = WantType.wantHave) # we only want to know if the peer has the block + + # filter out the peer we've already requested from + var stop = sortedPeers.high + if stop > b.peersPerRequest: stop = b.peersPerRequest + trace "Sending want list requests to remaining peers", count = stop + 1 + for p in sortedPeers[0..stop]: + 
sendWants(p) + + return blocks + +proc blockPresenceHandler*( + b: BitswapEngine, + peer: PeerID, + presence: seq[BlockPresence]) = + ## Handle block presence + ## + + let peerCtx = b.getPeerCtx(peer) + if isNil(peerCtx): + return + + for blk in presence: + let cid = Cid.init(blk.cid).get() + if cid notin peerCtx.peerHave: + if blk.type == BlockPresenceType.presenceHave: + peerCtx.peerHave.add(cid) + +proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) = + trace "Schedule a task for new blocks" + + let cids = blocks.mapIt( it.cid ) + # schedule any new peers to provide blocks to + for p in b.peers: + for c in cids: # for each cid + # schedule a peer if it wants at least one + # cid and we have it in our local store + if c in p.peerWants and c in b.localStore: + if not b.scheduleTask(p): + trace "Unable to schedule task for peer", peer = p.id + break # do next peer + +proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) = + ## Resolve pending blocks from the pending blocks manager + ## and schedule any new task to be ran + ## + + trace "Resolving blocks" + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + +proc blocksHandler*( + b: BitswapEngine, + peer: PeerID, + blocks: seq[bt.Block]) = + ## handle incoming blocks + ## + + trace "Got blocks from peer", peer, len = blocks.len + b.localStore.putBlocks(blocks) + b.resolveBlocks(blocks) + +proc wantListHandler*( + b: BitswapEngine, + peer: PeerID, + wantList: WantList) = + ## Handle incoming want lists + ## + + trace "Got want list for peer", peer + let peerCtx = b.getPeerCtx(peer) + if isNil(peerCtx): + return + + var dontHaves: seq[Cid] + let entries = wantList.entries + for e in entries: + let idx = peerCtx.peerWants.find(e) + if idx > -1: + # peer doesn't want this block anymore + if e.cancel: + peerCtx.peerWants.del(idx) + continue + + peerCtx.peerWants[idx] = e # update entry + else: + peerCtx.peerWants.add(e) + + trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid + + # peer might want to ask for the same cid with + # different want params + if e.sendDontHave and not(b.localStore.hasBlock(e.cid)): + dontHaves.add(e.cid) + + # send don't have's to remote + if dontHaves.len > 0: + b.request.sendPresence( + peer, + dontHaves.mapIt( + BlockPresence( + cid: it.data.buffer, + `type`: BlockPresenceType.presenceDontHave))) + + if not b.scheduleTask(peerCtx): + trace "Unable to schedule task for peer", peer + +proc setupPeer*(b: BitswapEngine, peer: PeerID) = + ## Perform initial setup, such as want + ## list exchange + ## + + trace "Setting up new peer", peer + if peer notin b.peers: + b.peers.add(BitswapPeerCtx( + id: peer + )) + + # broadcast our want list, the other peer will do the same + if b.wantList.len > 0: + b.request.sendWantList(peer, b.wantList, full = true) + +proc dropPeer*(b: BitswapEngine, peer: PeerID) = + ## Cleanup disconnected peer + ## + + trace "Dropping peer", peer + + # drop the peer from the peers table + b.peers.keepItIf( it.id != peer ) + +proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} = + trace "Handling task for peer", peer = task.id + + var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) + # get blocks and wants to send to the remote + for e in task.peerWants: + if e.wantType == WantType.wantBlock: + await wantsBlocks.push(e) + + # TODO: There should be all sorts of accounting of + # bytes sent/received here + if wantsBlocks.len > 0: + let blocks = await b.localStore.getBlocks( + wantsBlocks.mapIt( + it.cid + )) + + if 
blocks.len > 0: + b.request.sendBlocks(task.id, blocks) + + # Remove successfully sent blocks + task.peerWants.keepIf( + proc(e: Entry): bool = + not blocks.anyIt( it.cid == e.cid ) + ) + + var wants: seq[BlockPresence] + # do not remove wants from the queue unless + # we send the block or get a cancel + for e in task.peerWants: + if e.wantType == WantType.wantHave: + wants.add( + BlockPresence( + cid: e.`block`, + `type`: if b.localStore.hasBlock(e.cid): + BlockPresenceType.presenceHave + else: + BlockPresenceType.presenceDontHave + )) + + if wants.len > 0: + b.request.sendPresence(task.id, wants) + +proc new*( + T: type BitswapEngine, + localStore: BlockStore, + request: BitswapRequest = BitswapRequest(), + scheduleTask: TaskScheduler = nil, + peersPerRequest = DefaultMaxPeersPerRequest): T = + + proc taskScheduler(task: BitswapPeerCtx): bool = + if not isNil(scheduleTask): + return scheduleTask(task) + + let b = BitswapEngine( + localStore: localStore, + pendingBlocks: PendingBlocksManager.new(), + peersPerRequest: peersPerRequest, + scheduleTask: taskScheduler, + request: request, + ) + + return b diff --git a/dagger/bitswap/network.nim b/dagger/bitswap/network.nim new file mode 100644 index 00000000..2d52b925 --- /dev/null +++ b/dagger/bitswap/network.nim @@ -0,0 +1,309 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/tables + +import pkg/chronicles +import pkg/chronos + +import pkg/libp2p + +import ../blocktype as bt +import ./protobuf/bitswap as pb +import ./networkpeer + +export pb, networkpeer + +logScope: + topics = "dagger bitswap network" + +const Codec* = "/ipfs/bitswap/1.2.0" + +type + WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.} + BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.} + BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} + + BitswapHandlers* = object + onWantList*: WantListHandler + onBlocks*: BlocksHandler + onPresence*: BlockPresenceHandler + + WantListBroadcaster* = proc( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} + + BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} + PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} + + BitswapRequest* = object + sendWantList*: WantListBroadcaster + sendBlocks*: BlocksBroadcaster + sendPresence*: PresenceBroadcaster + + BitswapNetwork* = ref object of LPProtocol + peers*: Table[PeerID, NetworkPeer] + switch*: Switch + handlers*: BitswapHandlers + request*: BitswapRequest + getConn: ConnProvider + +proc handleWantList( + b: BitswapNetwork, + peer: NetworkPeer, + list: WantList) = + ## Handle incoming want list + ## + + if isNil(b.handlers.onWantList): + return + + trace "Handling want list for peer", peer = peer.id + b.handlers.onWantList(peer.id, list) + +# TODO: make into a template +proc makeWantList*( + cids: seq[Cid], + priority: int = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false): WantList = + var entries: seq[Entry] + for cid in cids: + entries.add(Entry( + `block`: 
cid.data.buffer, + priority: priority.int32, + cancel: cancel, + wantType: wantType, + sendDontHave: sendDontHave)) + + WantList(entries: entries, full: full) + +proc broadcastWantList*( + b: BitswapNetwork, + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) = + ## send a want message to peer + ## + + if id notin b.peers: + return + + trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len + + let wantList = makeWantList( + cids, + priority, + cancel, + wantType, + full, + sendDontHave) + asyncSpawn b.peers[id].send(Message(wantlist: wantList)) + +proc handleBlocks( + b: BitswapNetwork, + peer: NetworkPeer, + blocks: seq[auto]) = + ## Handle incoming blocks + ## + + if isNil(b.handlers.onBlocks): + return + + trace "Handling blocks for peer", peer = peer.id + + var blks: seq[bt.Block] + for blk in blocks: + when blk is pb.Block: + blks.add(bt.Block.new(Cid.init(blk.prefix).get(), blk.data)) + elif blk is seq[byte]: + blks.add(bt.Block.new(Cid.init(blk).get(), blk)) + else: + error("Invalid block type") + + b.handlers.onBlocks(peer.id, blks) + +template makeBlocks*( + blocks: seq[bt.Block]): + seq[pb.Block] = + var blks: seq[pb.Block] + for blk in blocks: + # for now only send bitswap `1.1.0` + blks.add(pb.Block( + prefix: blk.cid.data.buffer, + data: blk.data + )) + + blks + +proc broadcastBlocks*( + b: BitswapNetwork, + id: PeerID, + blocks: seq[bt.Block]) = + ## Send blocks to remote + ## + + if id notin b.peers: + return + + trace "Sending blocks to peer", peer = id, len = blocks.len + asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks))) + +proc handleBlockPresence( + b: BitswapNetwork, + peer: NetworkPeer, + presence: seq[BlockPresence]) = + ## Handle block presence + ## + + if isNil(b.handlers.onPresence): + return + + trace "Handling block presence for peer", peer = peer.id + b.handlers.onPresence(peer.id, presence) + +proc broadcastBlockPresence*( + b: BitswapNetwork, + id: PeerID, + presence: seq[BlockPresence]) = + ## Send presence to remote + ## + + if id notin b.peers: + return + + trace "Sending presence to peer", peer = id + asyncSpawn b.peers[id].send(Message(blockPresences: presence)) + +proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} = + try: + if msg.wantlist.entries.len > 0: + b.handleWantList(peer, msg.wantlist) + + if msg.blocks.len > 0: + b.handleBlocks(peer, msg.blocks) + + if msg.payload.len > 0: + b.handleBlocks(peer, msg.payload) + + if msg.blockPresences.len > 0: + b.handleBlockPresence(peer, msg.blockPresences) + + except CatchableError as exc: + trace "Exception in bitswap rpc handler", exc = exc.msg + +proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer = + ## Creates or retrieves a BitswapNetwork Peer + ## + + if peer in b.peers: + return b.peers[peer] + + var getConn = proc(): Future[Connection] {.async.} = + try: + return await b.switch.dial(peer, Codec) + except CatchableError as exc: + trace "unable to connect to bitswap peer", exc = exc.msg + + if not isNil(b.getConn): + getConn = b.getConn + + let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + b.rpcHandler(p, msg) + + # create new pubsub peer + let bitSwapPeer = NetworkPeer.new(peer, getConn, rpcHandler) + debug "created new bitswap peer", peer + + b.peers[peer] = bitSwapPeer + + return bitSwapPeer + +proc setupPeer*(b: BitswapNetwork, peer: PeerID) = + ## Perform initial setup, 
such as want + ## list exchange + ## + + discard b.getOrCreatePeer(peer) + +proc dropPeer*(b: BitswapNetwork, peer: PeerID) = + ## Cleanup disconnected peer + ## + + b.peers.del(peer) + +method init*(b: BitswapNetwork) = + ## Perform protocol initialization + ## + + proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + if event.kind == PeerEventKind.Joined: + b.setupPeer(peerId) + else: + b.dropPeer(peerId) + + b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let peerId = conn.peerInfo.peerId + let bitswapPeer = b.getOrCreatePeer(peerId) + await bitswapPeer.readLoop(conn) # attach read loop + + b.handler = handle + b.codec = Codec + +proc new*( + T: type BitswapNetwork, + switch: Switch, + connProvider: ConnProvider = nil): T = + ## Create a new BitswapNetwork instance + ## + + let b = BitswapNetwork( + switch: switch, + getConn: connProvider) + + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} = + b.broadcastWantList( + id, cids, priority, cancel, + wantType, full, sendDontHave) + + proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} = + b.broadcastBlocks(id, blocks) + + proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = + b.broadcastBlockPresence(id, presence) + + b.request = BitswapRequest( + sendWantList: sendWantList, + sendBlocks: sendBlocks, + sendPresence: sendPresence, + ) + + b.init() + return b diff --git a/dagger/bitswap/networkpeer.nim b/dagger/bitswap/networkpeer.nim new file mode 100644 index 00000000..aae8fd91 --- /dev/null +++ b/dagger/bitswap/networkpeer.nim @@ -0,0 +1,80 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
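The NetworkPeer defined below wraps a single remote peer: it owns the outgoing connection, decodes length-prefixed protobuf messages in readLoop and hands them to an RPC handler. A minimal construction sketch, assuming a running Switch and the Codec constant exported by network.nim above; the handler body is illustrative only.

import pkg/chronos
import pkg/libp2p
import pkg/dagger/bitswap/network

proc makePeer(switch: Switch, remote: PeerID): NetworkPeer =
  # dialing callback used to (re)establish a stream to the remote
  let getConn = proc(): Future[Connection] {.async.} =
    return await switch.dial(remote, Codec)

  # invoked for every decoded Message read off the wire
  let rpcHandler = proc(p: NetworkPeer, msg: Message): Future[void] {.async.} =
    echo "message from ", p.id

  # send() lazily dials via getConn and spawns the read loop
  NetworkPeer.new(remote, getConn, rpcHandler)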
+ +import pkg/chronos +import pkg/chronicles +import pkg/protobuf_serialization +import pkg/libp2p + +import ./protobuf/bitswap + +logScope: + topics = "dagger bitswap networkpeer" + +const MaxMessageSize = 8 * 1024 * 1024 + +type + RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} + + NetworkPeer* = ref object of RootObj + id*: PeerId + handler*: RPCHandler + sendConn: Connection + getConn: ConnProvider + +proc connected*(b: NetworkPeer): bool = + not(isNil(b.sendConn)) and + not(b.sendConn.closed or b.sendConn.atEof) + +proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = + if isNil(conn): + return + + try: + while not conn.atEof: + let data = await conn.readLp(MaxMessageSize) + let msg: Message = Protobuf.decode(data, Message) + trace "Got message for peer", peer = b.id, msg + await b.handler(b, msg) + except CatchableError as exc: + trace "Exception in bitswap read loop", exc = exc.msg + finally: + await conn.close() + +proc connect*(b: NetworkPeer): Future[Connection] {.async.} = + if b.connected: + return b.sendConn + + b.sendConn = await b.getConn() + asyncSpawn b.readLoop(b.sendConn) + return b.sendConn + +proc send*(b: NetworkPeer, msg: Message) {.async.} = + let conn = await b.connect() + + if isNil(conn): + trace "Unable to get send connection for peer message not sent", peer = b.id + return + + trace "Sending message to remote", peer = b.id, msg = $msg + await conn.writeLp(Protobuf.encode(msg)) + +proc new*( + T: type NetworkPeer, + peer: PeerId, + connProvider: ConnProvider, + rpcHandler: RPCHandler): T = + + doAssert(not isNil(connProvider), + "should supply connection provider") + + NetworkPeer( + id: peer, + getConn: connProvider, + handler: rpcHandler) diff --git a/dagger/bitswap/pendingblocks.nim b/dagger/bitswap/pendingblocks.nim new file mode 100644 index 00000000..af1973e1 --- /dev/null +++ b/dagger/bitswap/pendingblocks.nim @@ -0,0 +1,73 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
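The pending-blocks manager below tracks outstanding block requests as futures keyed by Cid. A small illustrative sketch of the request/resolve round trip; the block contents are placeholder data.

import pkg/chronos
import pkg/stew/byteutils
import pkg/dagger/bitswap/pendingblocks
import pkg/dagger/blocktype

proc example() {.async.} =
  let pending = PendingBlocksManager.new()
  let blk = Block.new("some data".toBytes)

  # requester side: returns a future that completes when the block shows up
  let fut = pending.addOrAwait(blk.cid)

  # network side: resolve() completes any matching pending futures
  pending.resolve(@[blk])

  let resolved = await fut
  assert resolved.cid == blk.cid

waitFor example()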
+
+import std/tables
+
+import pkg/chronicles
+import pkg/chronos
+import pkg/libp2p
+
+import ../blocktype
+
+logScope:
+  topics = "dagger bitswap pendingblocks"
+
+type
+  PendingBlocksManager* = ref object of RootObj
+    blocks*: Table[Cid, Future[Block]] # pending Block requests
+
+proc addOrAwait*(
+  p: PendingBlocksManager,
+  cid: Cid):
+  Future[Block] {.async.} =
+  ## Await a block, adding a pending future for `cid` if one isn't already registered
+  ##
+
+  if cid notin p.blocks:
+    p.blocks[cid] = newFuture[Block]()
+    trace "Adding pending future for block", cid
+
+  let blk = p.blocks[cid]
+  try:
+    return await blk
+  except CancelledError as exc:
+    trace "Blocks cancelled", exc = exc.msg, cid
+    raise exc
+  except CatchableError as exc:
+    trace "Pending WANT failed or expired", exc = exc.msg
+  finally:
+    p.blocks.del(cid)
+
+proc resolve*(
+  p: PendingBlocksManager,
+  blocks: seq[Block]) =
+  ## Resolve pending blocks
+  ##
+
+  for blk in blocks:
+    # resolve any pending blocks
+    if blk.cid in p.blocks:
+      let pending = p.blocks[blk.cid]
+      if not pending.finished:
+        trace "Resolving block", cid = $blk.cid
+        pending.complete(blk)
+        p.blocks.del(blk.cid)
+
+proc pending*(
+  p: PendingBlocksManager,
+  cid: Cid): bool = cid in p.blocks
+
+proc contains*(
+  p: PendingBlocksManager,
+  cid: Cid): bool = p.pending(cid)
+
+proc new*(T: type PendingBlocksManager): T =
+  T(
+    blocks: initTable[Cid, Future[Block]]()
+  )
diff --git a/dagger/bitswap/protobuf/bitswap.nim b/dagger/bitswap/protobuf/bitswap.nim
new file mode 100644
index 00000000..e4498cef
--- /dev/null
+++ b/dagger/bitswap/protobuf/bitswap.nim
@@ -0,0 +1,55 @@
+## Nim-Dagger
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+## * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+import std/hashes
+import std/sequtils
+import pkg/protobuf_serialization
+import pkg/libp2p
+
+import_proto3 "message.proto"
+
+export Message
+export Wantlist, WantType, Entry
+export Block, BlockPresenceType, BlockPresence
+
+proc hash*(e: Entry): Hash =
+  hash(e.`block`)
+
+proc cid*(e: Entry): Cid =
+  ## Helper to convert raw bytes to Cid
+  ##
+
+  Cid.init(e.`block`).get()
+
+proc contains*(a: openArray[Entry], b: Cid): bool =
+  ## Convenience method to check for Cid presence
+  ##
+
+  a.filterIt( it.cid == b ).len > 0
+
+proc `==`*(a: Entry, cid: Cid): bool =
+  return a.cid == cid
+
+proc `<`*(a, b: Entry): bool =
+  a.priority < b.priority
+
+proc cid*(e: BlockPresence): Cid =
+  ## Helper to convert raw bytes to Cid
+  ##
+
+  Cid.init(e.cid).get()
+
+proc `==`*(a: BlockPresence, cid: Cid): bool =
+  return cid(a) == cid
+
+proc contains*(a: openArray[BlockPresence], b: Cid): bool =
+  ## Convenience method to check for Cid presence
+  ##
+
+  a.filterIt( cid(it) == b ).len > 0
diff --git a/dagger/bitswap/protobuf/message.proto b/dagger/bitswap/protobuf/message.proto
new file mode 100644
index 00000000..fc81228e
--- /dev/null
+++ b/dagger/bitswap/protobuf/message.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+package bitswap.message.pb;
+
+message Message {
+
+  message Wantlist {
+    enum WantType {
+      wantBlock = 0;
+      wantHave = 1;
+    }
+
+    message Entry {
+      bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0)
+      int32 priority = 2; // the priority (normalized). default to 1
+      bool cancel = 3; // whether this revokes an entry
+      WantType wantType = 4; // Note: defaults to enum 0, ie Block
+      bool sendDontHave = 5; // Note: defaults to false
+    }
+
+    repeated Entry entries = 1; // a list of wantlist entries
+    bool full = 2; // whether this is the full wantlist. default to false
+  }
+
+  message Block {
+    bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length))
+    bytes data = 2;
+  }
+
+  enum BlockPresenceType {
+    presenceHave = 0;
+    presenceDontHave = 1;
+  }
+
+  message BlockPresence {
+    bytes cid = 1;
+    BlockPresenceType type = 2;
+  }
+
+  Wantlist wantlist = 1;
+  repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
+  repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
+  repeated BlockPresence blockPresences = 4;
+  int32 pendingBytes = 5;
+}
diff --git a/ipfs/protobuf/message.proto.license b/dagger/bitswap/protobuf/message.proto.license
similarity index 100%
rename from ipfs/protobuf/message.proto.license
rename to dagger/bitswap/protobuf/message.proto.license
diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim
new file mode 100644
index 00000000..7833a3c4
--- /dev/null
+++ b/dagger/blocktype.nim
@@ -0,0 +1,55 @@
+## Nim-Dagger
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+## * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+import pkg/libp2p/multihash
+import pkg/libp2p/multicodec
+import pkg/libp2p/cid
+import pkg/stew/byteutils
+
+export cid, multihash, multicodec
+
+type
+  CidDontMatchError* = object of CatchableError
+
+  Block* = object of RootObj
+    cid*: Cid
+    data*: seq[byte]
+
+proc `$`*(b: Block): string =
+  result &= "cid: " & $b.cid
+  result &= "\ndata: " & string.fromBytes(b.data)
+
+proc new*(
+  T: type Block,
+  cid: Cid,
+  data: openArray[byte] = [],
+  verify: bool = false): T =
+  let b = Block.new(
+    data,
+    cid.cidver,
+    cid.mhash.get().mcodec,
+    cid.mcodec
+  )
+
+  if verify and cid != b.cid:
+    raise newException(CidDontMatchError,
+      "The supplied Cid doesn't match the data!")
+
+  return b
+
+proc new*(
+  T: type Block,
+  data: openArray[byte] = [],
+  version = CIDv0,
+  hcodec = multiCodec("sha2-256"),
+  codec = multiCodec("dag-pb")): T =
+  let hash = MultiHash.digest($hcodec, data).get()
+  Block(
+    cid: Cid.init(version, codec, hash).get(),
+    data: @data)
diff --git a/dagger/chunker.nim b/dagger/chunker.nim
new file mode 100644
index 00000000..e22cebf0
--- /dev/null
+++ b/dagger/chunker.nim
@@ -0,0 +1,132 @@
+## Nim-Dagger
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+## * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
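The chunker below splits an input into fixed-size pieces that can be wrapped in Blocks. A short usage sketch; the helper name, file path and chunk size are arbitrary and only illustrate the API defined in this file.

import pkg/dagger/chunker
import pkg/dagger/blocktype as bt

proc fileToBlocks(path: string): seq[bt.Block] =
  let file = open(path)
  defer: file.close()

  # fixed-size chunks; the last chunk is not padded unless pad = true
  let chunker = newFileChunker(file, chunkSize = 4096)
  for chunk in chunker:   # the items() iterator repeatedly calls getBytes()
    result.add(bt.Block.new(chunk))

# random data is convenient in tests:
# let random = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)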
+ +# TODO: This is super inneficient and merits a rewrite, but it'll do for now + +import std/sequtils + +import ./p2p/rng +import ./blocktype + +export blocktype + +const + DefaultChunkSize*: int64 = 1024 * 256 + +type + # default reader type + Reader* = proc(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} + + ChunkerType* {.pure.} = enum + SizedChunker + RabinChunker + + Chunker* = ref object of RootObj + reader*: Reader + size*: Natural + pos*: Natural + case kind*: ChunkerType: + of SizedChunker: + chunkSize*: Natural + pad*: bool # pad last block if less than size + of RabinChunker: + discard + +proc getBytes*(c: Chunker): seq[byte] = + ## returns a chunk of bytes from + ## the instantiated chunker + ## + + if c.pos >= c.size: + return + + var bytes = newSeq[byte](c.chunkSize) + let read = c.reader(bytes, c.pos) + c.pos += read + + if not c.pad and bytes.len != read: + bytes.setLen(read) + + return bytes + +iterator items*(c: Chunker): seq[byte] = + while true: + let chunk = c.getBytes() + if chunk.len <= 0: + break + + yield chunk + +proc new*( + T: type Chunker, + kind = ChunkerType.SizedChunker, + reader: Reader, + size: Natural, + chunkSize = DefaultChunkSize, + pad = false): T = + var chunker = Chunker( + kind: kind, + reader: reader, + size: size) + + if kind == ChunkerType.SizedChunker: + chunker.pad = pad + chunker.chunkSize = chunkSize + + return chunker + +proc newRandomChunker*( + rng: Rng, + size: int64, + kind = ChunkerType.SizedChunker, + chunkSize = DefaultChunkSize, + pad = false): Chunker = + ## create a chunker that produces + ## random data + ## + + proc reader(data: var openArray[byte], offset: Natural = 0): int = + var alpha = toSeq(byte('A')..byte('z')) + + var read = 0 + while read <= data.high: + rng.shuffle(alpha) + for a in alpha: + if read > data.high: + break + + data[read] = a + read.inc + + return read + + Chunker.new( + kind = ChunkerType.SizedChunker, + reader = reader, + size = size, + pad = pad, + chunkSize = chunkSize) + +proc newFileChunker*( + file: File, + kind = ChunkerType.SizedChunker, + chunkSize = DefaultChunkSize, + pad = false): Chunker = + ## create the default File chunker + ## + + proc reader(data: var openArray[byte], offset: Natural = 0): int = + return file.readBytes(data, 0, data.len) + + Chunker.new( + kind = ChunkerType.SizedChunker, + reader = reader, + size = file.getFileSize(), + pad = pad, + chunkSize = chunkSize) diff --git a/dagger/p2p/rng.nim b/dagger/p2p/rng.nim new file mode 100644 index 00000000..276d3717 --- /dev/null +++ b/dagger/p2p/rng.nim @@ -0,0 +1,42 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/libp2p/crypto/crypto +import pkg/bearssl + +type + Rng* = ref BrHmacDrbgContext + +var rng {.threadvar.}: Rng + +proc instance*(t: type Rng): Rng = + if rng.isNil: + rng = newRng() + rng + +# Random helpers: similar as in stdlib, but with BrHmacDrbgContext rng +# TODO: Move these somewhere else? 
+const randMax = 18_446_744_073_709_551_615'u64 + +proc rand*(rng: Rng, max: Natural): int = + if max == 0: return 0 + + var x: uint64 + while true: + brHmacDrbgGenerate(addr rng[], addr x, csize_t(sizeof(x))) + if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias + return int(x mod (uint64(max) + 1'u64)) + +proc sample*[T](rng: Rng, a: openArray[T]): T = + result = a[rng.rand(a.high)] + +proc shuffle*[T](rng: Rng, a: var openArray[T]) = + for i in countdown(a.high, 1): + let j = rng.rand(i) + swap(a[i], a[j]) diff --git a/dagger/stores/blockstore.nim b/dagger/stores/blockstore.nim new file mode 100644 index 00000000..baa3fd19 --- /dev/null +++ b/dagger/stores/blockstore.nim @@ -0,0 +1,79 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sets +import std/sequtils +import chronos +import ../blocktype + +export blocktype + +type + ChangeType* {.pure.} = enum + Added, Removed + + BlockStoreChangeEvt* = object + cids*: seq[Cid] + kind*: ChangeType + + BlocksChangeHandler* = proc(evt: BlockStoreChangeEvt) {.gcsafe, closure.} + + BlockStore* = ref object of RootObj + changeHandlers: array[ChangeType, seq[BlocksChangeHandler]] + +proc addChangeHandler*( + s: BlockStore, + handler: BlocksChangeHandler, + changeType: ChangeType) = + s.changeHandlers[changeType].add(handler) + +proc removeChangeHandler*( + s: BlockStore, + handler: BlocksChangeHandler, + changeType: ChangeType) = + s.changeHandlers[changeType].keepItIf( it != handler ) + +proc triggerChange( + s: BlockStore, + changeType: ChangeType, + cids: seq[Cid]) = + let evt = BlockStoreChangeEvt( + kind: changeType, + cids: cids, + ) + + for handler in s.changeHandlers[changeType]: + handler(evt) + +method getBlocks*(b: BlockStore, cid: seq[Cid]): Future[seq[Block]] {.base.} = + ## Get a block from the stores + ## + + doAssert(false, "Not implemented!") + +method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} = + ## Check if the block exists in the blockstore + ## + + return false + +method putBlocks*(s: BlockStore, blocks: seq[Block]) {.base.} = + ## Put a block to the blockstore + ## + + s.triggerChange(ChangeType.Added, blocks.mapIt( it.cid )) + +method delBlocks*(s: BlockStore, blocks: seq[Cid]) {.base.} = + ## Delete a block/s from the block store + ## + + s.triggerChange(ChangeType.Removed, blocks) + +proc contains*(s: BlockStore, blk: Cid): bool = + s.hasBlock(blk) diff --git a/dagger/stores/manager.nim b/dagger/stores/manager.nim new file mode 100644 index 00000000..1c80f4fc --- /dev/null +++ b/dagger/stores/manager.nim @@ -0,0 +1,45 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
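The BlockStoreManager below fans change handlers out to several registered stores; the change-event API itself comes from blockstore.nim above. A minimal sketch of subscribing to Added events on a single store, modelled on the handler used in the tests at the end of this change (the block contents are placeholder data):

import pkg/chronos
import pkg/stew/byteutils
import pkg/dagger/stores/memorystore
import pkg/dagger/blocktype

proc example() {.async.} =
  let
    store = MemoryStore.new()
    blk = Block.new("watched".toBytes)
    done = newFuture[void]()

  proc onAdded(evt: BlockStoreChangeEvt) =
    # fired by putBlocks() with the cids that were just added
    if blk.cid in evt.cids:
      done.complete()

  store.addChangeHandler(onAdded, ChangeType.Added)
  store.putBlocks(@[blk])   # triggers onAdded
  await done

waitFor example()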
+ +import std/sequtils + +import pkg/chronos +import ./blockstore + +type + BlockStoreManager* = ref object of BlockStore + stores: seq[BlockStore] + +proc addProvider*(b: BlockStoreManager, provider: BlockStore) = + b.stores.add(provider) + +proc removeProvider*(b: BlockStoreManager, provider: BlockStore) = + b.stores.keepItIf( it != provider ) + +method addChangeHandler*( + s: BlockStoreManager, + handler: BlocksChangeHandler, + changeType: ChangeType) = + ## Add change handler to all registered + ## block stores + ## + + for p in s.stores: + p.addChangeHandler(handler, changeType) + +method removeChangeHandler*( + s: BlockStoreManager, + handler: BlocksChangeHandler, + changeType: ChangeType) = + ## Remove change handler from all registered + ## block stores + ## + + for p in s.stores: + p.removeChangeHandler(handler, changeType) diff --git a/dagger/stores/memorystore.nim b/dagger/stores/memorystore.nim new file mode 100644 index 00000000..97cc3ad2 --- /dev/null +++ b/dagger/stores/memorystore.nim @@ -0,0 +1,60 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import pkg/chronos +import pkg/libp2p + +import ../stores/blockstore +import ../blocktype + +export blockstore + +type + MemoryStore* = ref object of BlockStore + blocks: seq[Block] # TODO: Should be an LRU cache + +method getBlocks*( + s: MemoryStore, + cids: seq[Cid]): Future[seq[Block]] {.async.} = + ## Get a block from the stores + ## + + var res: seq[Block] + for c in cids: + res.add(s.blocks.filterIt( it.cid == c )) + + return res + +method hasBlock*(s: MemoryStore, cid: Cid): bool = + ## check if the block exists + ## + + s.blocks.filterIt( it.cid == cid ).len > 0 + +method putBlocks*(s: MemoryStore, blocks: seq[Block]) = + ## Put a block to the blockstore + ## + + s.blocks.add(blocks) + procCall BlockStore(s).putBlocks(blocks) + +method delBlocks*(s: MemoryStore, cids: seq[Cid]) = + ## delete a block/s from the block store + ## + + for c in cids: + s.blocks.keepItIf( it.cid != c ) + + procCall BlockStore(s).delBlocks(cids) + +proc new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore = + MemoryStore( + blocks: @blocks + ) diff --git a/dagger/utils/asyncheapqueue.nim b/dagger/utils/asyncheapqueue.nim new file mode 100644 index 00000000..efbc7a66 --- /dev/null +++ b/dagger/utils/asyncheapqueue.nim @@ -0,0 +1,326 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import pkg/chronos +import pkg/stew/results + +# Based on chronos AsyncHeapQueue and std/heapqueue + +type + QueueType* {.pure.} = enum + Min, Max + + AsyncHeapQueue*[T] = ref object of RootRef + ## A priority queue + ## + ## If ``maxsize`` is less than or equal to zero, the queue size is + ## infinite. If it is an integer greater than ``0``, then "await put()" + ## will block when the queue reaches ``maxsize``, until an item is + ## removed by "await get()". 
+ queueType: QueueType + getters: seq[Future[void]] + putters: seq[Future[void]] + queue: seq[T] + maxsize: int + + AsyncHQErrors* {.pure.} = enum + Empty, Full + +proc newAsyncHeapQueue*[T]( + maxsize: int = 0, + queueType: QueueType = QueueType.Min): AsyncHeapQueue[T] = + ## Creates a new asynchronous queue ``AsyncHeapQueue``. + ## + + AsyncHeapQueue[T]( + getters: newSeq[Future[void]](), + putters: newSeq[Future[void]](), + queue: newSeqOfCap[T](maxsize), + maxsize: maxsize, + queueType: queueType, + ) + +proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = + var i = 0 + while i < len(waiters): + var waiter = waiters[i] + inc(i) + + if not(waiter.finished()): + waiter.complete() + break + + if i > 0: + waiters.delete(0, i - 1) + +proc heapCmp[T](x, y: T, max: bool = false): bool {.inline.} = + if max: + return (y < x) + else: + return (x < y) + +proc siftdown[T](heap: AsyncHeapQueue[T], startpos, p: int) = + ## 'heap' is a heap at all indices >= startpos, except + ## possibly for pos. pos is the index of a leaf with a + ## possibly out-of-order value. Restore the heap invariant. + ## + + var pos = p + var newitem = heap[pos] + # Follow the path to the root, moving parents down until + # finding a place newitem fits. + while pos > startpos: + let parentpos = (pos - 1) shr 1 + let parent = heap[parentpos] + if heapCmp(newitem, parent, heap.queueType == QueueType.Max): + heap.queue[pos] = parent + pos = parentpos + else: + break + heap.queue[pos] = newitem + +proc siftup[T](heap: AsyncHeapQueue[T], p: int) = + let endpos = len(heap) + var pos = p + let startpos = pos + let newitem = heap[pos] + # Bubble up the smaller child until hitting a leaf. + var childpos = 2*pos + 1 # leftmost child position + while childpos < endpos: + # Set childpos to index of smaller child. + let rightpos = childpos + 1 + if rightpos < endpos and + not heapCmp(heap[childpos], heap[rightpos], heap.queueType == QueueType.Max): + childpos = rightpos + # Move the smaller child up. + heap.queue[pos] = heap[childpos] + pos = childpos + childpos = 2*pos + 1 + # The leaf at pos is empty now. Put newitem there, and bubble it up + # to its final resting place (by sifting its parents down). + heap.queue[pos] = newitem + siftdown(heap, startpos, pos) + +proc full*[T](heap: AsyncHeapQueue[T]): bool {.inline.} = + ## Return ``true`` if there are ``maxsize`` items in the queue. + ## + ## Note: If the ``heap`` was initialized with ``maxsize = 0`` (default), + ## then ``full()`` is never ``true``. + if heap.maxsize <= 0: + false + else: + (len(heap.queue) >= heap.maxsize) + +proc empty*[T](heap: AsyncHeapQueue[T]): bool {.inline.} = + ## Return ``true`` if the queue is empty, ``false`` otherwise. + (len(heap.queue) == 0) + +proc pushNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQErrors] = + ## Push `item` onto heap, maintaining the heap invariant. 
+  ##
+
+  if heap.full():
+    return err(AsyncHQErrors.Full)
+
+  heap.queue.add(item)
+  siftdown(heap, 0, len(heap)-1)
+  heap.getters.wakeupNext()
+
+  return ok()
+
+proc push*[T](heap: AsyncHeapQueue[T], item: T) {.async, gcsafe.} =
+  ## Push item into the queue, awaiting an available slot
+  ## when it's full
+  ##
+
+  while heap.full():
+    var putter = newFuture[void]("AsyncHeapQueue.push")
+    heap.putters.add(putter)
+    try:
+      await putter
+    except CatchableError as exc:
+      if not(heap.full()) and not(putter.cancelled()):
+        heap.putters.wakeupNext()
+      raise exc
+
+  heap.pushNoWait(item).tryGet()
+
+proc popNoWait*[T](heap: AsyncHeapQueue[T]): Result[T, AsyncHQErrors] =
+  ## Pop and return the smallest item from `heap`,
+  ## maintaining the heap invariant.
+  ##
+
+  if heap.empty():
+    return err(AsyncHQErrors.Empty)
+
+  let lastelt = heap.queue.pop()
+  if heap.len > 0:
+    result = ok(heap[0])
+    heap.queue[0] = lastelt
+    siftup(heap, 0)
+  else:
+    result = ok(lastelt)
+
+  heap.putters.wakeupNext()
+
+proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.async.} =
+  ## Remove and return an ``item`` from the beginning of the queue ``heap``.
+  ## If the queue is empty, wait until an item is available.
+  while heap.empty():
+    var getter = newFuture[void]("AsyncHeapQueue.pop")
+    heap.getters.add(getter)
+    try:
+      await getter
+    except CatchableError as exc:
+      if not(heap.empty()) and not(getter.cancelled()):
+        heap.getters.wakeupNext()
+      raise exc
+
+  return heap.popNoWait().tryGet()
+
+proc del*[T](heap: AsyncHeapQueue[T], index: Natural) =
+  ## Removes the element at `index` from `heap`,
+  ## maintaining the heap invariant.
+  ##
+
+  if heap.empty():
+    return
+
+  swap(heap.queue[^1], heap.queue[index])
+  let newLen = heap.len - 1
+  heap.queue.setLen(newLen)
+  if index < newLen:
+    heap.siftup(index)
+
+  heap.putters.wakeupNext()
+
+proc delete*[T](heap: AsyncHeapQueue[T], item: T) =
+  ## Find and delete an `item` from the `heap`
+  ##
+
+  let index = heap.find(item)
+  if index > -1:
+    heap.del(index)
+
+proc update*[T](heap: AsyncHeapQueue[T], item: T): bool =
+  ## Update an entry in the heap by reshuffling its
+  ## position, maintaining the heap invariant.
+  ##
+
+  let index = heap.find(item)
+  if index > -1:
+    # replace item with new one in case it's a copy
+    heap.queue[index] = item
+    # re-establish heap order
+    # TODO: don't start at 0 to avoid reshuffling
+    # entire heap
+    heap.siftup(0)
+    return true
+
+proc pushOrUpdateNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQErrors] =
+  ## Update an item if it exists or push a new one
+  ##
+
+  if heap.update(item):
+    return ok()
+
+  return heap.pushNoWait(item)
+
+proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.async.} =
+  ## Update an item if it exists or push a new one
+  ## awaiting until a slot becomes available
+  ##
+
+  if not heap.update(item):
+    await heap.push(item)
+
+proc replace*[T](heap: AsyncHeapQueue[T], item: T): Result[T, AsyncHQErrors] =
+  ## Pop and return the current smallest value, and add the new item.
+  ## This is more efficient than pop() followed by push(), and can be
+  ## more appropriate when using a fixed-size heap. Note that the value
+  ## returned may be larger than item! That constrains reasonable uses of
+  ## this routine unless written as part of a conditional replacement:
+  ##
+  ## .. code-block:: nim
+  ##   if item > heap[0]:
+  ##       item = replace(heap, item)
+  ##
+
+  if heap.empty():
+    return err(AsyncHQErrors.Empty)
+
+  result = ok(heap[0])
+  heap.queue[0] = item
+  siftup(heap, 0)
+
+proc pushPopNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[T, AsyncHQErrors] =
+  ## Fast version of a push followed by a pop.
+  ##
+
+  if heap.empty():
+    return err(AsyncHQErrors.Empty)
+
+  var item = item
+  if heap.len > 0 and heapCmp(heap[0], item, heap.queueType == QueueType.Max):
+    swap(item, heap.queue[0])
+    siftup(heap, 0)
+  return ok(item)
+
+proc clear*[T](heap: AsyncHeapQueue[T]) {.inline.} =
+  ## Clears all elements of queue ``heap``.
+  heap.queue.setLen(0)
+
+proc len*[T](heap: AsyncHeapQueue[T]): int {.inline.} =
+  ## Return the number of elements in ``heap``.
+  len(heap.queue)
+
+proc size*[T](heap: AsyncHeapQueue[T]): int {.inline.} =
+  ## Return the maximum number of elements in ``heap``.
+  heap.maxsize
+
+proc `[]`*[T](heap: AsyncHeapQueue[T], i: Natural) : T {.inline.} =
+  ## Access the i-th element of ``heap`` by order from first to last.
+  ## ``heap[0]`` is the first element, ``heap[^1]`` is the last element.
+  heap.queue[i]
+
+proc `[]`*[T](heap: AsyncHeapQueue[T], i: BackwardsIndex) : T {.inline.} =
+  ## Access the i-th element of ``heap`` by order from first to last.
+  ## ``heap[0]`` is the first element, ``heap[^1]`` is the last element.
+  heap.queue[len(heap.queue) - int(i)]
+
+iterator items*[T](heap: AsyncHeapQueue[T]): T {.inline.} =
+  ## Yield every element of ``heap``.
+  for item in heap.queue.items():
+    yield item
+
+iterator mitems*[T](heap: AsyncHeapQueue[T]): var T {.inline.} =
+  ## Yield every element of ``heap``.
+  for mitem in heap.queue.mitems():
+    yield mitem
+
+iterator pairs*[T](heap: AsyncHeapQueue[T]): tuple[key: int, val: T] {.inline.} =
+  ## Yield every (position, value) of ``heap``.
+  for pair in heap.queue.pairs():
+    yield pair
+
+proc contains*[T](heap: AsyncHeapQueue[T], item: T): bool {.inline.} =
+  ## Return true if ``item`` is in ``heap`` or false if not found. Usually used
+  ## via the ``in`` operator.
+  for e in heap.queue.items():
+    if e == item: return true
+  return false
+
+proc `$`*[T](heap: AsyncHeapQueue[T]): string =
+  ## Turn an async queue ``heap`` into its string representation.
+ var res = "[" + for item in heap.queue.items(): + if len(res) > 1: res.add(", ") + res.addQuoted(item) + res.add("]") + res diff --git a/ipfs.nimble b/ipfs.nimble deleted file mode 100644 index 3eb02dc7..00000000 --- a/ipfs.nimble +++ /dev/null @@ -1,10 +0,0 @@ -version = "0.1.0" -author = "Dagger Team" -description = "Decentralized storage in Nim" -license = "MIT" - -requires "nim >= 1.4.2 & < 2.0.0" -requires "libp2p >= 0.0.2 & < 0.1.0" -requires "chronos >= 2.5.2 & < 3.0.0" -requires "protobufserialization >= 0.2.0 & < 0.3.0" -requires "asynctest >= 0.2.1 & < 0.3.0" diff --git a/ipfs/bitswap.nim b/ipfs/bitswap.nim deleted file mode 100644 index 689c6e51..00000000 --- a/ipfs/bitswap.nim +++ /dev/null @@ -1,50 +0,0 @@ -import std/options -import pkg/chronos -import pkg/libp2p/cid -import ./ipfsobject -import ./repo -import ./p2p/switch -import ./bitswap/protocol -import ./bitswap/exchange - -export options -export Cid -export Switch - -type - Bitswap* = ref object - repo: Repo - switch: Switch - exchanges: seq[Exchange] # TODO: never cleaned - -proc startExchange(bitswap: Bitswap, stream: BitswapStream) = - let exchange = Exchange.start(bitswap.repo, stream) - bitswap.exchanges.add(exchange) - -proc start*(_: type Bitswap, switch: Switch, repo = Repo()): Bitswap = - let bitswap = Bitswap(repo: repo, switch: switch) - let protocol = BitswapProtocol.new() - proc acceptLoop {.async.} = - while true: - let stream = await protocol.accept() - bitswap.startExchange(stream) - asyncSpawn acceptLoop() - switch.mount(protocol) - bitswap - -proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} = - let stream = await bitswap.switch.dial(peer, BitswapProtocol) - bitswap.startExchange(stream) - -proc store*(bitswap: Bitswap, obj: IpfsObject) = - bitswap.repo.store(obj) - -proc retrieve*(bitswap: Bitswap, - cid: Cid, - timeout = 30.seconds): Future[Option[IpfsObject]] {.async.} = - result = bitswap.repo.retrieve(cid) - if result.isNone: - for exchange in bitswap.exchanges: - await exchange.want(cid) - await bitswap.repo.wait(cid, timeout) - result = bitswap.repo.retrieve(cid) diff --git a/ipfs/bitswap/exchange.nim b/ipfs/bitswap/exchange.nim deleted file mode 100644 index 4c32c1d8..00000000 --- a/ipfs/bitswap/exchange.nim +++ /dev/null @@ -1,38 +0,0 @@ -import pkg/chronos -import pkg/libp2p/cid -import ../repo -import ./stream -import ./messages - -type Exchange* = object - repo: Repo - stream: BitswapStream - -proc want*(exchange: Exchange, cid: Cid) {.async.} = - await exchange.stream.write(Message.want(cid)) - -proc send*(exchange: Exchange, obj: IpfsObject) {.async.} = - await exchange.stream.write(Message.send(obj.data)) - -proc handlePayload(exchange: Exchange, message: Message) {.async.} = - for bloc in message.payload: - let obj = IpfsObject(data: bloc.data) - exchange.repo.store(obj) - -proc handleWants(exchange: Exchange, message: Message) {.async.} = - for want in message.wantlist.entries: - let cid = Cid.init(want.`block`).get() - let obj = exchange.repo.retrieve(cid) - if obj.isSome: - await exchange.send(obj.get()) - -proc readLoop(exchange: Exchange) {.async.} = - while true: - let message = await exchange.stream.read() - await exchange.handlePayload(message) - await exchange.handleWants(message) - -proc start*(_: type Exchange, repo: Repo, stream: BitswapStream): Exchange = - let exchange = Exchange(repo: repo, stream: stream) - asyncSpawn exchange.readLoop() - exchange diff --git a/ipfs/bitswap/messages.nim b/ipfs/bitswap/messages.nim deleted file mode 100644 index 
b8395f54..00000000 --- a/ipfs/bitswap/messages.nim +++ /dev/null @@ -1,15 +0,0 @@ -import pkg/libp2p -import ../protobuf/bitswap - -export Cid -export Message - -proc want*(t: type Message, cids: varargs[Cid]): Message = - for cid in cids: - let entry = Entry(`block`: cid.data.buffer) - result.wantlist.entries.add(entry) - -proc send*(t: type Message, blocks: varargs[seq[byte]]): Message = - for data in blocks: - let bloc = Block(data: data) - result.payload.add(bloc) diff --git a/ipfs/bitswap/protocol.nim b/ipfs/bitswap/protocol.nim deleted file mode 100644 index cf2fc6b1..00000000 --- a/ipfs/bitswap/protocol.nim +++ /dev/null @@ -1,33 +0,0 @@ -import pkg/chronos -import pkg/libp2p/switch -import pkg/libp2p/stream/connection -import pkg/libp2p/protocols/protocol -import ./stream - -export stream except readLoop - -const Codec = "/ipfs/bitswap/1.2.0" - -type - BitswapProtocol* = ref object of LPProtocol - connections: AsyncQueue[BitswapStream] - -proc new*(t: type BitswapProtocol): BitswapProtocol = - let connections = newAsyncQueue[BitswapStream](1) - proc handle(connection: Connection, proto: string) {.async.} = - let stream = BitswapStream.new(connection) - await connections.put(stream) - await stream.readLoop() - BitswapProtocol(connections: connections, codecs: @[Codec], handler: handle) - -proc dial*(switch: Switch, - peer: PeerInfo, - t: type BitswapProtocol): - Future[BitswapStream] {.async.} = - let connection = await switch.dial(peer.peerId, peer.addrs, Codec) - let stream = BitswapStream.new(connection) - asyncSpawn stream.readLoop() - result = stream - -proc accept*(bitswap: BitswapProtocol): Future[BitswapStream] {.async.} = - result = await bitswap.connections.get() diff --git a/ipfs/bitswap/stream.nim b/ipfs/bitswap/stream.nim deleted file mode 100644 index dd1bbf88..00000000 --- a/ipfs/bitswap/stream.nim +++ /dev/null @@ -1,38 +0,0 @@ -import pkg/chronos -import pkg/protobuf_serialization -import pkg/libp2p/stream/connection -import ./messages - -export messages - -const MaxMessageSize = 8 * 1024 * 1024 - -type - BitswapStream* = ref object - bytestream: LpStream - messages: AsyncQueue[Message] - -proc new*(t: type BitswapStream, bytestream: LpStream): BitswapStream = - BitswapStream(bytestream: bytestream, messages: newAsyncQueue[Message](1)) - -proc readOnce(stream: BitswapStream) {.async.} = - let encoded = await stream.bytestream.readLp(MaxMessageSize) - let message = Protobuf.decode(encoded, Message) - await stream.messages.put(message) - -proc readLoop*(stream: BitswapStream) {.async.} = - while true: - try: - await stream.readOnce() - except LPStreamEOFError: - break - -proc write*(stream: BitswapStream, message: Message) {.async.} = - let encoded = Protobuf.encode(message) - await stream.bytestream.writeLp(encoded) - -proc read*(stream: BitswapStream): Future[Message] {.async.} = - result = await stream.messages.get() - -proc close*(stream: BitswapStream) {.async.} = - await stream.bytestream.close() diff --git a/ipfs/chunking.nim b/ipfs/chunking.nim deleted file mode 100644 index 59e601d8..00000000 --- a/ipfs/chunking.nim +++ /dev/null @@ -1,10 +0,0 @@ -import ./ipfsobject - -export ipfsobject - -proc createObject*(file: File): IpfsObject = - let contents = file.readAll() - IpfsObject(data: cast[seq[byte]](contents)) - -proc writeToFile*(obj: IpfsObject, output: File) = - output.write(cast[string](obj.data)) diff --git a/ipfs/dht/routing.nim b/ipfs/dht/routing.nim deleted file mode 100644 index 5a4a1632..00000000 --- a/ipfs/dht/routing.nim +++ /dev/null @@ -1,15 
+0,0 @@ -import pkg/libp2p/peerinfo -import pkg/libp2p/cid - -export peerinfo -export cid - -type - RoutingTable* = object - peers: seq[PeerInfo] - -proc add*(table: var RoutingTable, peer: PeerInfo) = - table.peers.add(peer) - -proc closest*(table: RoutingTable, id: Cid): seq[PeerInfo] = - table.peers diff --git a/ipfs/ipfsobject.nim b/ipfs/ipfsobject.nim deleted file mode 100644 index b4b46a06..00000000 --- a/ipfs/ipfsobject.nim +++ /dev/null @@ -1,14 +0,0 @@ -import pkg/libp2p/multihash -import pkg/libp2p/multicodec -import pkg/libp2p/cid - -export cid - -type - IpfsObject* = object - data*: seq[byte] - -proc cid*(obj: IpfsObject): Cid = - let codec = multiCodec("dag-pb") - let hash = MultiHash.digest("sha2-256", obj.data).get() - Cid.init(CIDv0, codec, hash).get() diff --git a/ipfs/p2p/rng.nim b/ipfs/p2p/rng.nim deleted file mode 100644 index 5fb187e3..00000000 --- a/ipfs/p2p/rng.nim +++ /dev/null @@ -1,13 +0,0 @@ -import pkg/libp2p/crypto/crypto -import pkg/bearssl - -type - Rng* = RandomNumberGenerator - RandomNumberGenerator = ref BrHmacDrbgContext - -var rng {.threadvar.}: Rng - -proc instance*(t: type Rng): Rng = - if rng.isNil: - rng = newRng() - rng diff --git a/ipfs/p2p/switch.nim b/ipfs/p2p/switch.nim deleted file mode 100644 index 8c67b7ae..00000000 --- a/ipfs/p2p/switch.nim +++ /dev/null @@ -1,32 +0,0 @@ -import std/tables -import pkg/chronos -import pkg/libp2p/switch -import pkg/libp2p/crypto/crypto -import pkg/libp2p/peerinfo -import pkg/libp2p/protocols/identify -import pkg/libp2p/stream/connection -import pkg/libp2p/muxers/muxer -import pkg/libp2p/muxers/mplex/mplex -import pkg/libp2p/transports/transport -import pkg/libp2p/transports/tcptransport -import pkg/libp2p/protocols/secure/secure -import pkg/libp2p/protocols/secure/noise -import pkg/libp2p/protocols/secure/secio -import ./rng - -export switch - -proc create*(t: type Switch): Switch = - - proc createMplex(conn: Connection): Muxer = - Mplex.init(conn) - - let privateKey = PrivateKey.random(Ed25519, Rng.instance[]).get() - let peerInfo = PeerInfo.init(privateKey) - let identify = newIdentify(peerInfo) - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) - let transports = @[Transport(TcpTransport.init({ReuseAddr}))] - let muxers = [(MplexCodec, mplexProvider)].toTable - let secureManagers = [Secure(newNoise(Rng.instance, privateKey))] - - newSwitch(peerInfo, transports, identify, muxers, secureManagers) diff --git a/ipfs/protobuf/bitswap.nim b/ipfs/protobuf/bitswap.nim deleted file mode 100644 index 898eb16a..00000000 --- a/ipfs/protobuf/bitswap.nim +++ /dev/null @@ -1,7 +0,0 @@ -import pkg/protobuf_serialization - -import_proto3 "message.proto" - -export Message -export Wantlist, WantType, Entry -export Block, BlockPresenceType, BlockPresence diff --git a/ipfs/protobuf/message.proto b/ipfs/protobuf/message.proto deleted file mode 100644 index 3b702c69..00000000 --- a/ipfs/protobuf/message.proto +++ /dev/null @@ -1,44 +0,0 @@ -syntax = "proto3"; - -package bitswap.message.pb; - -message Message { - - message Wantlist { - enum WantType { - wantBlock = 0; - wantHave = 1; - } - - message Entry { - bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) - int32 priority = 2; // the priority (normalized). 
default to 1 - bool cancel = 3; // whether this revokes an entry - WantType wantType = 4; // Note: defaults to enum 0, ie Block - bool sendDontHave = 5; // Note: defaults to false - } - - repeated Entry entries = 1; // a list of wantlist entries - bool full = 2; // whether this is the full wantlist. default to false - } - - message Block { - bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) - bytes data = 2; - } - - enum BlockPresenceType { - presenceHave = 0; - presenceDontHave = 1; - } - message BlockPresence { - bytes cid = 1; - BlockPresenceType type = 2; - } - - Wantlist wantlist = 1; - repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 - repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 - repeated BlockPresence blockPresences = 4; - int32 pendingBytes = 5; -} diff --git a/ipfs/repo.nim b/ipfs/repo.nim deleted file mode 100644 index dc4412ea..00000000 --- a/ipfs/repo.nim +++ /dev/null @@ -1,41 +0,0 @@ -import std/options -import std/tables -import std/hashes -import pkg/chronos -import pkg/libp2p -import ./ipfsobject -import ./repo/waitinglist - -export options -export ipfsobject - -type - Repo* = ref object - storage: Table[Cid, IpfsObject] - waiting: WaitingList[Cid] - -proc hash(id: Cid): Hash = - hash($id) - -proc store*(repo: Repo, obj: IpfsObject) = - let id = obj.cid - repo.storage[id] = obj - repo.waiting.deliver(id) - -proc contains*(repo: Repo, id: Cid): bool = - repo.storage.hasKey(id) - -proc retrieve*(repo: Repo, id: Cid): Option[IpfsObject] = - if repo.contains(id): - repo.storage[id].some - else: - IpfsObject.none - -proc wait*(repo: Repo, id: Cid, timeout: Duration): Future[void] = - var future: Future[void] - if repo.contains(id): - future = newFuture[void]() - future.complete() - else: - future = repo.waiting.wait(id, timeout) - future diff --git a/ipfs/repo/waitinglist.nim b/ipfs/repo/waitinglist.nim deleted file mode 100644 index ab7a6786..00000000 --- a/ipfs/repo/waitinglist.nim +++ /dev/null @@ -1,20 +0,0 @@ -import std/tables -import pkg/chronos - -type WaitingList*[T] = object - futures: Table[T, seq[Future[void]]] - -proc wait*[T](list: var WaitingList, item: T, timeout: Duration): Future[void] = - let future = newFuture[void]("waitinglist.wait") - proc onTimeout(_: pointer) = - if not future.finished: - future.complete() - discard setTimer(Moment.fromNow(timeout), onTimeout, nil) - list.futures.mgetOrPut(item, @[]).add(future) - future - -proc deliver*[T](list: var WaitingList, item: T) = - if list.futures.hasKey(item): - for future in list.futures[item]: - future.complete() - list.futures.del(item) diff --git a/tests/dagger/bitswap/testbitswap.nim b/tests/dagger/bitswap/testbitswap.nim new file mode 100644 index 00000000..ae3a41b5 --- /dev/null +++ b/tests/dagger/bitswap/testbitswap.nim @@ -0,0 +1,201 @@ + +import std/sets +import std/sequtils +import std/algorithm + +import pkg/asynctest +import pkg/chronos +import pkg/stew/byteutils +import pkg/libp2p +import pkg/libp2p/errors + +import pkg/dagger/p2p/rng +import pkg/dagger/bitswap +import pkg/dagger/stores/memorystore +import pkg/dagger/chunker +import pkg/dagger/blocktype as bt +import pkg/dagger/utils/asyncheapqueue + +import ./utils +import ../helpers + +suite "Bitswap engine - 2 nodes": + + let + chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) + blocks1 = chunker1.mapIt( bt.Block.new(it) ) + chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) + blocks2 = chunker2.mapIt( 
bt.Block.new(it) ) + + var + switch1, switch2: Switch + network1, network2: BitswapNetwork + bitswap1, bitswap2: Bitswap + awaiters: seq[Future[void]] + peerId1, peerId2: PeerID + peerCtx1, peerCtx2: BitswapPeerCtx + done: Future[void] + + setup: + done = newFuture[void]() + + switch1 = newStandardSwitch() + switch2 = newStandardSwitch() + awaiters.add(await switch1.start()) + awaiters.add(await switch2.start()) + + peerId1 = switch1.peerInfo.peerId + peerId2 = switch2.peerInfo.peerId + + network1 = BitswapNetwork.new(switch = switch1) + bitswap1 = Bitswap.new(MemoryStore.new(blocks1), network1) + switch1.mount(network1) + + network2 = BitswapNetwork.new(switch = switch2) + bitswap2 = Bitswap.new(MemoryStore.new(blocks2), network2) + switch2.mount(network2) + + await allFuturesThrowing( + bitswap1.start(), + bitswap2.start(), + ) + + # initialize our want lists + bitswap1.engine.wantList = blocks2.mapIt( it.cid ) + bitswap2.engine.wantList = blocks1.mapIt( it.cid ) + + await switch1.connect( + switch2.peerInfo.peerId, + switch2.peerInfo.addrs) + + await sleepAsync(1.seconds) # give some time to exchange lists + peerCtx2 = bitswap1.engine.getPeerCtx(peerId2) + peerCtx1 = bitswap2.engine.getPeerCtx(peerId1) + + teardown: + await allFuturesThrowing( + bitswap1.stop(), + bitswap2.stop(), + switch1.stop(), + switch2.stop()) + + await allFuturesThrowing(awaiters) + + test "should exchange want lists on connect": + check not isNil(peerCtx1) + check not isNil(peerCtx2) + + check: + peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == + bitswap2.engine.wantList.mapIt( $it ).sorted(cmp[string]) + + peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == + bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string]) + + test "should send want-have for block": + let blk = bt.Block.new("Block 1".toBytes) + bitswap2.engine.localStore.putBlocks(@[blk]) + + let entry = Entry( + `block`: blk.cid.data.buffer, + priority: 1, + cancel: false, + wantType: WantType.wantBlock, + sendDontHave: false) + + peerCtx1.peerWants.add(entry) + check bitswap2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk + await sleepAsync(100.millis) + + check bitswap1.engine.localStore.hasBlock(blk.cid) + + test "should get blocks from remote": + let blocks = await bitswap1.getBlocks(blocks2.mapIt( it.cid )) + check blocks == blocks2 + + test "remote should send blocks when available": + let blk = bt.Block.new("Block 1".toBytes) + + # should fail retrieving block from remote + check not await bitswap1.getBlocks(@[blk.cid]) + .withTimeout(100.millis) # should expire + + proc onBlocks(evt: BlockStoreChangeEvt) = + check evt.cids == @[blk.cid] + done.complete() + + bitswap1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added) + + # first put the required block in the local store + bitswap2.engine.localStore.putBlocks(@[blk]) + + # second trigger bitswap to resolve any pending requests + # for the block + bitswap2.putBlocks(@[blk]) + + await done + +suite "Bitswap - multiple nodes": + let + chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256) + blocks = chunker.mapIt( bt.Block.new(it) ) + + var + switch: seq[Switch] + bitswap: seq[Bitswap] + awaiters: seq[Future[void]] + + setup: + for e in generateNodes(5): + switch.add(e.switch) + bitswap.add(e.bitswap) + await e.bitswap.start() + + awaiters = switch.mapIt( + (await it.start()) + ).concat() + + teardown: + await allFuturesThrowing( + switch.mapIt( it.stop() ) + ) + + await allFuturesThrowing(awaiters) + + test "should receive haves for own want list": + let + 
downloader = bitswap[4] + engine = downloader.engine + + # Add blocks from 1st peer to want list + engine.wantList &= blocks[0..3].mapIt( it.cid ) + engine.wantList &= blocks[12..15].mapIt( it.cid ) + + bitswap[0].engine.localStore.putBlocks(blocks[0..3]) + bitswap[1].engine.localStore.putBlocks(blocks[4..7]) + bitswap[2].engine.localStore.putBlocks(blocks[8..11]) + bitswap[3].engine.localStore.putBlocks(blocks[12..15]) + + await connectNodes(switch) + + await sleepAsync(1.seconds) + check engine.peers[0].peerHave == blocks[0..3].mapIt( it.cid ) + check engine.peers[3].peerHave == blocks[12..15].mapIt( it.cid ) + + test "should exchange blocks with multiple nodes": + let + downloader = bitswap[4] + engine = downloader.engine + + # Add blocks from 1st peer to want list + engine.wantList &= blocks[0..3].mapIt( it.cid ) + engine.wantList &= blocks[12..15].mapIt( it.cid ) + + bitswap[0].engine.localStore.putBlocks(blocks[0..3]) + bitswap[1].engine.localStore.putBlocks(blocks[4..7]) + bitswap[2].engine.localStore.putBlocks(blocks[8..11]) + bitswap[3].engine.localStore.putBlocks(blocks[12..15]) + + await connectNodes(switch) + let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid )) + check wantListBlocks == blocks[0..3] diff --git a/tests/dagger/bitswap/testengine.nim b/tests/dagger/bitswap/testengine.nim new file mode 100644 index 00000000..de6209f5 --- /dev/null +++ b/tests/dagger/bitswap/testengine.nim @@ -0,0 +1,346 @@ +import std/sequtils + +import pkg/stew/byteutils +import pkg/asynctest +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/errors + +import pkg/dagger/p2p/rng +import pkg/dagger/bitswap +import pkg/dagger/bitswap/pendingblocks +import pkg/dagger/stores/memorystore +import pkg/dagger/chunker +import pkg/dagger/blocktype as bt + +import ../helpers + +suite "Bitswap engine basic": + let + rng = Rng.instance() + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerID.init(seckey.getKey().tryGet()).tryGet() + chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) + blocks = chunker.mapIt( bt.Block.new(it) ) + + var + done: Future[void] + + setup: + done = newFuture[void]() + + test "should send want list to new peers": + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} = + check cids == blocks.mapIt( it.cid ) + + done.complete() + + let request = BitswapRequest( + sendWantList: sendWantList, + ) + + let engine = BitswapEngine.new(MemoryStore.new(blocks), request) + engine.wantList = blocks.mapIt( it.cid ) + engine.setupPeer(peerId) + + await done + +suite "Bitswap engine handlers": + let + rng = Rng.instance() + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerID.init(seckey.getKey().tryGet()).tryGet() + chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) + blocks = chunker.mapIt( bt.Block.new(it) ) + + var + engine: BitswapEngine + peerCtx: BitswapPeerCtx + done: Future[void] + + setup: + done = newFuture[void]() + engine = BitswapEngine.new(MemoryStore.new()) + peerCtx = BitswapPeerCtx( + id: peerId + ) + engine.peers.add(peerCtx) + + test "should handle want list": + let wantList = makeWantList(blocks.mapIt( it.cid )) + proc taskScheduler(ctx: BitswapPeerCtx): bool = + check ctx.id == peerId + check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid ) + + done.complete() + + engine.scheduleTask = taskScheduler + engine.wantListHandler(peerId, 
wantList) + + await done + + test "should handle want list - `dont-have`": + let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = + check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) + for p in presence: + check: + p.`type` == BlockPresenceType.presenceDontHave + + done.complete() + + engine.request = BitswapRequest( + sendPresence: sendPresence + ) + + engine.wantListHandler(peerId, wantList) + + await done + + test "should handle want list - `dont-have` some blocks": + let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = + check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) + for p in presence: + check: + p.`type` == BlockPresenceType.presenceDontHave + + done.complete() + + engine.request = BitswapRequest(sendPresence: sendPresence) + engine.localStore.putBlocks(@[blocks[0], blocks[1]]) + engine.wantListHandler(peerId, wantList) + + await done + + test "should handle blocks": + let pending = blocks.mapIt( + engine.pendingBlocks.addOrAwait( it.cid ) + ) + + engine.blocksHandler(peerId, blocks) + let resolved = await allFinished(pending) + check resolved.mapIt( it.read ) == blocks + for b in blocks: + check engine.localStore.hasBlock(b.cid) + + test "should handle block presence": + engine.blockPresenceHandler( + peerId, + blocks.mapIt( + BlockPresence( + cid: it.cid.data.buffer, + `type`: BlockPresenceType.presenceHave + ))) + + check peerCtx.peerHave == blocks.mapIt( it.cid ) + +suite "Bitswap engine blocks": + + let + rng = Rng.instance() + chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256) + blocks = chunker.mapIt( bt.Block.new(it) ) + + var + engine: BitswapEngine + peersCtx: seq[BitswapPeerCtx] + peers: seq[PeerID] + done: Future[void] + + setup: + done = newFuture[void]() + engine = BitswapEngine.new(MemoryStore.new()) + peersCtx = @[] + + for i in 0..3: + let seckey = PrivateKey.random(rng[]).tryGet() + peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet()) + + peersCtx.add(BitswapPeerCtx( + id: peers[i] + )) + + # set debt ratios + + # ratio > 1 + peersCtx[0].bytesSent = 1000 + peersCtx[0].bytesRecv = 100 + + # ratio < 1 + peersCtx[1].bytesSent = 100 + peersCtx[1].bytesRecv = 1000 + + # ratio > 1 + peersCtx[2].bytesSent = 100 + peersCtx[2].bytesRecv = 99 + + # ratio == 0 + peersCtx[3].bytesSent = 100 + peersCtx[3].bytesRecv = 100 + + engine.peers = peersCtx + + test "should select peer with least debt ratio": + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} = + check cids == blocks.mapIt( it.cid ) + if peersCtx[1].id == id: # second peer has the least debt ratio + check wantType == WantType.wantBlock + engine.resolveBlocks(blocks) + else: + check wantType == WantType.wantHave + + engine.request.sendWantList = sendWantList + + let pending = engine.requestBlocks(blocks.mapIt( it.cid )) + let resolved = await allFinished(pending) + check resolved.mapIt( it.read ) == blocks + + test "should select peer with least debt ratio and have CIDs": + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} = + check cids == blocks.mapIt( it.cid ) 
+ if peersCtx[3].id == id: # 4th peer has the least debt ratio and has cids + check wantType == WantType.wantBlock + engine.resolveBlocks(blocks) + else: + check wantType == WantType.wantHave + + engine.request.sendWantList = sendWantList + + peersCtx[3].peerHave = blocks.mapIt( it.cid ) + let pending = engine.requestBlocks(blocks.mapIt( it.cid )) + let resolved = await allFinished(pending) + check resolved.mapIt( it.read ) == blocks + + +suite "Task Handler": + + let + rng = Rng.instance() + chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256) + blocks = chunker.mapIt( bt.Block.new(it) ) + + var + engine: BitswapEngine + peersCtx: seq[BitswapPeerCtx] + peers: seq[PeerID] + done: Future[void] + + setup: + done = newFuture[void]() + engine = BitswapEngine.new(MemoryStore.new()) + peersCtx = @[] + + for i in 0..3: + let seckey = PrivateKey.random(rng[]).tryGet() + peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet()) + + peersCtx.add(BitswapPeerCtx( + id: peers[i] + )) + + engine.peers = peersCtx + + test "Should send want-blocks in priority order": + proc sendBlocks( + id: PeerID, + blks: seq[bt.Block]) {.gcsafe.} = + check blks.len == 2 + check: + blks[1].cid == blocks[0].cid + blks[0].cid == blocks[1].cid + + engine.localStore.putBlocks(blocks) + engine.request.sendBlocks = sendBlocks + + # second block to send by priority + peersCtx[0].peerWants.add( + Entry( + `block`: blocks[0].cid.data.buffer, + priority: 49, + cancel: false, + wantType: WantType.wantBlock, + sendDontHave: false) + ) + + # first block to send by priority + peersCtx[0].peerWants.add( + Entry( + `block`: blocks[1].cid.data.buffer, + priority: 50, + cancel: false, + wantType: WantType.wantBlock, + sendDontHave: false) + ) + + await engine.taskHandler(peersCtx[0]) + + test "Should send presence": + proc sendPresence( + id: PeerID, + presence: seq[BlockPresence]) {.gcsafe.} = + check presence.len == 3 + check: + presence[0].cid == blocks[0].cid.data.buffer + presence[0].`type` == BlockPresenceType.presenceHave + + presence[1].cid == blocks[1].cid.data.buffer + presence[1].`type` == BlockPresenceType.presenceHave + + presence[2].`type` == BlockPresenceType.presenceDontHave + + engine.localStore.putBlocks(blocks) + engine.request.sendPresence = sendPresence + + # have block + peersCtx[0].peerWants.add( + Entry( + `block`: blocks[0].cid.data.buffer, + priority: 1, + cancel: false, + wantType: WantType.wantHave, + sendDontHave: false) + ) + + # have block + peersCtx[0].peerWants.add( + Entry( + `block`: blocks[1].cid.data.buffer, + priority: 1, + cancel: false, + wantType: WantType.wantHave, + sendDontHave: false) + ) + + # don't have block + peersCtx[0].peerWants.add( + Entry( + `block`: bt.Block.new("Block 1".toBytes).cid.data.buffer, + priority: 1, + cancel: false, + wantType: WantType.wantHave, + sendDontHave: false) + ) + + await engine.taskHandler(peersCtx[0]) diff --git a/tests/dagger/bitswap/testnetwork.nim b/tests/dagger/bitswap/testnetwork.nim new file mode 100644 index 00000000..98c872bf --- /dev/null +++ b/tests/dagger/bitswap/testnetwork.nim @@ -0,0 +1,196 @@ +import std/sequtils +import std/tables + +import pkg/asynctest +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/standard_setup +import pkg/libp2p/errors +import pkg/protobuf_serialization + +import pkg/dagger/stores/memorystore +import pkg/dagger/bitswap/network +import pkg/dagger/p2p/rng +import pkg/dagger/chunker +import pkg/dagger/blocktype as bt + +import ../helpers + +suite "Bitswap network": + let + rng = Rng.instance() 
+    seckey = PrivateKey.random(rng[]).tryGet()
+    peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
+    chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
+    blocks = chunker.mapIt( bt.Block.new(it) )
+
+  var
+    network: BitswapNetwork
+    networkPeer: NetworkPeer
+    buffer: BufferStream
+    done: Future[void]
+
+  proc getConn(): Future[Connection] {.async.} =
+    return Connection(buffer)
+
+  setup:
+    done = newFuture[void]()
+    buffer = newBufferStream()
+    network = BitswapNetwork.new(
+      switch = newStandardSwitch(),
+      connProvider = getConn)
+    network.setupPeer(peerId)
+    networkPeer = network.peers[peerId]
+    discard await networkPeer.connect()
+
+  test "Want List handler":
+    proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
+      # check that we got the correct amount of entries
+      check wantList.entries.len == 4
+
+      for b in blocks:
+        check b.cid in wantList.entries
+        let entry = wantList.entries[wantList.entries.find(b.cid)]
+        check entry.wantType == WantType.wantHave
+        check entry.priority == 1
+        check entry.cancel == true
+        check entry.sendDontHave == true
+
+      done.complete()
+
+    network.handlers.onWantList = wantListHandler
+
+    let wantList = makeWantList(
+      blocks.mapIt( it.cid ),
+      1, true, WantType.wantHave,
+      true, true)
+
+    let msg = Message(wantlist: wantList)
+    await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
+
+    await done.wait(500.millis)
+
+  test "Blocks Handler":
+    proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} =
+      check blks == blocks
+      done.complete()
+
+    network.handlers.onBlocks = blocksHandler
+
+    let msg = Message(payload: makeBlocks(blocks))
+    await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
+
+    await done.wait(500.millis)
+
+  test "Presence Handler":
+    proc presenceHandler(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
+      for b in blocks:
+        check:
+          b.cid in presence
+
+      done.complete()
+
+    network.handlers.onPresence = presenceHandler
+
+    let msg = Message(
+      blockPresences: blocks.mapIt(
+        BlockPresence(
+          cid: it.cid.data.buffer,
+          type: BlockPresenceType.presenceHave
+        )))
+    await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
+
+    await done.wait(500.millis)
+
+suite "Bitswap Network - e2e":
+  let
+    chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
+    blocks = chunker.mapIt( bt.Block.new(it) )
+
+  var
+    switch1, switch2: Switch
+    network1, network2: BitswapNetwork
+    awaiters: seq[Future[void]]
+    done: Future[void]
+
+  setup:
+    done = newFuture[void]()
+    switch1 = newStandardSwitch()
+    switch2 = newStandardSwitch()
+    awaiters.add(await switch1.start())
+    awaiters.add(await switch2.start())
+
+    network1 = BitswapNetwork.new(
+      switch = switch1)
+    switch1.mount(network1)
+
+    network2 = BitswapNetwork.new(
+      switch = switch2)
+    switch2.mount(network2)
+
+    await switch1.connect(
+      switch2.peerInfo.peerId,
+      switch2.peerInfo.addrs)
+
+  teardown:
+    await allFuturesThrowing(
+      switch1.stop(),
+      switch2.stop())
+
+    await allFuturesThrowing(awaiters)
+
+  test "broadcast want list":
+    proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
+      # check that we got the correct amount of entries
+      check wantList.entries.len == 4
+
+      for b in blocks:
+        check b.cid in wantList.entries
+        let entry = wantList.entries[wantList.entries.find(b.cid)]
+        check entry.wantType == WantType.wantHave
+        check entry.priority == 1
+        check entry.cancel == true
+        check entry.sendDontHave == true
+
+      done.complete()
+
+    network2.handlers.onWantList = wantListHandler
+    network1.broadcastWantList(
+ switch2.peerInfo.peerId, + blocks.mapIt( it.cid ), + 1, true, WantType.wantHave, + true, true) + + await done.wait(500.millis) + + test "broadcast blocks": + proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} = + check blks == blocks + done.complete() + + network2.handlers.onBlocks = blocksHandler + network1.broadcastBlocks( + switch2.peerInfo.peerId, + blocks) + + await done.wait(500.millis) + + test "broadcast precense": + proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} = + for b in blocks: + check: + b.cid in precense + + done.complete() + + network2.handlers.onPresence = presenceHandler + + network1.broadcastBlockPresence( + switch2.peerInfo.peerId, + blocks.mapIt( + BlockPresence( + cid: it.cid.data.buffer, + type: BlockPresenceType.presenceHave + ))) + + await done.wait(500.millis) diff --git a/tests/dagger/bitswap/utils.nim b/tests/dagger/bitswap/utils.nim new file mode 100644 index 00000000..9bf3d4b2 --- /dev/null +++ b/tests/dagger/bitswap/utils.nim @@ -0,0 +1,34 @@ +import std/sequtils + +import pkg/chronos +import pkg/libp2p + +import pkg/dagger/bitswap +import pkg/dagger/stores/memorystore +import pkg/dagger/blocktype as bt + +proc generateNodes*( + num: Natural, + blocks: openArray[bt.Block] = [], + secureManagers: openarray[SecureProtocol] = [ + SecureProtocol.Noise, + ]): seq[tuple[switch: Switch, bitswap: Bitswap]] = + + for i in 0.. 0: + result.add(popNoWait(tmp).get()) + +suite "synchronous tests": + test "test pushNoWait - Min": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + check heap[0] == 0 + check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + test "test pushNoWait - Max": + var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + check heap[0] == 9 + check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + test "test popNoWait": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + var res: seq[int] + while heap.len > 0: + let r = heap.popNoWait() + if r.isOk: + res.add(r.get) + + check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + test "test popNoWait - Max": + var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + var res: seq[int] + while heap.len > 0: + let r = heap.popNoWait() + if r.isOk: + res.add(r.get) + + check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + test "test del": # Test del + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + heap.del(0) + doAssert(heap[0] == 1) + + heap.del(heap.find(7)) + check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9] + + heap.del(heap.find(5)) + check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9] + + heap.del(heap.find(6)) + check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9] + + heap.del(heap.find(2)) + check heap.toSortedSeq == @[1, 3, 4, 8, 9] + + test "del last": # Test del last + var heap = newAsyncHeapQueue[int]() + let data = [1, 2, 3] + for item in data: + check heap.pushNoWait(item).isOk + + heap.del(2) + check heap.toSortedSeq == @[1, 2] + + heap.del(1) + check heap.toSortedSeq == @[1] + + heap.del(0) + check heap.toSortedSeq == newSeq[int]() # empty seq has no type + + test 
"should throw popping from an empty queue": + var heap = newAsyncHeapQueue[int]() + let err = heap.popNoWait() + check err.isErr + check err.error == AsyncHQErrors.Empty + + test "should throw pushing to an full queue": + var heap = newAsyncHeapQueue[int](1) + check heap.pushNoWait(1).isOk + let err = heap.pushNoWait(2) + check err.isErr + check err.error == AsyncHQErrors.Full + + test "test clear": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + check heap.len == 10 + heap.clear() + check heap.len == 0 + +suite "asynchronous tests": + + test "test push": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + await push(heap, item) + check heap[0] == 0 + check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + test "test push and pop with maxSize": + var heap = newAsyncHeapQueue[int](5) + let data = [1, 9, 5, 3, 7, 4, 2] + + proc pushTask() {.async.} = + for item in data: + await push(heap, item) + + asyncCheck pushTask() + + check heap.len == 5 + check heap[0] == 1 # because we haven't pushed 0 yet + + check (await heap.pop) == 1 + check (await heap.pop) == 3 + check (await heap.pop) == 5 + check (await heap.pop) == 7 + check (await heap.pop) == 9 + + await sleepAsync(1.milliseconds) # allow poll to run once more + check (await heap.pop) == 2 + check (await heap.pop) == 4 + + test "test update": + var heap = newAsyncHeapQueue[Task](5) + + for item in [("a", 4), ("b", 3), ("c", 2)]: + check heap.pushNoWait(item).isOk + + check heap[0] == (name: "c", priority: 2) + check heap.update((name: "a", priority: 1)) + check heap[0] == (name: "a", priority: 1) + + test "test pushOrUpdate - update": + var heap = newAsyncHeapQueue[Task](3) + + for item in [("a", 4), ("b", 3), ("c", 2)]: + check heap.pushNoWait(item).isOk + + check heap[0] == (name: "c", priority: 2) + await heap.pushOrUpdate((name: "a", priority: 1)) + check heap[0] == (name: "a", priority: 1) + + test "test pushOrUpdate - push": + var heap = newAsyncHeapQueue[Task](2) + + for item in [("a", 4), ("b", 3)]: + check heap.pushNoWait(item).isOk + + check heap[0] == ("b", 3) # sanity check for order + + let fut = heap.pushOrUpdate(("c", 2)) # attempt to push a non existen item but block + check heap.popNoWait().get() == ("b", 3) # pop one off + await fut # wait for push to complete + + check heap[0] == (name: "c", priority: 2) # check order again + + test "test pop": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk + + var res: seq[int] + while heap.len > 0: + res.add((await heap.pop())) + + check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + test "test delete": + var heap = newAsyncHeapQueue[Task]() + let data = ["d", "b", "c", "a", "h", "e", "f", "g"] + + for item in data: + check heap.pushNoWait(( + name: item, + priority: Rng.instance().rand(data.len) + )).isOk + + let del = heap[3] + heap.delete(del) + check heap.find(del) < 0 diff --git a/tests/dagger/testblockstore.nim b/tests/dagger/testblockstore.nim new file mode 100644 index 00000000..517b403b --- /dev/null +++ b/tests/dagger/testblockstore.nim @@ -0,0 +1,82 @@ +import std/sequtils +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils + +import pkg/dagger/p2p/rng +import pkg/dagger/stores/memorystore +import pkg/dagger/chunker + +import ./helpers + +suite "Memory Store": + + var store: MemoryStore + var chunker = 
newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
+  var blocks = chunker.mapIt( Block.new(it) )
+
+  setup:
+    store = MemoryStore.new(blocks)
+
+  test "getBlocks single":
+    let blk = await store.getBlocks(@[blocks[0].cid])
+    check blk[0] == blocks[0]
+
+  test "getBlocks multiple":
+    let blk = await store.getBlocks(blocks[0..2].mapIt( it.cid ))
+    check blk == blocks[0..2]
+
+  test "hasBlock":
+    check store.hasBlock(blocks[0].cid)
+
+  test "delBlocks single":
+    let blks = blocks[1..3].mapIt( it.cid )
+    store.delBlocks(blks)
+
+    check not store.hasBlock(blks[0])
+    check not store.hasBlock(blks[1])
+    check not store.hasBlock(blks[2])
+
+  test "add blocks change handler":
+    let blocks = @[
+      Block.new("Block 1".toBytes),
+      Block.new("Block 2".toBytes),
+      Block.new("Block 3".toBytes),
+    ]
+
+    var triggered = false
+    store.addChangeHandler(
+      proc(evt: BlockStoreChangeEvt) =
+        check evt.kind == ChangeType.Added
+        check evt.cids == blocks.mapIt( it.cid )
+        triggered = true
+      , ChangeType.Added
+    )
+
+    store.putBlocks(blocks)
+    check triggered
+
+  test "remove blocks change handler":
+    let blocks = @[
+      Block.new("Block 1".toBytes),
+      Block.new("Block 2".toBytes),
+      Block.new("Block 3".toBytes),
+    ]
+
+    var triggered = false
+    store.addChangeHandler(
+      proc(evt: BlockStoreChangeEvt) =
+        check evt.kind == ChangeType.Removed
+        check evt.cids == blocks.mapIt( it.cid )
+        triggered = true
+      , ChangeType.Removed
+    )
+
+    store.putBlocks(blocks)
+    check store.hasBlock(blocks[0].cid)
+    check store.hasBlock(blocks[1].cid)
+    check store.hasBlock(blocks[2].cid)
+
+    store.delBlocks(blocks.mapIt( it.cid ))
+    check triggered
diff --git a/tests/dagger/testchunking.nim b/tests/dagger/testchunking.nim
new file mode 100644
index 00000000..82dace73
--- /dev/null
+++ b/tests/dagger/testchunking.nim
@@ -0,0 +1,39 @@
+import std/unittest
+import pkg/stew/byteutils
+import pkg/dagger/chunker
+
+suite "Chunking":
+  test "should return proper size chunks":
+    proc reader(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} =
+      let contents = "1234567890".toBytes
+      copyMem(addr data[0], unsafeAddr contents[offset], data.len)
+      return data.len
+
+    let chunker = Chunker.new(
+      reader = reader,
+      size = 10,
+      chunkSize = 2)
+
+    check chunker.getBytes() == "12".toBytes
+    check chunker.getBytes() == "34".toBytes
+    check chunker.getBytes() == "56".toBytes
+    check chunker.getBytes() == "78".toBytes
+    check chunker.getBytes() == "90".toBytes
+    check chunker.getBytes() == "".toBytes
+
+  test "should chunk file":
+    let (fileName, _, _) = instantiationInfo() # get this file's name
+    let path = "tests/dagger/" & filename
+    let file = open(path)
+    let fileChunker = newFileChunker(file = file)
+
+    var data: seq[byte]
+    while true:
+      let buff = fileChunker.getBytes()
+      if buff.len <= 0:
+        break
+
+      check buff.len <= fileChunker.chunkSize
+      data.add(buff)
+
+    check string.fromBytes(data) == readFile(path)
diff --git a/tests/helpers/examples.nim b/tests/helpers/examples.nim
deleted file mode 100644
index 67e322f8..00000000
--- a/tests/helpers/examples.nim
+++ /dev/null
@@ -1,13 +0,0 @@
-import std/sequtils
-import std/random
-import pkg/libp2p
-import pkg/ipfs/ipfsobject
-
-proc example*(t: type seq[byte]): seq[byte] =
-  newSeqWith(10, rand(byte))
-
-proc example*(t: type IpfsObject): IpfsObject =
-  IpfsObject(data: seq[byte].example)
-
-proc example*(t: type Cid): Cid =
-  IpfsObject.example.cid
diff --git a/tests/ipfs/testBitswap.nim b/tests/ipfs/testBitswap.nim
deleted file mode 100644
index 1d5466c3..00000000
--- a/tests/ipfs/testBitswap.nim +++ /dev/null @@ -1,41 +0,0 @@ -import pkg/chronos -import pkg/asynctest -import pkg/ipfs/ipfsobject -import pkg/ipfs/p2p/switch -import pkg/ipfs/bitswap - -suite "bitswap": - - let address = MultiAddress.init("/ip4/127.0.0.1/tcp/40981").get() - let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8]) - - var bitswap1, bitswap2: Bitswap - var peer1, peer2: Switch - - setup: - peer1 = Switch.create() - peer2 = Switch.create() - peer1.peerInfo.addrs.add(address) - discard await peer1.start() - discard await peer2.start() - bitswap1 = Bitswap.start(peer1) - bitswap2 = Bitswap.start(peer2) - - teardown: - await peer1.stop() - await peer2.stop() - - test "stores ipfs objects": - bitswap1.store(obj) - - test "retrieves local objects": - bitswap1.store(obj) - check (await bitswap1.retrieve(obj.cid)).get() == obj - - test "signals retrieval failure": - check (await bitswap1.retrieve(obj.cid, 100.milliseconds)).isNone - - test "retrieves objects from network": - bitswap1.store(obj) - await bitswap2.connect(peer1.peerInfo) - check (await bitswap2.retrieve(obj.cid)).get() == obj diff --git a/tests/ipfs/testBitswapMessages.nim b/tests/ipfs/testBitswapMessages.nim deleted file mode 100644 index d798af76..00000000 --- a/tests/ipfs/testBitswapMessages.nim +++ /dev/null @@ -1,23 +0,0 @@ -import std/unittest -import pkg/libp2p -import pkg/ipfs/protobuf/bitswap -import pkg/ipfs/bitswap/messages -import ../helpers/examples - -suite "bitswap messages": - - test "creates message with want list": - let cid1, cid2 = Cid.example - let message = Message.want(cid1, cid2) - check message == Message(wantlist: WantList(entries: @[ - Entry(`block`: cid1.data.buffer), - Entry(`block`: cid2.data.buffer) - ])) - - test "creates message that sends blocks": - let block1, block2 = seq[byte].example - let message = Message.send(block1, block2) - check message == Message(payload: @[ - Block(data: block1), - Block(data: block2) - ]) diff --git a/tests/ipfs/testBitswapProtocol.nim b/tests/ipfs/testBitswapProtocol.nim deleted file mode 100644 index a895b079..00000000 --- a/tests/ipfs/testBitswapProtocol.nim +++ /dev/null @@ -1,57 +0,0 @@ -import pkg/chronos -import pkg/asynctest -import pkg/ipfs/p2p/switch -import pkg/ipfs/bitswap/messages -import pkg/ipfs/bitswap/protocol - -suite "bitswap protocol": - - let address = MultiAddress.init("/ip4/127.0.0.1/tcp/45344").get() - let message = Message.send(@[1'u8, 2'u8, 3'u8]) - - var peer1, peer2: Switch - var bitswap: BitswapProtocol - - setup: - peer1 = Switch.create() - peer2 = Switch.create() - bitswap = BitswapProtocol.new() - peer1.peerInfo.addrs.add(address) - peer1.mount(bitswap) - discard await peer1.start() - discard await peer2.start() - - teardown: - await peer1.stop() - await peer2.stop() - - test "opens a stream to another peer": - let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol) - await stream.close() - - test "accepts a stream from another peer": - let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) - let incoming = await bitswap.accept() - await outgoing.close() - await incoming.close() - - test "writes messages to a stream": - let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol) - await stream.write(message) - await stream.close() - - test "reads messages from incoming stream": - let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) - let incoming = await bitswap.accept() - await outgoing.write(message) - check (await incoming.read()) == message - await outgoing.close() - await incoming.close() - - 
test "reads messages from outgoing stream": - let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) - let incoming = await bitswap.accept() - await incoming.write(message) - check (await outgoing.read()) == message - await outgoing.close() - await incoming.close() diff --git a/tests/ipfs/testChunking.nim b/tests/ipfs/testChunking.nim deleted file mode 100644 index 56e48461..00000000 --- a/tests/ipfs/testChunking.nim +++ /dev/null @@ -1,31 +0,0 @@ -import std/unittest -import std/os -import pkg/ipfs/chunking - -suite "chunking": - - var input, output: File - - setup: - input = open("tests/input.txt", fmReadWrite) - output = open("tests/output.txt", fmReadWrite) - input.write("foo") - input.setFilePos(0) - - teardown: - input.close() - output.close() - removeFile("tests/input.txt") - removeFile("tests/output.txt") - - test "creates an IPFS object from a file": - check createObject(input) != IpfsObject.default - - test "writes an IPFS object to a file": - let obj = createObject(input) - writeToFile(obj, output) - - input.setFilePos(0) - output.setFilePos(0) - check output.readAll() == input.readAll() - diff --git a/tests/ipfs/testDhtRouting.nim b/tests/ipfs/testDhtRouting.nim deleted file mode 100644 index b35d23c1..00000000 --- a/tests/ipfs/testDhtRouting.nim +++ /dev/null @@ -1,16 +0,0 @@ -import std/unittest -import pkg/ipfs/ipfsobject -import pkg/ipfs/dht/routing - -suite "DHT routing table": - - test "finds peers closest to some content": - let peer1 = PeerInfo(peerId: PeerId(data: @[1'u8])) - let peer2 = PeerInfo(peerId: PeerId(data: @[2'u8])) - let contentId = IpfsObject(data: @[]).cid - - var table = RoutingTable() - table.add(peer1) - table.add(peer2) - - check table.closest(contentId) == @[peer1, peer2] diff --git a/tests/ipfs/testIpfs.nim b/tests/ipfs/testIpfs.nim deleted file mode 100644 index aa5f3dfa..00000000 --- a/tests/ipfs/testIpfs.nim +++ /dev/null @@ -1,48 +0,0 @@ -import std/os -import pkg/asynctest -import pkg/chronos -import pkg/ipfs - -suite "integration": - - let address = MultiAddress.init("/ip4/127.0.0.1/tcp/48952").get() - - var peer1, peer2: Ipfs - var input, output: File - - proc setupPeers {.async.} = - peer1 = await Ipfs.start(address) - peer2 = await Ipfs.start() - await peer2.connect(peer1.info) - - proc setupFiles = - input = open("tests/input.txt", fmReadWrite) - output = open("tests/output.txt", fmReadWrite) - input.write("foo") - input.setFilePos(0) - - proc teardownPeers {.async.} = - await peer1.stop() - await peer2.stop() - - proc teardownFiles = - input.close() - output.close() - removeFile("tests/input.txt") - removeFile("tests/output.txt") - - setup: - await setupPeers() - setupFiles() - - teardown: - await teardownPeers() - teardownFiles() - - test "file can be transferred from one peer to another": - let identifier = await peer1.add(input) - await peer2.get(identifier, output) - - input.setFilePos(0) - output.setFilePos(0) - check output.readAll() == input.readAll() diff --git a/tests/ipfs/testObject.nim b/tests/ipfs/testObject.nim deleted file mode 100644 index 551cfcd5..00000000 --- a/tests/ipfs/testObject.nim +++ /dev/null @@ -1,12 +0,0 @@ -import std/unittest -import pkg/libp2p -import pkg/ipfs/ipfsobject - -suite "IPFS Object": - - test "has a content id": - let dag1 = IpfsObject(data: @[1'u8, 2'u8, 3'u8]) - let dag2 = IpfsObject(data: @[4'u8, 5'u8, 6'u8]) - let dag3 = IpfsObject(data: @[4'u8, 5'u8, 6'u8]) - check dag1.cid != dag2.cid - check dag2.cid == dag3.cid diff --git a/tests/ipfs/testProtobuf.nim 
b/tests/ipfs/testProtobuf.nim deleted file mode 100644 index e9490f09..00000000 --- a/tests/ipfs/testProtobuf.nim +++ /dev/null @@ -1,25 +0,0 @@ -import std/unittest -import pkg/libp2p -import pkg/protobuf_serialization -import pkg/ipfs/protobuf/bitswap -import ../helpers/examples - -suite "protobuf messages": - - test "serializes bitswap want lists": - let cid = Cid.example - let entry = Entry(`block`: cid.data.buffer) - let wantlist = WantList(entries: @[entry]) - let message = Message(wantlist: wantlist) - - let encoded = Protobuf.encode(message) - - check Protobuf.decode(encoded, Message) == message - - test "serializes bitswap blocks": - let bloc = Block(data: seq[byte].example) - let message = Message(payload: @[bloc]) - - let encoded = Protobuf.encode(message) - - check Protobuf.decode(encoded, Message) == message diff --git a/tests/ipfs/testRepo.nim b/tests/ipfs/testRepo.nim deleted file mode 100644 index 8af5370b..00000000 --- a/tests/ipfs/testRepo.nim +++ /dev/null @@ -1,36 +0,0 @@ -import pkg/asynctest -import pkg/chronos -import pkg/ipfs/repo - -suite "repo": - - let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8]) - var repo: Repo - - setup: - repo = Repo() - - test "stores IPFS objects": - repo.store(obj) - - test "retrieves IPFS objects by their content id": - repo.store(obj) - check repo.retrieve(obj.cid).get() == obj - - test "signals retrieval failure": - check repo.retrieve(obj.cid).isNone - - test "knows which content ids are stored": - check repo.contains(obj.cid) == false - repo.store(obj) - check repo.contains(obj.cid) == true - - test "waits for IPFS object to arrive": - let waiting = repo.wait(obj.cid, 1.minutes) - check not waiting.finished - repo.store(obj) - check waiting.finished - - test "does not wait when IPFS object is already stored": - repo.store(obj) - check repo.wait(obj.cid, 1.minutes).finished diff --git a/tests/ipfs/testWaitingList.nim b/tests/ipfs/testWaitingList.nim deleted file mode 100644 index 986742c3..00000000 --- a/tests/ipfs/testWaitingList.nim +++ /dev/null @@ -1,31 +0,0 @@ -import pkg/asynctest -import pkg/chronos -import ipfs/repo/waitinglist - -suite "waiting list": - - var list: WaitingList[string] - - setup: - list = WaitingList[string]() - - test "waits for item to be delivered": - let waiting = list.wait("apple", 1.minutes) - check not waiting.finished - list.deliver("orange") - check not waiting.finished - list.deliver("apple") - check waiting.finished - - test "notifies everyone who is waiting": - let wait1 = list.wait("apple", 1.minutes) - let wait2 = list.wait("apple", 1.minutes) - list.deliver("apple") - check wait1.finished - check wait2.finished - - test "stops waiting after timeout": - let wait = list.wait("apple", 100.milliseconds) - check not wait.finished - await sleepAsync(100.milliseconds) - check wait.finished diff --git a/tests/nim.cfg b/tests/nim.cfg index 0f840a15..cf484916 100644 --- a/tests/nim.cfg +++ b/tests/nim.cfg @@ -1 +1,3 @@ --path:".." 
+--threads:on +--tlsEmulation:off diff --git a/tests/testAll.nim b/tests/testAll.nim index 0d573335..9e791999 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -1,12 +1,7 @@ -import ./ipfs/testObject -import ./ipfs/testChunking -import ./ipfs/testWaitingList -import ./ipfs/testRepo -import ./ipfs/testDhtRouting -import ./ipfs/testProtobuf -import ./ipfs/testBitswapMessages -import ./ipfs/testBitswapProtocol -import ./ipfs/testBitswap -import ./ipfs/testIpfs +import ./dagger/bitswap/testbitswap +import ./dagger/bitswap/testnetwork +import ./dagger/testasyncheapqueue +import ./dagger/testblockstore +import ./dagger/testchunking {.warning[UnusedImport]: off.}