From b0a1166c3cfb5ed42c80e05306b2b02df71e2f8f Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Mon, 1 Feb 2021 16:19:53 +0100 Subject: [PATCH] Bitswap: retrieve() waits until IPFS object has been retrieved Workaround that always waited for exactly 1 second has been removed. --- ipfs/bitswap.nim | 6 ++++-- ipfs/repo.nim | 16 +++++++++++++++- ipfs/repo/waitinglist.nim | 20 ++++++++++++++++++++ tests/ipfs/testBitswap.nim | 2 +- tests/ipfs/testRepo.nim | 13 ++++++++++++- tests/ipfs/testWaitingList.nim | 31 +++++++++++++++++++++++++++++++ tests/testAll.nim | 1 + 7 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 ipfs/repo/waitinglist.nim create mode 100644 tests/ipfs/testWaitingList.nim diff --git a/ipfs/bitswap.nim b/ipfs/bitswap.nim index ca8914af..689c6e51 100644 --- a/ipfs/bitswap.nim +++ b/ipfs/bitswap.nim @@ -39,10 +39,12 @@ proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} = proc store*(bitswap: Bitswap, obj: IpfsObject) = bitswap.repo.store(obj) -proc retrieve*(bitswap: Bitswap, cid: Cid): Future[Option[IpfsObject]] {.async.} = +proc retrieve*(bitswap: Bitswap, + cid: Cid, + timeout = 30.seconds): Future[Option[IpfsObject]] {.async.} = result = bitswap.repo.retrieve(cid) if result.isNone: for exchange in bitswap.exchanges: await exchange.want(cid) - await sleepAsync(1.seconds) # TODO + await bitswap.repo.wait(cid, timeout) result = bitswap.repo.retrieve(cid) diff --git a/ipfs/repo.nim b/ipfs/repo.nim index eaefaeb2..dc4412ea 100644 --- a/ipfs/repo.nim +++ b/ipfs/repo.nim @@ -1,8 +1,10 @@ import std/options import std/tables import std/hashes +import pkg/chronos import pkg/libp2p import ./ipfsobject +import ./repo/waitinglist export options export ipfsobject @@ -10,12 +12,15 @@ export ipfsobject type Repo* = ref object storage: Table[Cid, IpfsObject] + waiting: WaitingList[Cid] proc hash(id: Cid): Hash = hash($id) proc store*(repo: Repo, obj: IpfsObject) = - repo.storage[obj.cid] = obj + let id = obj.cid + repo.storage[id] = obj + repo.waiting.deliver(id) proc contains*(repo: Repo, id: Cid): bool = repo.storage.hasKey(id) @@ -25,3 +30,12 @@ proc retrieve*(repo: Repo, id: Cid): Option[IpfsObject] = repo.storage[id].some else: IpfsObject.none + +proc wait*(repo: Repo, id: Cid, timeout: Duration): Future[void] = + var future: Future[void] + if repo.contains(id): + future = newFuture[void]() + future.complete() + else: + future = repo.waiting.wait(id, timeout) + future diff --git a/ipfs/repo/waitinglist.nim b/ipfs/repo/waitinglist.nim new file mode 100644 index 00000000..ab7a6786 --- /dev/null +++ b/ipfs/repo/waitinglist.nim @@ -0,0 +1,20 @@ +import std/tables +import pkg/chronos + +type WaitingList*[T] = object + futures: Table[T, seq[Future[void]]] + +proc wait*[T](list: var WaitingList, item: T, timeout: Duration): Future[void] = + let future = newFuture[void]("waitinglist.wait") + proc onTimeout(_: pointer) = + if not future.finished: + future.complete() + discard setTimer(Moment.fromNow(timeout), onTimeout, nil) + list.futures.mgetOrPut(item, @[]).add(future) + future + +proc deliver*[T](list: var WaitingList, item: T) = + if list.futures.hasKey(item): + for future in list.futures[item]: + future.complete() + list.futures.del(item) diff --git a/tests/ipfs/testBitswap.nim b/tests/ipfs/testBitswap.nim index d93fc5ba..1d5466c3 100644 --- a/tests/ipfs/testBitswap.nim +++ b/tests/ipfs/testBitswap.nim @@ -33,7 +33,7 @@ suite "bitswap": check (await bitswap1.retrieve(obj.cid)).get() == obj test "signals retrieval failure": - check (await bitswap1.retrieve(obj.cid)).isNone + check (await bitswap1.retrieve(obj.cid, 100.milliseconds)).isNone test "retrieves objects from network": bitswap1.store(obj) diff --git a/tests/ipfs/testRepo.nim b/tests/ipfs/testRepo.nim index a70f2962..8af5370b 100644 --- a/tests/ipfs/testRepo.nim +++ b/tests/ipfs/testRepo.nim @@ -1,4 +1,5 @@ -import std/unittest +import pkg/asynctest +import pkg/chronos import pkg/ipfs/repo suite "repo": @@ -23,3 +24,13 @@ suite "repo": check repo.contains(obj.cid) == false repo.store(obj) check repo.contains(obj.cid) == true + + test "waits for IPFS object to arrive": + let waiting = repo.wait(obj.cid, 1.minutes) + check not waiting.finished + repo.store(obj) + check waiting.finished + + test "does not wait when IPFS object is already stored": + repo.store(obj) + check repo.wait(obj.cid, 1.minutes).finished diff --git a/tests/ipfs/testWaitingList.nim b/tests/ipfs/testWaitingList.nim new file mode 100644 index 00000000..986742c3 --- /dev/null +++ b/tests/ipfs/testWaitingList.nim @@ -0,0 +1,31 @@ +import pkg/asynctest +import pkg/chronos +import ipfs/repo/waitinglist + +suite "waiting list": + + var list: WaitingList[string] + + setup: + list = WaitingList[string]() + + test "waits for item to be delivered": + let waiting = list.wait("apple", 1.minutes) + check not waiting.finished + list.deliver("orange") + check not waiting.finished + list.deliver("apple") + check waiting.finished + + test "notifies everyone who is waiting": + let wait1 = list.wait("apple", 1.minutes) + let wait2 = list.wait("apple", 1.minutes) + list.deliver("apple") + check wait1.finished + check wait2.finished + + test "stops waiting after timeout": + let wait = list.wait("apple", 100.milliseconds) + check not wait.finished + await sleepAsync(100.milliseconds) + check wait.finished diff --git a/tests/testAll.nim b/tests/testAll.nim index 5b356fcd..0d573335 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -1,5 +1,6 @@ import ./ipfs/testObject import ./ipfs/testChunking +import ./ipfs/testWaitingList import ./ipfs/testRepo import ./ipfs/testDhtRouting import ./ipfs/testProtobuf