refactors integration tests
This commit is contained in:
parent
00c5ae2711
commit
31a5a1c6be
|
@ -1,5 +1,7 @@
|
|||
from std/times import inMilliseconds, initDuration, inSeconds, fromUnix
|
||||
import std/strformat
|
||||
import std/sequtils
|
||||
import std/sugar
|
||||
import pkg/codex/logutils
|
||||
import pkg/questionable/results
|
||||
import ../contracts/time
|
||||
|
@ -15,7 +17,7 @@ logScope:
|
|||
topics = "integration test validation"
|
||||
|
||||
template eventuallyS*(expression: untyped, timeout=10, step = 5,
|
||||
cancelWhenExpression: untyped = false): bool =
|
||||
cancelExpression: untyped = false): bool =
|
||||
bind Moment, now, seconds
|
||||
|
||||
proc eventuallyS: Future[bool] {.async.} =
|
||||
|
@ -26,7 +28,7 @@ template eventuallyS*(expression: untyped, timeout=10, step = 5,
|
|||
echo (i*step).seconds
|
||||
if endTime < Moment.now():
|
||||
return false
|
||||
if cancelWhenExpression:
|
||||
if cancelExpression:
|
||||
return false
|
||||
await sleepAsync(step.seconds)
|
||||
return true
|
||||
|
@ -38,77 +40,64 @@ marketplacesuite "Validation":
|
|||
let tolerance = 1
|
||||
let proofProbability = 1
|
||||
|
||||
var slotsFilled: seq[SlotId]
|
||||
var slotsFreed: seq[SlotId]
|
||||
var requestsFailed: seq[RequestId]
|
||||
var requestCancelled = false
|
||||
# var slotsAndRequests = initTable[string, seq[UInt256]]()
|
||||
# var events = initTable[string, seq[ref MarketplaceEvent]]()
|
||||
var events = {
|
||||
$SlotFilled: newSeq[ref MarketplaceEvent](),
|
||||
$SlotFreed: newSeq[ref MarketplaceEvent](),
|
||||
$RequestFailed: newSeq[ref MarketplaceEvent](),
|
||||
$RequestCancelled: newSeq[ref MarketplaceEvent]()
|
||||
}.toTable
|
||||
var eventSubscriptions = newSeq[provider.Subscription]()
|
||||
|
||||
var slotFilledSubscription: provider.Subscription
|
||||
var requestFailedSubscription: provider.Subscription
|
||||
var slotFreedSubscription: provider.Subscription
|
||||
var requestCancelledSubscription: provider.Subscription
|
||||
proc box[T](x: T): ref T =
|
||||
new(result);
|
||||
result[] = x
|
||||
|
||||
proc trackSlotsFilled(marketplace: Marketplace):
|
||||
Future[provider.Subscription] {.async.} =
|
||||
slotsFilled = newSeq[SlotId]()
|
||||
proc onSlotFilled(event: SlotFilled) =
|
||||
let slotId = slotId(event.requestId, event.slotIndex)
|
||||
slotsFilled.add(slotId)
|
||||
debug "SlotFilled", requestId = event.requestId, slotIndex = event.slotIndex,
|
||||
slotId = slotId
|
||||
proc onMarketplaceEvent[T: MarketplaceEvent](event: T) {.gcsafe, raises:[].} =
|
||||
try:
|
||||
debug "onMarketplaceEvent", eventType = $T, event = event
|
||||
events[$T].add(box(event))
|
||||
except KeyError:
|
||||
discard
|
||||
|
||||
let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
|
||||
subscription
|
||||
|
||||
proc trackRequestsFailed(marketplace: Marketplace):
|
||||
Future[provider.Subscription] {.async.} =
|
||||
requestsFailed = newSeq[RequestId]()
|
||||
proc onRequestFailed(event: RequestFailed) =
|
||||
requestsFailed.add(event.requestId)
|
||||
debug "RequestFailed", requestId = event.requestId
|
||||
|
||||
let subscription = await marketplace.subscribe(RequestFailed, onRequestFailed)
|
||||
subscription
|
||||
|
||||
proc trackRequestCancelled(marketplace: Marketplace, requestId: RequestId):
|
||||
Future[provider.Subscription] {.async.} =
|
||||
requestCancelled = false
|
||||
proc onRequestCancelled(event: RequestCancelled) =
|
||||
if requestId == event.requestId:
|
||||
requestCancelled = true
|
||||
debug "RequestCancelled", requestId = event.requestId
|
||||
|
||||
let subscription = await marketplace.subscribe(RequestCancelled, onRequestCancelled)
|
||||
subscription
|
||||
|
||||
proc trackSlotsFreed(marketplace: Marketplace, requestId: RequestId):
|
||||
Future[provider.Subscription] {.async.} =
|
||||
slotsFreed = newSeq[SlotId]()
|
||||
proc onSlotFreed(event: SlotFreed) =
|
||||
if event.requestId == requestId:
|
||||
let slotId = slotId(event.requestId, event.slotIndex)
|
||||
slotsFreed.add(slotId)
|
||||
debug "SlotFreed", requestId = event.requestId, slotIndex = event.slotIndex,
|
||||
slotId = slotId, slotsFreed = slotsFreed.len
|
||||
|
||||
let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
|
||||
subscription
|
||||
|
||||
proc startTrackingEvents(marketplace: Marketplace, requestId: RequestId) {.async.} =
|
||||
slotFilledSubscription = await marketplace.trackSlotsFilled()
|
||||
requestFailedSubscription = await marketplace.trackRequestsFailed()
|
||||
slotFreedSubscription = await marketplace.trackSlotsFreed(requestId)
|
||||
requestCancelledSubscription =
|
||||
await marketplace.trackRequestCancelled(requestId)
|
||||
proc startTrackingEvents(marketplace: Marketplace) {.async.} =
|
||||
eventSubscriptions.add(
|
||||
await marketplace.subscribe(SlotFilled, onMarketplaceEvent[SlotFilled])
|
||||
)
|
||||
eventSubscriptions.add(
|
||||
await marketplace.subscribe(RequestFailed, onMarketplaceEvent[RequestFailed])
|
||||
)
|
||||
eventSubscriptions.add(
|
||||
await marketplace.subscribe(SlotFreed, onMarketplaceEvent[SlotFreed])
|
||||
)
|
||||
eventSubscriptions.add(
|
||||
await marketplace.subscribe(RequestCancelled, onMarketplaceEvent[RequestCancelled])
|
||||
)
|
||||
|
||||
proc stopTrackingEvents() {.async.} =
|
||||
await slotFilledSubscription.unsubscribe()
|
||||
await slotFreedSubscription.unsubscribe()
|
||||
await requestFailedSubscription.unsubscribe()
|
||||
await requestCancelledSubscription.unsubscribe()
|
||||
for event in eventSubscriptions:
|
||||
await event.unsubscribe()
|
||||
|
||||
proc checkSlotsFailed(marketplace: Marketplace, slotsFilled: seq[SlotId],
|
||||
slotsFreed: seq[SlotId]) {.async.} =
|
||||
proc checkSlotsFreed(requestId: RequestId, expectedSlotsFreed: int): bool =
|
||||
events[$SlotFreed].filter(
|
||||
e => (ref SlotFreed)(e).requestId == requestId)
|
||||
.len == expectedSlotsFreed and
|
||||
events[$RequestFailed].map(
|
||||
e => (ref RequestFailed)(e).requestId).contains(requestId)
|
||||
|
||||
proc isRequestCancelled(requestId: RequestId): bool =
|
||||
events[$RequestCancelled].map(e => (ref RequestCancelled)(e).requestId)
|
||||
.contains(requestId)
|
||||
|
||||
proc getSlots[T: MarketplaceEvent](requestId: RequestId): seq[SlotId] =
|
||||
events[$T].filter(
|
||||
e => (ref T)(e).requestId == requestId).map(
|
||||
e => slotId((ref T)(e).requestId, (ref T)(e).slotIndex))
|
||||
|
||||
proc checkSlotsFailed(marketplace: Marketplace, requestId: RequestId) {.async.} =
|
||||
let slotsFreed = getSlots[SlotFreed](requestId)
|
||||
let slotsFilled = getSlots[SlotFilled](requestId)
|
||||
let slotsNotFreed = slotsFilled.filter(
|
||||
slotId => not slotsFreed.contains(slotId)
|
||||
).toHashSet
|
||||
|
@ -167,6 +156,8 @@ marketplacesuite "Validation":
|
|||
# testproofs.nim - we may want to address it or remove the comment.
|
||||
createAvailabilities(data.len * 2, duration)
|
||||
|
||||
await marketplace.startTrackingEvents()
|
||||
|
||||
let cid = client0.upload(data).get
|
||||
let purchaseId = await client0.requestStorage(
|
||||
cid,
|
||||
|
@ -178,8 +169,6 @@ marketplacesuite "Validation":
|
|||
)
|
||||
let requestId = client0.requestId(purchaseId).get
|
||||
|
||||
await marketplace.startTrackingEvents(requestId)
|
||||
|
||||
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
|
||||
|
||||
echo fmt"expiry = {(expiry + 60).int.seconds}"
|
||||
|
@ -207,13 +196,12 @@ marketplacesuite "Validation":
|
|||
# remaining nodes are be freed but marked as "Failed" as the whole
|
||||
# request fails. A couple of checks to capture this:
|
||||
let expectedSlotsFreed = tolerance + 1
|
||||
check eventuallyS((slotsFreed.len == expectedSlotsFreed and
|
||||
requestsFailed.contains(requestId)),
|
||||
check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed),
|
||||
timeout = secondsTillRequestEnd + 60, step = 5,
|
||||
cancelWhenExpression = requestCancelled)
|
||||
cancelExpression = isRequestCancelled(requestId))
|
||||
|
||||
# extra check
|
||||
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)
|
||||
await marketplace.checkSlotsFailed(requestId)
|
||||
|
||||
await stopTrackingEvents()
|
||||
|
||||
|
@ -253,6 +241,8 @@ marketplacesuite "Validation":
|
|||
# testproofs.nim - we may want to address it or remove the comment.
|
||||
createAvailabilities(data.len * 2, duration)
|
||||
|
||||
await marketplace.startTrackingEvents()
|
||||
|
||||
let cid = client0.upload(data).get
|
||||
let purchaseId = await client0.requestStorage(
|
||||
cid,
|
||||
|
@ -264,8 +254,6 @@ marketplacesuite "Validation":
|
|||
)
|
||||
let requestId = client0.requestId(purchaseId).get
|
||||
|
||||
await marketplace.startTrackingEvents(requestId)
|
||||
|
||||
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
|
||||
|
||||
echo fmt"expiry = {(expiry + 60).int.seconds}"
|
||||
|
@ -314,12 +302,11 @@ marketplacesuite "Validation":
|
|||
# request fails. A couple of checks to capture this:
|
||||
let expectedSlotsFreed = tolerance + 1
|
||||
|
||||
check eventuallyS((slotsFreed.len == expectedSlotsFreed and
|
||||
requestsFailed.contains(requestId)),
|
||||
check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed),
|
||||
timeout = secondsTillRequestEnd + 60, step = 5,
|
||||
cancelWhenExpression = requestCancelled)
|
||||
cancelExpression = isRequestCancelled(requestId))
|
||||
|
||||
# extra check
|
||||
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)
|
||||
await marketplace.checkSlotsFailed(requestId)
|
||||
|
||||
await stopTrackingEvents()
|
||||
|
|
Loading…
Reference in New Issue