diff --git a/dagger.nim b/dagger.nim index 9fd5b5a9..e69de29b 100644 --- a/dagger.nim +++ b/dagger.nim @@ -1,49 +0,0 @@ -import pkg/chronos -import pkg/libp2p/peerinfo -import pkg/libp2p/multiaddress -import ./ipfs/p2p/switch -import ./ipfs/repo -import ./ipfs/chunking -import ./ipfs/bitswap - -export peerinfo except IPFS -export multiaddress except IPFS - -type - Ipfs* = ref object - repo: Repo - switch: Switch - bitswap: Bitswap - -proc info*(ipfs: Ipfs): PeerInfo = - ipfs.switch.peerInfo - -proc start*(_: type Ipfs, addresses: seq[MultiAddress]): Future[Ipfs] {.async.} = - let repo = Repo() - let switch = Switch.create() - let bitswap = Bitswap.start(switch, repo) - switch.peerInfo.addrs.add(addresses) - discard await switch.start() - result = Ipfs(repo: repo, switch: switch, bitswap: bitswap) - -proc start*(_: type Ipfs, address: MultiAddress): Future[Ipfs] {.async.} = - result = await Ipfs.start(@[address]) - -proc start*(_: type Ipfs): Future[Ipfs] {.async.} = - result = await Ipfs.start(@[]) - -proc connect*(peer: Ipfs, info: PeerInfo) {.async.} = - await peer.bitswap.connect(info) - -proc add*(peer: Ipfs, input: File): Future[Cid] {.async.} = - let obj = createObject(input) - peer.repo.store(obj) - result = obj.cid - -proc get*(peer: Ipfs, identifier: Cid, output: File) {.async.} = - let obj = await peer.bitswap.retrieve(identifier) - if obj.isSome: - obj.get().writeToFile(output) - -proc stop*(peer: Ipfs) {.async.} = - await peer.switch.stop() diff --git a/dagger.nimble b/dagger.nimble index c7d345ca..d794b933 100644 --- a/dagger.nimble +++ b/dagger.nimble @@ -4,7 +4,7 @@ description = "The hardrive for Web3" license = "MIT" requires "nim >= 1.2.6", - "libp2p >= 0.0.2 & < 0.1.0", + "libp2p#unstable", "nimcrypto >= 0.4.1", "bearssl >= 0.1.4", "chronicles >= 0.7.2", diff --git a/dagger/blockexchange.nim b/dagger/blockexchange.nim new file mode 100644 index 00000000..125f50bf --- /dev/null +++ b/dagger/blockexchange.nim @@ -0,0 +1,11 @@ +import ./blockexchange/[ + network, + engine, + peercontext] + +import ./blockexchange/protobuf/[ + blockexc, + payments, + presence] + +export network, engine, peercontext, blockexc, payments, presence diff --git a/dagger/blockexchange.out b/dagger/blockexchange.out new file mode 100755 index 00000000..0288a06d Binary files /dev/null and b/dagger/blockexchange.out differ diff --git a/dagger/bitswap/engine.nim b/dagger/blockexchange/engine.nim similarity index 85% rename from dagger/bitswap/engine.nim rename to dagger/blockexchange/engine.nim index 4a424725..d28b09c5 100644 --- a/dagger/bitswap/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -15,38 +15,39 @@ import pkg/chronicles import pkg/libp2p import pkg/libp2p/errors -import ./protobuf/bitswap as pb -import ./protobuf/presence -import ../blocktype as bt import ../stores/blockstore +import ../blocktype as bt import ../utils/asyncheapqueue +import ./protobuf/blockexc +import ./protobuf/presence + import ./network import ./pendingblocks import ./peercontext import ./engine/payments -export peercontext +export peercontext, payments, pendingblocks logScope: - topics = "dagger bitswap engine" + topics = "dagger blockexc engine" const DefaultTimeout* = 5.seconds DefaultMaxPeersPerRequest* = 10 type - TaskHandler* = proc(task: BitswapPeerCtx): Future[void] {.gcsafe.} - TaskScheduler* = proc(task: BitswapPeerCtx): bool {.gcsafe.} + TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} + TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} - BitswapEngine* = ref object of RootObj + BlockExcEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance - peers*: seq[BitswapPeerCtx] # peers we're currently actively exchanging with + peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with wantList*: seq[Cid] # local wants list pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved peersPerRequest: int # max number of peers to request from scheduleTask*: TaskScheduler # schedule a new task with the task runner - request*: BitswapRequest # bitswap network requests + request*: BlockExcRequest # block exchange network requests wallet*: WalletRef # nitro wallet for micropayments pricing*: ?Pricing # optional bandwidth pricing @@ -60,7 +61,7 @@ proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = a.anyIt( it.cid == b ) -proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx = +proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx = ## Get the peer's context ## @@ -69,7 +70,7 @@ proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx = return peer[0] proc requestBlocks*( - b: BitswapEngine, + b: BlockExcEngine, cids: seq[Cid], timeout = DefaultTimeout): seq[Future[bt.Block]] = ## Request a block from remotes @@ -91,12 +92,11 @@ proc requestBlocks*( blocks.add( b.pendingBlocks.addOrAwait(c).wait(timeout)) - var peers = b.peers # get the first peer with at least one (any) # matching cid - var blockPeer: BitswapPeerCtx + var blockPeer: BlockExcPeerCtx for i, p in peers: let has = cids.anyIt( it in p.peerHave @@ -125,7 +125,7 @@ proc requestBlocks*( if peers.len == 0: return blocks # no peers to send wants to - template sendWants(ctx: BitswapPeerCtx) = + template sendWants(ctx: BlockExcPeerCtx) = # just send wants b.request.sendWantList( ctx.id, @@ -142,7 +142,7 @@ proc requestBlocks*( return blocks proc blockPresenceHandler*( - b: BitswapEngine, + b: BlockExcEngine, peer: PeerID, blocks: seq[BlockPresence]) = ## Handle block presence @@ -156,7 +156,7 @@ proc blockPresenceHandler*( if presence =? Presence.init(blk): peerCtx.updatePresence(presence) -proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) = +proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" let cids = blocks.mapIt( it.cid ) @@ -170,7 +170,7 @@ proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) = trace "Unable to schedule task for peer", peer = p.id break # do next peer -proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) = +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## Resolve pending blocks from the pending blocks manager ## and schedule any new task to be ran ## @@ -179,8 +179,8 @@ proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) = b.pendingBlocks.resolve(blocks) b.scheduleTasks(blocks) -proc payForBlocks(engine: BitswapEngine, - peer: BitswapPeerCtx, +proc payForBlocks(engine: BlockExcEngine, + peer: BlockExcPeerCtx, blocks: seq[bt.Block]) = let sendPayment = engine.request.sendPayment if sendPayment.isNil: @@ -191,7 +191,7 @@ proc payForBlocks(engine: BitswapEngine, sendPayment(peer.id, payment) proc blocksHandler*( - b: BitswapEngine, + b: BlockExcEngine, peer: PeerID, blocks: seq[bt.Block]) = ## handle incoming blocks @@ -206,7 +206,7 @@ proc blocksHandler*( b.payForBlocks(peerCtx, blocks) proc wantListHandler*( - b: BitswapEngine, + b: BlockExcEngine, peer: PeerID, wantList: WantList) = ## Handle incoming want lists @@ -250,14 +250,14 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer -proc accountHandler*(engine: BitswapEngine, peer: PeerID, account: Account) = +proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) = let context = engine.getPeerCtx(peer) if context.isNil: return context.account = account.some -proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) = +proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) = without context =? engine.getPeerCtx(peer).option and account =? context.account: return @@ -268,14 +268,14 @@ proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) else: context.paymentChannel = engine.wallet.acceptChannel(payment).option -proc setupPeer*(b: BitswapEngine, peer: PeerID) = +proc setupPeer*(b: BlockExcEngine, peer: PeerID) = ## Perform initial setup, such as want ## list exchange ## trace "Setting up new peer", peer if peer notin b.peers: - b.peers.add(BitswapPeerCtx( + b.peers.add(BlockExcPeerCtx( id: peer )) @@ -286,7 +286,7 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) = if address =? b.pricing.?address: b.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(b: BitswapEngine, peer: PeerID) = +proc dropPeer*(b: BlockExcEngine, peer: PeerID) = ## Cleanup disconnected peer ## @@ -295,7 +295,7 @@ proc dropPeer*(b: BitswapEngine, peer: PeerID) = # drop the peer from the peers table b.peers.keepItIf( it.id != peer ) -proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} = +proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) @@ -334,19 +334,19 @@ proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} = if wants.len > 0: b.request.sendPresence(task.id, wants) -proc new*( - T: type BitswapEngine, +func new*( + T: type BlockExcEngine, localStore: BlockStore, wallet: WalletRef, - request: BitswapRequest = BitswapRequest(), + request: BlockExcRequest = BlockExcRequest(), scheduleTask: TaskScheduler = nil, peersPerRequest = DefaultMaxPeersPerRequest): T = - proc taskScheduler(task: BitswapPeerCtx): bool = + proc taskScheduler(task: BlockExcPeerCtx): bool = if not isNil(scheduleTask): return scheduleTask(task) - let b = BitswapEngine( + let b = BlockExcEngine( localStore: localStore, pendingBlocks: PendingBlocksManager.new(), peersPerRequest: peersPerRequest, diff --git a/dagger/bitswap/engine/payments.nim b/dagger/blockexchange/engine/payments.nim similarity index 91% rename from dagger/bitswap/engine/payments.nim rename to dagger/blockexchange/engine/payments.nim index 0beecc86..08718fcc 100644 --- a/dagger/bitswap/engine/payments.nim +++ b/dagger/blockexchange/engine/payments.nim @@ -18,7 +18,7 @@ func openLedgerChannel*(wallet: WalletRef, asset: EthAddress): ?!ChannelId = wallet.openLedgerChannel(hub, ChainId, asset, AmountPerChannel) -func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId = +func getOrOpenChannel(wallet: WalletRef, peer: BlockExcPeerCtx): ?!ChannelId = if channel =? peer.paymentChannel: success channel elif account =? peer.account: @@ -29,7 +29,7 @@ func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId = failure "no account set for peer" func pay*(wallet: WalletRef, - peer: BitswapPeerCtx, + peer: BlockExcPeerCtx, amount: UInt256): ?!SignedState = if account =? peer.account: let asset = Asset diff --git a/dagger/bitswap/network.nim b/dagger/blockexchange/network.nim similarity index 80% rename from dagger/bitswap/network.nim rename to dagger/blockexchange/network.nim index 8774f3ca..6338dd00 100644 --- a/dagger/bitswap/network.nim +++ b/dagger/blockexchange/network.nim @@ -13,19 +13,20 @@ import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/questionable +import pkg/questionable/results import ../blocktype as bt -import ./protobuf/bitswap as pb +import ./protobuf/blockexc as pb import ./protobuf/payments import ./networkpeer -export pb, networkpeer -export payments +export networkpeer, payments logScope: - topics = "dagger bitswap network" + topics = "dagger blockexc network" -const Codec* = "/ipfs/bitswap/1.2.0" +const Codec* = "/dagger/blockexc/1.0.0" type WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.} @@ -34,7 +35,7 @@ type AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.} PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} - BitswapHandlers* = object + BlockExcHandlers* = object onWantList*: WantListHandler onBlocks*: BlocksHandler onPresence*: BlockPresenceHandler @@ -55,22 +56,22 @@ type AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.} PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} - BitswapRequest* = object + BlockExcRequest* = object sendWantList*: WantListBroadcaster sendBlocks*: BlocksBroadcaster sendPresence*: PresenceBroadcaster sendAccount*: AccountBroadcaster sendPayment*: PaymentBroadcaster - BitswapNetwork* = ref object of LPProtocol + BlockExcNetwork* = ref object of LPProtocol peers*: Table[PeerID, NetworkPeer] switch*: Switch - handlers*: BitswapHandlers - request*: BitswapRequest + handlers*: BlockExcHandlers + request*: BlockExcRequest getConn: ConnProvider proc handleWantList( - b: BitswapNetwork, + b: BlockExcNetwork, peer: NetworkPeer, list: WantList) = ## Handle incoming want list @@ -102,7 +103,7 @@ proc makeWantList*( WantList(entries: entries, full: full) proc broadcastWantList*( - b: BitswapNetwork, + b: BlockExcNetwork, id: PeerID, cids: seq[Cid], priority: int32 = 0, @@ -128,7 +129,7 @@ proc broadcastWantList*( asyncSpawn b.peers[id].send(Message(wantlist: wantList)) proc handleBlocks( - b: BitswapNetwork, + b: BlockExcNetwork, peer: NetworkPeer, blocks: seq[auto]) = ## Handle incoming blocks @@ -142,9 +143,11 @@ proc handleBlocks( var blks: seq[bt.Block] for blk in blocks: when blk is pb.Block: - blks.add(bt.Block.new(Cid.init(blk.prefix).get(), blk.data)) + if b =? bt.Block.new(Cid.init(blk.prefix).get(), blk.data): + blks.add(b) elif blk is seq[byte]: - blks.add(bt.Block.new(Cid.init(blk).get(), blk)) + if b =? bt.Block.new(Cid.init(blk).get(), blk): + blks.add(b) else: error("Invalid block type") @@ -155,7 +158,6 @@ template makeBlocks*( seq[pb.Block] = var blks: seq[pb.Block] for blk in blocks: - # for now only send bitswap `1.1.0` blks.add(pb.Block( prefix: blk.cid.data.buffer, data: blk.data @@ -164,7 +166,7 @@ template makeBlocks*( blks proc broadcastBlocks*( - b: BitswapNetwork, + b: BlockExcNetwork, id: PeerID, blocks: seq[bt.Block]) = ## Send blocks to remote @@ -177,7 +179,7 @@ proc broadcastBlocks*( asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( - b: BitswapNetwork, + b: BlockExcNetwork, peer: NetworkPeer, presence: seq[BlockPresence]) = ## Handle block presence @@ -190,7 +192,7 @@ proc handleBlockPresence( b.handlers.onPresence(peer.id, presence) proc broadcastBlockPresence*( - b: BitswapNetwork, + b: BlockExcNetwork, id: PeerID, presence: seq[BlockPresence]) = ## Send presence to remote @@ -202,14 +204,14 @@ proc broadcastBlockPresence*( trace "Sending presence to peer", peer = id asyncSpawn b.peers[id].send(Message(blockPresences: presence)) -proc handleAccount(network: BitswapNetwork, +proc handleAccount(network: BlockExcNetwork, peer: NetworkPeer, account: Account) = if network.handlers.onAccount.isNil: return network.handlers.onAccount(peer.id, account) -proc broadcastAccount*(network: BitswapNetwork, +proc broadcastAccount*(network: BlockExcNetwork, id: PeerId, account: Account) = if id notin network.peers: @@ -218,7 +220,7 @@ proc broadcastAccount*(network: BitswapNetwork, let message = Message(account: AccountMessage.init(account)) asyncSpawn network.peers[id].send(message) -proc broadcastPayment*(network: BitswapNetwork, +proc broadcastPayment*(network: BlockExcNetwork, id: PeerId, payment: SignedState) = if id notin network.peers: @@ -227,21 +229,18 @@ proc broadcastPayment*(network: BitswapNetwork, let message = Message(payment: StateChannelUpdate.init(payment)) asyncSpawn network.peers[id].send(message) -proc handlePayment(network: BitswapNetwork, +proc handlePayment(network: BlockExcNetwork, peer: NetworkPeer, payment: SignedState) = if network.handlers.onPayment.isNil: return network.handlers.onPayment(peer.id, payment) -proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} = +proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = try: if msg.wantlist.entries.len > 0: b.handleWantList(peer, msg.wantlist) - if msg.blocks.len > 0: - b.handleBlocks(peer, msg.blocks) - if msg.payload.len > 0: b.handleBlocks(peer, msg.payload) @@ -255,10 +254,10 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} = b.handlePayment(peer, payment) except CatchableError as exc: - trace "Exception in bitswap rpc handler", exc = exc.msg + trace "Exception in blockexc rpc handler", exc = exc.msg -proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer = - ## Creates or retrieves a BitswapNetwork Peer +proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer = + ## Creates or retrieves a BlockExcNetwork Peer ## if peer in b.peers: @@ -268,7 +267,7 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer = try: return await b.switch.dial(peer, Codec) except CatchableError as exc: - trace "unable to connect to bitswap peer", exc = exc.msg + trace "unable to connect to blockexc peer", exc = exc.msg if not isNil(b.getConn): getConn = b.getConn @@ -277,31 +276,35 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer = b.rpcHandler(p, msg) # create new pubsub peer - let bitSwapPeer = NetworkPeer.new(peer, getConn, rpcHandler) - debug "created new bitswap peer", peer + let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) + debug "created new blockexc peer", peer - b.peers[peer] = bitSwapPeer + b.peers[peer] = blockExcPeer - return bitSwapPeer + return blockExcPeer -proc setupPeer*(b: BitswapNetwork, peer: PeerID) = +proc setupPeer*(b: BlockExcNetwork, peer: PeerID) = ## Perform initial setup, such as want ## list exchange ## discard b.getOrCreatePeer(peer) -proc dropPeer*(b: BitswapNetwork, peer: PeerID) = +proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = ## Cleanup disconnected peer ## b.peers.del(peer) -method init*(b: BitswapNetwork) = +method init*(b: BlockExcNetwork) = ## Perform protocol initialization ## - proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + proc peerEventHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} = + # TODO: temporary until libp2p moves back to PeerID + let + peerId = peerInfo.peerId + if event.kind == PeerEventKind.Joined: b.setupPeer(peerId) else: @@ -312,20 +315,20 @@ method init*(b: BitswapNetwork) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = let peerId = conn.peerInfo.peerId - let bitswapPeer = b.getOrCreatePeer(peerId) - await bitswapPeer.readLoop(conn) # attach read loop + let blockexcPeer = b.getOrCreatePeer(peerId) + await blockexcPeer.readLoop(conn) # attach read loop b.handler = handle b.codec = Codec proc new*( - T: type BitswapNetwork, + T: type BlockExcNetwork, switch: Switch, connProvider: ConnProvider = nil): T = - ## Create a new BitswapNetwork instance + ## Create a new BlockExcNetwork instance ## - let b = BitswapNetwork( + let b = BlockExcNetwork( switch: switch, getConn: connProvider) @@ -353,7 +356,7 @@ proc new*( proc sendPayment(id: PeerID, payment: SignedState) = b.broadcastPayment(id, payment) - b.request = BitswapRequest( + b.request = BlockExcRequest( sendWantList: sendWantList, sendBlocks: sendBlocks, sendPresence: sendPresence, diff --git a/dagger/bitswap/networkpeer.nim b/dagger/blockexchange/networkpeer.nim similarity index 93% rename from dagger/bitswap/networkpeer.nim rename to dagger/blockexchange/networkpeer.nim index aae8fd91..7e011021 100644 --- a/dagger/bitswap/networkpeer.nim +++ b/dagger/blockexchange/networkpeer.nim @@ -12,10 +12,10 @@ import pkg/chronicles import pkg/protobuf_serialization import pkg/libp2p -import ./protobuf/bitswap +import ./protobuf/blockexc logScope: - topics = "dagger bitswap networkpeer" + topics = "dagger blockexc networkpeer" const MaxMessageSize = 8 * 1024 * 1024 @@ -43,7 +43,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = trace "Got message for peer", peer = b.id, msg await b.handler(b, msg) except CatchableError as exc: - trace "Exception in bitswap read loop", exc = exc.msg + trace "Exception in blockexc read loop", exc = exc.msg finally: await conn.close() @@ -65,7 +65,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = trace "Sending message to remote", peer = b.id, msg = $msg await conn.writeLp(Protobuf.encode(msg)) -proc new*( +func new*( T: type NetworkPeer, peer: PeerId, connProvider: ConnProvider, diff --git a/dagger/bitswap/peercontext.nim b/dagger/blockexchange/peercontext.nim similarity index 74% rename from dagger/bitswap/peercontext.nim rename to dagger/blockexchange/peercontext.nim index 74fcd909..99c7b15f 100644 --- a/dagger/bitswap/peercontext.nim +++ b/dagger/blockexchange/peercontext.nim @@ -4,15 +4,14 @@ import pkg/libp2p import pkg/chronos import pkg/nitro import pkg/questionable -import ./protobuf/bitswap +import ./protobuf/blockexc import ./protobuf/payments import ./protobuf/presence -export payments -export nitro +export payments, nitro type - BitswapPeerCtx* = ref object of RootObj + BlockExcPeerCtx* = ref object of RootObj id*: PeerID peerPrices*: Table[Cid, UInt256] # remote peer have list including price peerWants*: seq[Entry] # remote peers want lists @@ -21,16 +20,16 @@ type account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id -proc peerHave*(context: BitswapPeerCtx): seq[Cid] = +proc peerHave*(context: BlockExcPeerCtx): seq[Cid] = toSeq(context.peerPrices.keys) -proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool = +proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool = ## Convenience method to check for peer prepense ## a.anyIt( it.id == b ) -func updatePresence*(context: BitswapPeerCtx, presence: Presence) = +func updatePresence*(context: BlockExcPeerCtx, presence: Presence) = let cid = presence.cid let price = presence.price @@ -39,7 +38,7 @@ func updatePresence*(context: BitswapPeerCtx, presence: Presence) = elif cid in context.peerHave and not presence.have: context.peerPrices.del(cid) -func price*(context: BitswapPeerCtx, cids: seq[Cid]): UInt256 = +func price*(context: BlockExcPeerCtx, cids: seq[Cid]): UInt256 = for cid in cids: if price =? context.peerPrices.?[cid]: result += price diff --git a/dagger/bitswap/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim similarity index 94% rename from dagger/bitswap/pendingblocks.nim rename to dagger/blockexchange/pendingblocks.nim index af1973e1..b783c0d1 100644 --- a/dagger/bitswap/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -16,7 +16,7 @@ import pkg/libp2p import ../blocktype logScope: - topics = "dagger bitswap pendingblocks" + topics = "dagger blockexc pendingblocks" type PendingBlocksManager* = ref object of RootObj @@ -67,7 +67,7 @@ proc contains*( p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) -proc new*(T: type PendingBlocksManager): T = +func new*(T: type PendingBlocksManager): T = T( blocks: initTable[Cid, Future[Block]]() ) diff --git a/dagger/bitswap/protobuf/bitswap.nim b/dagger/blockexchange/protobuf/blockexc.nim similarity index 100% rename from dagger/bitswap/protobuf/bitswap.nim rename to dagger/blockexchange/protobuf/blockexc.nim diff --git a/dagger/bitswap/protobuf/message.proto b/dagger/blockexchange/protobuf/message.proto similarity index 74% rename from dagger/bitswap/protobuf/message.proto rename to dagger/blockexchange/protobuf/message.proto index ac8b4e88..48a4ea36 100644 --- a/dagger/bitswap/protobuf/message.proto +++ b/dagger/blockexchange/protobuf/message.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package bitswap.message.pb; +package blockexc.message.pb; message Message { @@ -11,7 +11,7 @@ message Message { } message Entry { - bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) + bytes block = 1; // the block cid int32 priority = 2; // the priority (normalized). default to 1 bool cancel = 3; // whether this revokes an entry WantType wantType = 4; // Note: defaults to enum 0, ie Block @@ -47,10 +47,9 @@ message Message { } Wantlist wantlist = 1; - repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 - repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 - repeated BlockPresence blockPresences = 4; - int32 pendingBytes = 5; - AccountMessage account = 6; - StateChannelUpdate payment = 7; + repeated Block payload = 2; + repeated BlockPresence blockPresences = 3; + int32 pendingBytes = 4; + AccountMessage account = 5; + StateChannelUpdate payment = 6; } diff --git a/dagger/bitswap/protobuf/message.proto.license b/dagger/blockexchange/protobuf/message.proto.license similarity index 100% rename from dagger/bitswap/protobuf/message.proto.license rename to dagger/blockexchange/protobuf/message.proto.license diff --git a/dagger/bitswap/protobuf/payments.nim b/dagger/blockexchange/protobuf/payments.nim similarity index 98% rename from dagger/bitswap/protobuf/payments.nim rename to dagger/blockexchange/protobuf/payments.nim index 078b5f61..bf1ee0f2 100644 --- a/dagger/bitswap/protobuf/payments.nim +++ b/dagger/blockexchange/protobuf/payments.nim @@ -4,7 +4,7 @@ import pkg/stint import pkg/nitro import pkg/questionable import pkg/upraises -import ./bitswap +import ./blockexc export AccountMessage export StateChannelUpdate diff --git a/dagger/bitswap/protobuf/presence.nim b/dagger/blockexchange/protobuf/presence.nim similarity index 93% rename from dagger/bitswap/protobuf/presence.nim rename to dagger/blockexchange/protobuf/presence.nim index af8a4fb7..cc53edcb 100644 --- a/dagger/bitswap/protobuf/presence.nim +++ b/dagger/blockexchange/protobuf/presence.nim @@ -3,7 +3,7 @@ import pkg/stint import pkg/questionable import pkg/questionable/results import pkg/upraises -import ./bitswap +import ./blockexc export questionable export stint @@ -12,7 +12,7 @@ export BlockPresenceType upraises.push: {.upraises: [].} type - PresenceMessage* = bitswap.BlockPresence + PresenceMessage* = blockexc.BlockPresence Presence* = object cid*: Cid have*: bool diff --git a/dagger/blockset.nim b/dagger/blockset.nim new file mode 100644 index 00000000..2f323c1f --- /dev/null +++ b/dagger/blockset.nim @@ -0,0 +1,62 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results + +import ./blockstream +export blockstream + +type + BlockSetRef* = ref object of BlockStreamRef + stream*: BlockStreamRef + hcodec*: MultiCodec + +proc hashBytes*(mh: MultiHash): seq[byte] = + mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] + +proc hashBytes*(b: Block): seq[byte] = + if mh =? b.cid.mhash: + return mh.hashBytes() + +method nextBlock*(d: BlockSetRef): ?!Block = + d.stream.nextBlock() + +proc treeHash*(d: BlockSetRef): ?!MultiHash = + var + stack: seq[seq[byte]] + + while true: + let (blk1, blk2) = (d.nextBlock().option, d.nextBlock().option) + if blk1.isNone and blk2.isNone and stack.len == 1: + let res = MultiHash.digest($d.hcodec, stack[0]) + if mh =? res: + return success mh + + return failure($res.error) + + if blk1.isSome: stack.add((!blk1).hashBytes()) + if blk2.isSome: stack.add((!blk2).hashBytes()) + + while stack.len > 1: + let (b1, b2) = (stack.pop(), stack.pop()) + let res = MultiHash.digest($d.hcodec, b1 & b2) + if mh =? res: + stack.add(mh.hashBytes()) + else: + return failure($res.error) + +func new*( + T: type BlockSetRef, + stream: BlockStreamRef, + hcodec: MultiCodec = multiCodec("sha2-256")): T = + T(stream: stream, hcodec: hcodec) diff --git a/dagger/blockstream.nim b/dagger/blockstream.nim new file mode 100644 index 00000000..cb7601ce --- /dev/null +++ b/dagger/blockstream.nim @@ -0,0 +1,12 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import ./blockstream/[blockstream, chunkedblockstream] + +export blockstream, chunkedblockstream diff --git a/dagger/blockstream/blockstream.nim b/dagger/blockstream/blockstream.nim new file mode 100644 index 00000000..fd342c9c --- /dev/null +++ b/dagger/blockstream/blockstream.nim @@ -0,0 +1,29 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/questionable +import pkg/questionable/results +import ../blocktype + +export blocktype + +type + BlockStreamRef* = ref object of RootObj + +method nextBlock*(b: BlockStreamRef): ?!Block {.base.} = + doAssert(false, "Not implemented!") + +iterator items*(b: BlockStreamRef): Block = + while true: + without blk =? b.nextBlock(): + break + + yield blk diff --git a/dagger/blockstream/chunkedblockstream.nim b/dagger/blockstream/chunkedblockstream.nim new file mode 100644 index 00000000..beddd77e --- /dev/null +++ b/dagger/blockstream/chunkedblockstream.nim @@ -0,0 +1,28 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/questionable +import pkg/questionable/results + +import ./blockstream +import ../chunker + +type + ChunkedBlockStreamRef* = ref object of BlockStreamRef + chunker*: Chunker + +method nextBlock*(c: ChunkedBlockStreamRef): ?!Block = + let data: seq[byte] = c.chunker.getBytes() + if data.len > 0: + return Block.new(data) + +func new*(T: type ChunkedBlockStreamRef, chunker: Chunker): T = + T(chunker: chunker) diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim index 7833a3c4..c4e65afd 100644 --- a/dagger/blocktype.nim +++ b/dagger/blocktype.nim @@ -7,16 +7,14 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import pkg/libp2p/multihash -import pkg/libp2p/multicodec -import pkg/libp2p/cid +{.push raises: [Defect].} + +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results import pkg/stew/byteutils -export cid, multihash, multicodec - type - CidDontMatchError* = object of CatchableError - Block* = object of RootObj cid*: Cid data*: seq[byte] @@ -25,31 +23,31 @@ proc `$`*(b: Block): string = result &= "cid: " & $b.cid result &= "\ndata: " & string.fromBytes(b.data) -proc new*( +func new*( + T: type Block, + data: openArray[byte] = [], + version = CIDv1, + hcodec = multiCodec("sha2-256"), + codec = multiCodec("raw")): ?!T = + let hash = MultiHash.digest($hcodec, data).get() + success Block( + cid: Cid.init(version, codec, hash).get(), + data: @data) + +func new*( T: type Block, cid: Cid, data: openArray[byte] = [], - verify: bool = false): T = - let b = Block.new( + verify: bool = false): ?!T = + let res = Block.new( data, cid.cidver, cid.mhash.get().mcodec, cid.mcodec ) - if verify and cid != b.cid: - raise newException(CidDontMatchError, - "The suplied Cid doesn't match the data!") + if b =? res: + if verify and cid != b.cid: + return failure("The suplied Cid doesn't match the data!") - return b - -proc new*( - T: type Block, - data: openArray[byte] = [], - version = CIDv0, - hcodec = multiCodec("sha2-256"), - codec = multiCodec("dag-pb")): T = - let hash = MultiHash.digest($hcodec, data).get() - Block( - cid: Cid.init(version, codec, hash).get(), - data: @data) + res diff --git a/dagger/chunker.nim b/dagger/chunker.nim index e22cebf0..c1922f10 100644 --- a/dagger/chunker.nim +++ b/dagger/chunker.nim @@ -7,11 +7,16 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -# TODO: This is super inneficient and merits a rewrite, but it'll do for now +# TODO: This is super inneficient and needs a rewrite, but it'll do for now + +{.push raises: [Defect].} import std/sequtils -import ./p2p/rng +import pkg/questionable +import pkg/questionable/results + +import ./rng import ./blocktype export blocktype @@ -21,7 +26,9 @@ const type # default reader type - Reader* = proc(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} + Reader* = + proc(data: var openArray[byte], offset: Natural = 0): int + {.gcsafe, closure, raises: [Defect].} ChunkerType* {.pure.} = enum SizedChunker @@ -63,7 +70,7 @@ iterator items*(c: Chunker): seq[byte] = yield chunk -proc new*( +func new*( T: type Chunker, kind = ChunkerType.SizedChunker, reader: Reader, @@ -122,11 +129,15 @@ proc newFileChunker*( ## proc reader(data: var openArray[byte], offset: Natural = 0): int = - return file.readBytes(data, 0, data.len) + try: + return file.readBytes(data, 0, data.len) + except IOError as exc: + # TODO: revisit error handling - should this be fatal? + raise newException(Defect, exc.msg) Chunker.new( kind = ChunkerType.SizedChunker, reader = reader, - size = file.getFileSize(), + size = try: file.getFileSize() except: 0, # TODO: should do something smarter abou this pad = pad, chunkSize = chunkSize) diff --git a/dagger/dagger.nim b/dagger/dagger.nim new file mode 100644 index 00000000..ec0ba0b2 --- /dev/null +++ b/dagger/dagger.nim @@ -0,0 +1,9 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + diff --git a/dagger/p2p/rng.nim b/dagger/rng.nim similarity index 100% rename from dagger/p2p/rng.nim rename to dagger/rng.nim diff --git a/dagger/stores.nim b/dagger/stores.nim new file mode 100644 index 00000000..bc0e604a --- /dev/null +++ b/dagger/stores.nim @@ -0,0 +1,3 @@ +import ./stores/[memorystore, blockstore, blockexchange] + +export memorystore, blockstore, blockexchange diff --git a/dagger/bitswap.nim b/dagger/stores/blockexchange.nim similarity index 60% rename from dagger/bitswap.nim rename to dagger/stores/blockexchange.nim index 19766a70..4652437f 100644 --- a/dagger/bitswap.nim +++ b/dagger/stores/blockexchange.nim @@ -14,18 +14,19 @@ import pkg/chronos import pkg/libp2p import pkg/libp2p/errors -import ./bitswap/protobuf/bitswap as pb -import ./blocktype as bt -import ./stores/blockstore -import ./utils/asyncheapqueue +import ../blocktype as bt +import ../utils/asyncheapqueue -import ./bitswap/network -import ./bitswap/engine +import ./blockstore +import ../blockexchange/network +import ../blockexchange/engine +import ../blockexchange/peercontext +import ../blockexchange/protobuf/blockexc as pb -export network, blockstore, asyncheapqueue, engine +export blockstore, network, engine, asyncheapqueue logScope: - topics = "dagger bitswap" + topics = "dagger blockexc" const DefaultTaskQueueSize = 100 @@ -33,58 +34,58 @@ const DefaultMaxRetries = 3 type - Bitswap* = ref object of BlockStore - engine*: BitswapEngine # bitswap decision engine - taskQueue*: AsyncHeapQueue[BitswapPeerCtx] # peers we're currently processing tasks for - bitswapTasks: seq[Future[void]] # future to control bitswap task - bitswapRunning: bool # indicates if the bitswap task is running + BlockExc* = ref object of BlockStore + engine*: BlockExcEngine # blockexc decision engine + taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for + blockexcTasks: seq[Future[void]] # future to control blockexc task + blockexcRunning: bool # indicates if the blockexc task is running concurrentTasks: int # number of concurrent peers we're serving at any given time maxRetries: int # max number of tries for a failed block taskHandler: TaskHandler # handler provided by the engine called by the runner -proc bitswapTaskRunner(b: Bitswap) {.async.} = +proc blockexcTaskRunner(b: BlockExc) {.async.} = ## process tasks ## - while b.bitswapRunning: + while b.blockexcRunning: let peerCtx = await b.taskQueue.pop() asyncSpawn b.taskHandler(peerCtx) - trace "Exiting bitswap task runner" + trace "Exiting blockexc task runner" -proc start*(b: Bitswap) {.async.} = - ## Start the bitswap task +proc start*(b: BlockExc) {.async.} = + ## Start the blockexc task ## - trace "bitswap start" + trace "blockexc start" - if b.bitswapTasks.len > 0: - warn "Starting bitswap twice" + if b.blockexcTasks.len > 0: + warn "Starting blockexc twice" return - b.bitswapRunning = true + b.blockexcRunning = true for i in 0.. 0 -suite "Bitswap - multiple nodes": +suite "BlockExc - multiple nodes": let chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256) - blocks = chunker.mapIt( bt.Block.new(it) ) + blocks = chunker.mapIt( !bt.Block.new(it) ) var switch: seq[Switch] - bitswap: seq[Bitswap] + blockexc: seq[BlockExc] awaiters: seq[Future[void]] setup: for e in generateNodes(5): switch.add(e.switch) - bitswap.add(e.bitswap) - await e.bitswap.start() + blockexc.add(e.blockexc) + await e.blockexc.start() awaiters = switch.mapIt( (await it.start()) @@ -185,17 +184,17 @@ suite "Bitswap - multiple nodes": test "should receive haves for own want list": let - downloader = bitswap[4] + downloader = blockexc[4] engine = downloader.engine # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) - bitswap[0].engine.localStore.putBlocks(blocks[0..3]) - bitswap[1].engine.localStore.putBlocks(blocks[4..7]) - bitswap[2].engine.localStore.putBlocks(blocks[8..11]) - bitswap[3].engine.localStore.putBlocks(blocks[12..15]) + blockexc[0].engine.localStore.putBlocks(blocks[0..3]) + blockexc[1].engine.localStore.putBlocks(blocks[4..7]) + blockexc[2].engine.localStore.putBlocks(blocks[8..11]) + blockexc[3].engine.localStore.putBlocks(blocks[12..15]) await connectNodes(switch) @@ -209,17 +208,17 @@ suite "Bitswap - multiple nodes": test "should exchange blocks with multiple nodes": let - downloader = bitswap[4] + downloader = blockexc[4] engine = downloader.engine # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) - bitswap[0].engine.localStore.putBlocks(blocks[0..3]) - bitswap[1].engine.localStore.putBlocks(blocks[4..7]) - bitswap[2].engine.localStore.putBlocks(blocks[8..11]) - bitswap[3].engine.localStore.putBlocks(blocks[12..15]) + blockexc[0].engine.localStore.putBlocks(blocks[0..3]) + blockexc[1].engine.localStore.putBlocks(blocks[4..7]) + blockexc[2].engine.localStore.putBlocks(blocks[8..11]) + blockexc[3].engine.localStore.putBlocks(blocks[12..15]) await connectNodes(switch) let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid )) diff --git a/tests/dagger/testblockstore.nim b/tests/dagger/stores/testblockstore.nim similarity index 82% rename from tests/dagger/testblockstore.nim rename to tests/dagger/stores/testblockstore.nim index 517b403b..54a0bb04 100644 --- a/tests/dagger/testblockstore.nim +++ b/tests/dagger/stores/testblockstore.nim @@ -1,20 +1,22 @@ import std/sequtils + import pkg/chronos import pkg/asynctest import pkg/libp2p import pkg/stew/byteutils - -import pkg/dagger/p2p/rng +import pkg/questionable +import pkg/questionable/results +import pkg/dagger/rng import pkg/dagger/stores/memorystore import pkg/dagger/chunker -import ./helpers +import ../helpers suite "Memory Store": var store: MemoryStore var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) - var blocks = chunker.mapIt( Block.new(it) ) + var blocks = chunker.mapIt( !Block.new(it) ) setup: store = MemoryStore.new(blocks) @@ -40,9 +42,9 @@ suite "Memory Store": test "add blocks change handler": let blocks = @[ - Block.new("Block 1".toBytes), - Block.new("Block 2".toBytes), - Block.new("Block 3".toBytes), + !Block.new("Block 1".toBytes), + !Block.new("Block 2".toBytes), + !Block.new("Block 3".toBytes), ] var triggered = false @@ -59,9 +61,9 @@ suite "Memory Store": test "add blocks change handler": let blocks = @[ - Block.new("Block 1".toBytes), - Block.new("Block 2".toBytes), - Block.new("Block 3".toBytes), + !Block.new("Block 1".toBytes), + !Block.new("Block 2".toBytes), + !Block.new("Block 3".toBytes), ] var triggered = false diff --git a/tests/dagger/bitswap/utils.nim b/tests/dagger/stores/utils.nim similarity index 69% rename from tests/dagger/bitswap/utils.nim rename to tests/dagger/stores/utils.nim index a3eca793..b6b3309c 100644 --- a/tests/dagger/bitswap/utils.nim +++ b/tests/dagger/stores/utils.nim @@ -3,8 +3,7 @@ import std/sequtils import pkg/chronos import pkg/libp2p -import pkg/dagger/bitswap -import pkg/dagger/stores/memorystore +import pkg/dagger/stores import pkg/dagger/blocktype as bt import ../examples @@ -14,21 +13,20 @@ proc generateNodes*( blocks: openArray[bt.Block] = [], secureManagers: openarray[SecureProtocol] = [ SecureProtocol.Noise, - ]): seq[tuple[switch: Switch, bitswap: Bitswap]] = - + ]): seq[tuple[switch: Switch, blockexc: BlockExc]] = for i in 0..