diff --git a/ipfs/bitswap.nim b/ipfs/bitswap.nim new file mode 100644 index 00000000..ca8914af --- /dev/null +++ b/ipfs/bitswap.nim @@ -0,0 +1,48 @@ +import std/options +import pkg/chronos +import pkg/libp2p/cid +import ./ipfsobject +import ./repo +import ./p2p/switch +import ./bitswap/protocol +import ./bitswap/exchange + +export options +export Cid +export Switch + +type + Bitswap* = ref object + repo: Repo + switch: Switch + exchanges: seq[Exchange] # TODO: never cleaned + +proc startExchange(bitswap: Bitswap, stream: BitswapStream) = + let exchange = Exchange.start(bitswap.repo, stream) + bitswap.exchanges.add(exchange) + +proc start*(_: type Bitswap, switch: Switch, repo = Repo()): Bitswap = + let bitswap = Bitswap(repo: repo, switch: switch) + let protocol = BitswapProtocol.new() + proc acceptLoop {.async.} = + while true: + let stream = await protocol.accept() + bitswap.startExchange(stream) + asyncSpawn acceptLoop() + switch.mount(protocol) + bitswap + +proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} = + let stream = await bitswap.switch.dial(peer, BitswapProtocol) + bitswap.startExchange(stream) + +proc store*(bitswap: Bitswap, obj: IpfsObject) = + bitswap.repo.store(obj) + +proc retrieve*(bitswap: Bitswap, cid: Cid): Future[Option[IpfsObject]] {.async.} = + result = bitswap.repo.retrieve(cid) + if result.isNone: + for exchange in bitswap.exchanges: + await exchange.want(cid) + await sleepAsync(1.seconds) # TODO + result = bitswap.repo.retrieve(cid) diff --git a/ipfs/bitswap/exchange.nim b/ipfs/bitswap/exchange.nim new file mode 100644 index 00000000..4c32c1d8 --- /dev/null +++ b/ipfs/bitswap/exchange.nim @@ -0,0 +1,38 @@ +import pkg/chronos +import pkg/libp2p/cid +import ../repo +import ./stream +import ./messages + +type Exchange* = object + repo: Repo + stream: BitswapStream + +proc want*(exchange: Exchange, cid: Cid) {.async.} = + await exchange.stream.write(Message.want(cid)) + +proc send*(exchange: Exchange, obj: IpfsObject) {.async.} = + await exchange.stream.write(Message.send(obj.data)) + +proc handlePayload(exchange: Exchange, message: Message) {.async.} = + for bloc in message.payload: + let obj = IpfsObject(data: bloc.data) + exchange.repo.store(obj) + +proc handleWants(exchange: Exchange, message: Message) {.async.} = + for want in message.wantlist.entries: + let cid = Cid.init(want.`block`).get() + let obj = exchange.repo.retrieve(cid) + if obj.isSome: + await exchange.send(obj.get()) + +proc readLoop(exchange: Exchange) {.async.} = + while true: + let message = await exchange.stream.read() + await exchange.handlePayload(message) + await exchange.handleWants(message) + +proc start*(_: type Exchange, repo: Repo, stream: BitswapStream): Exchange = + let exchange = Exchange(repo: repo, stream: stream) + asyncSpawn exchange.readLoop() + exchange diff --git a/tests/ipfs/testBitswap.nim b/tests/ipfs/testBitswap.nim new file mode 100644 index 00000000..d93fc5ba --- /dev/null +++ b/tests/ipfs/testBitswap.nim @@ -0,0 +1,41 @@ +import pkg/chronos +import pkg/asynctest +import pkg/ipfs/ipfsobject +import pkg/ipfs/p2p/switch +import pkg/ipfs/bitswap + +suite "bitswap": + + let address = MultiAddress.init("/ip4/127.0.0.1/tcp/40981").get() + let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8]) + + var bitswap1, bitswap2: Bitswap + var peer1, peer2: Switch + + setup: + peer1 = Switch.create() + peer2 = Switch.create() + peer1.peerInfo.addrs.add(address) + discard await peer1.start() + discard await peer2.start() + bitswap1 = Bitswap.start(peer1) + bitswap2 = Bitswap.start(peer2) + + teardown: + await peer1.stop() + await peer2.stop() + + test "stores ipfs objects": + bitswap1.store(obj) + + test "retrieves local objects": + bitswap1.store(obj) + check (await bitswap1.retrieve(obj.cid)).get() == obj + + test "signals retrieval failure": + check (await bitswap1.retrieve(obj.cid)).isNone + + test "retrieves objects from network": + bitswap1.store(obj) + await bitswap2.connect(peer1.peerInfo) + check (await bitswap2.retrieve(obj.cid)).get() == obj diff --git a/tests/testAll.nim b/tests/testAll.nim index cb61c65b..5b356fcd 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -5,6 +5,7 @@ import ./ipfs/testDhtRouting import ./ipfs/testProtobuf import ./ipfs/testBitswapMessages import ./ipfs/testBitswapProtocol +import ./ipfs/testBitswap import ./ipfs/testIpfs {.warning[UnusedImport]: off.}