mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-02 13:33:08 +00:00
131 lines
4.0 KiB
Nim
131 lines
4.0 KiB
Nim
import pkg/ethers
|
|
import pkg/questionable
|
|
import pkg/upraises
|
|
import ./marketplace/market
|
|
import ./marketplace/marketplace
|
|
import ../config
|
|
import ../component
|
|
import ../state
|
|
import ../types
|
|
import ./clock
|
|
|
|
# Log statements in this module are emitted under the "marketplace" topic.
logScope:
  topics = "marketplace"
|
|
|
|
type
  # Callback that receives the id of a storage request and reports success or
  # failure through its result. It must not raise.
  OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}

  RequestInfo* = ref object
    ## Summary of a storage request as returned by `getRequestInfo`.
    ##
    ## For a request in the `New` state only `pending` is set (to `true`) and
    ## the remaining fields keep their default values. For a `Started` request
    ## the remaining fields are copied from the request's ask.
    pending*: bool
    slots*: uint64
    slotSize*: uint64
    pricePerBytePerSecond*: uint64 ## ask price, truncated to 64 bits

  MarketplaceService* = ref object of Component
    ## Component giving access to storage requests on the marketplace.
    ##
    ## `market` is `none` after construction and is assigned by `awake`. The
    ## request-related methods assert when they are called before that.
    state: State ## provides `config.ethProvider` and `config.marketplaceAddress`
    market: ?OnChainMarket
    clock: Clock ## supplies "now" when computing the past-events window
|
|
|
|
proc notStarted() {.noreturn.} =
  ## Aborts with an assertion failure. Called by the service methods when they
  ## are used before a market has been assigned.
  ##
  ## Marked `noreturn` because `raiseAssert` always raises; control never
  ## continues past a call to this proc.
  raiseAssert("MarketplaceService was called before it was started.")
|
|
|
|
proc fetchRequestInfo(
    market: OnChainMarket, rid: Rid
): Future[?RequestInfo] {.async: (raises: []).} =
  ## Looks up request `rid` on `market` and converts its ask into a
  ## `RequestInfo` with `pending` set to `false`.
  ##
  ## Returns `none` when the market has no such request, or when the lookup
  ## raises (the error is logged at trace level and otherwise swallowed).
  try:
    let fetched = await market.getRequest(rid)
    if req =? fetched:
      let info = RequestInfo(
        pending: false,
        slots: req.ask.slots,
        slotSize: req.ask.slotSize,
        # The ask price is wider than 64 bits; only the low 64 bits are kept.
        pricePerBytePerSecond: req.ask.pricePerBytePerSecond.truncate(uint64),
      )
      return some(info)
  except CatchableError as exc:
    trace "Failed to get request info", err = exc.msg

  return none(RequestInfo)
|
|
|
|
method subscribeToNewRequests*(
    m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
  ## Subscribes `onNewRequest` to the market's request notifications.
  ##
  ## Returns success once the subscription is in place, or a failure carrying
  ## the exception message when subscribing raises. Asserts (via `notStarted`)
  ## when no market has been assigned yet.
  ##
  ## The subscription value returned by the market is discarded, so this method
  ## offers no way to unsubscribe.

  proc dispatch(rid: Rid): Future[void] {.async: (raises: [CancelledError]).} =
    # `onRequest` below has no return value to report through, so a failure
    # result from the callback is escalated to an assertion failure.
    if err =? (await onNewRequest(rid)).errorOption:
      raiseAssert("Error result in handling of onNewRequest callback: " & err.msg)

  proc onRequest(
      id: RequestId, ask: StorageAsk, expiry: uint64
  ) {.gcsafe, upraises: [].} =
    # Only the request id is forwarded; `ask` and `expiry` are not used.
    asyncSpawn dispatch(Rid(id))

  if market =? m.market:
    try:
      discard await market.subscribeRequests(onRequest)
    except CatchableError as exc:
      return failure(exc.msg)
    return success()
  else:
    notStarted()
|
|
|
|
method iteratePastNewRequestEvents*(
    m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
  ## Calls `onNewRequest` for every storage-request event the market returns
  ## for the 30 days leading up to the current clock time.
  ##
  ## Iteration stops at the first failure reported by `onNewRequest`, and that
  ## failure's message is returned. An exception raised while querying is
  ## returned as a failure carrying its message. Asserts (via `notStarted`)
  ## when no market has been assigned yet.
  const
    secondsPerDay = 60 * 60 * 24
    lookback = uint64(secondsPerDay * 30)

  let
    currentTime = m.clock.now()
    # Unsigned subtraction wraps around instead of going negative. Clamp the
    # window start at 0 so a clock value smaller than the lookback cannot turn
    # into a huge timestamp that is out of range for the int64 conversion below.
    startTime =
      if currentTime > lookback:
        currentTime - lookback
      else:
        0'u64

  if market =? m.market:
    try:
      let requests =
        await market.queryPastStorageRequestedEventsFromTime(startTime.int64)
      for request in requests:
        if error =? (await onNewRequest(Rid(request.requestId))).errorOption:
          return failure(error.msg)
      return success()
    except CatchableError as exc:
      return failure(exc.msg)
  else:
    notStarted()
|
|
|
|
method getRequestInfo*(
    m: MarketplaceService, rid: Rid
): Future[?RequestInfo] {.async: (raises: []), base.} =
  ## Reports on request `rid` according to its current state on the market:
  ##
  ## * `New`: a `RequestInfo` with only `pending = true`.
  ## * `Started`: the result of `fetchRequestInfo` for the request.
  ## * any other state, an unknown request, or an error while querying the
  ##   state (logged at trace level): `none`.
  ##
  ## Asserts (via `notStarted`) when no market has been assigned yet.
  if market =? m.market:
    try:
      if current =? (await market.requestState(rid)):
        case current
        of RequestState.New:
          return some(RequestInfo(pending: true))
        of RequestState.Started:
          return await market.fetchRequestInfo(rid)
        else:
          discard
    except CatchableError as exc:
      trace "Failed to get request state", err = exc.msg

    return none(RequestInfo)
  else:
    notStarted()
|
|
|
|
method awake*(
    m: MarketplaceService
): Future[?!void] {.async: (raises: [CancelledError]).} =
  ## Creates the JSON-RPC provider and the on-chain market from the service
  ## configuration and stores the market in `m.market`.
  ##
  ## Returns a failure when the configured marketplace address cannot be
  ## parsed, or when a `JsonRpcProviderError` is raised during setup. In both
  ## cases `m.market` is left unchanged.
  try:
    let provider = JsonRpcProvider.new(m.state.config.ethProvider)

    if contractAddress =? Address.init(m.state.config.marketplaceAddress):
      let contract = Marketplace.new(contractAddress, provider)
      m.market = some(OnChainMarket.new(contract))
      return success()

    return failure("Invalid MarketplaceAddress provided")
  except JsonRpcProviderError as err:
    return failure(err.msg)
|
|
|
|
proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =
  ## Builds a service around `state` and `clock`. The market starts out as
  ## `none`; it is assigned later by `awake`.
  MarketplaceService(
    state: state,
    clock: clock,
    market: none(OnChainMarket),
  )
|
|
|
|
proc createMarketplace*(state: State, clock: Clock): MarketplaceService =
  ## Public factory for `MarketplaceService`; forwards to the module-private
  ## `new` constructor.
  MarketplaceService.new(state, clock)
|