Bitswap: retrieve() waits until IPFS object has been retrieved

Workaround that always waited for exactly 1 second has been removed.
This commit is contained in:
Mark Spanbroek 2021-02-01 16:19:53 +01:00 committed by markspanbroek
parent b54c3d9585
commit b0a1166c3c
7 changed files with 84 additions and 5 deletions

View File

@ -39,10 +39,12 @@ proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} =
proc store*(bitswap: Bitswap, obj: IpfsObject) =
bitswap.repo.store(obj)
proc retrieve*(bitswap: Bitswap, cid: Cid): Future[Option[IpfsObject]] {.async.} =
proc retrieve*(bitswap: Bitswap,
cid: Cid,
timeout = 30.seconds): Future[Option[IpfsObject]] {.async.} =
result = bitswap.repo.retrieve(cid)
if result.isNone:
for exchange in bitswap.exchanges:
await exchange.want(cid)
await sleepAsync(1.seconds) # TODO
await bitswap.repo.wait(cid, timeout)
result = bitswap.repo.retrieve(cid)

View File

@ -1,8 +1,10 @@
import std/options
import std/tables
import std/hashes
import pkg/chronos
import pkg/libp2p
import ./ipfsobject
import ./repo/waitinglist
export options
export ipfsobject
@ -10,12 +12,15 @@ export ipfsobject
type
Repo* = ref object
storage: Table[Cid, IpfsObject]
waiting: WaitingList[Cid]
proc hash(id: Cid): Hash =
hash($id)
proc store*(repo: Repo, obj: IpfsObject) =
repo.storage[obj.cid] = obj
let id = obj.cid
repo.storage[id] = obj
repo.waiting.deliver(id)
proc contains*(repo: Repo, id: Cid): bool =
repo.storage.hasKey(id)
@ -25,3 +30,12 @@ proc retrieve*(repo: Repo, id: Cid): Option[IpfsObject] =
repo.storage[id].some
else:
IpfsObject.none
proc wait*(repo: Repo, id: Cid, timeout: Duration): Future[void] =
var future: Future[void]
if repo.contains(id):
future = newFuture[void]()
future.complete()
else:
future = repo.waiting.wait(id, timeout)
future

20
ipfs/repo/waitinglist.nim Normal file
View File

@ -0,0 +1,20 @@
import std/tables
import pkg/chronos
type WaitingList*[T] = object
futures: Table[T, seq[Future[void]]]
proc wait*[T](list: var WaitingList, item: T, timeout: Duration): Future[void] =
let future = newFuture[void]("waitinglist.wait")
proc onTimeout(_: pointer) =
if not future.finished:
future.complete()
discard setTimer(Moment.fromNow(timeout), onTimeout, nil)
list.futures.mgetOrPut(item, @[]).add(future)
future
proc deliver*[T](list: var WaitingList, item: T) =
if list.futures.hasKey(item):
for future in list.futures[item]:
future.complete()
list.futures.del(item)

View File

@ -33,7 +33,7 @@ suite "bitswap":
check (await bitswap1.retrieve(obj.cid)).get() == obj
test "signals retrieval failure":
check (await bitswap1.retrieve(obj.cid)).isNone
check (await bitswap1.retrieve(obj.cid, 100.milliseconds)).isNone
test "retrieves objects from network":
bitswap1.store(obj)

View File

@ -1,4 +1,5 @@
import std/unittest
import pkg/asynctest
import pkg/chronos
import pkg/ipfs/repo
suite "repo":
@ -23,3 +24,13 @@ suite "repo":
check repo.contains(obj.cid) == false
repo.store(obj)
check repo.contains(obj.cid) == true
test "waits for IPFS object to arrive":
let waiting = repo.wait(obj.cid, 1.minutes)
check not waiting.finished
repo.store(obj)
check waiting.finished
test "does not wait when IPFS object is already stored":
repo.store(obj)
check repo.wait(obj.cid, 1.minutes).finished

View File

@ -0,0 +1,31 @@
import pkg/asynctest
import pkg/chronos
import ipfs/repo/waitinglist
suite "waiting list":
var list: WaitingList[string]
setup:
list = WaitingList[string]()
test "waits for item to be delivered":
let waiting = list.wait("apple", 1.minutes)
check not waiting.finished
list.deliver("orange")
check not waiting.finished
list.deliver("apple")
check waiting.finished
test "notifies everyone who is waiting":
let wait1 = list.wait("apple", 1.minutes)
let wait2 = list.wait("apple", 1.minutes)
list.deliver("apple")
check wait1.finished
check wait2.finished
test "stops waiting after timeout":
let wait = list.wait("apple", 100.milliseconds)
check not wait.finished
await sleepAsync(100.milliseconds)
check wait.finished

View File

@ -1,5 +1,6 @@
import ./ipfs/testObject
import ./ipfs/testChunking
import ./ipfs/testWaitingList
import ./ipfs/testRepo
import ./ipfs/testDhtRouting
import ./ipfs/testProtobuf