mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-05 23:13:11 +00:00
Adds metric for total active price per byte per second
This commit is contained in:
parent
21ca5521e5
commit
a7b3323e12
@ -25,9 +25,12 @@ type
|
|||||||
numPending: int
|
numPending: int
|
||||||
numSlots: int
|
numSlots: int
|
||||||
totalSize: int64
|
totalSize: int64
|
||||||
|
totalPrice: uint64
|
||||||
|
|
||||||
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
||||||
var update = Update(numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0)
|
var update = Update(
|
||||||
|
numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0, totalPrice: 0.uint64
|
||||||
|
)
|
||||||
|
|
||||||
proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} =
|
proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} =
|
||||||
let response = await c.marketplace.getRequestInfo(entry.id)
|
let response = await c.marketplace.getRequestInfo(entry.id)
|
||||||
@ -40,6 +43,7 @@ proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
|||||||
inc update.numRequests
|
inc update.numRequests
|
||||||
update.numSlots += info.slots.int
|
update.numSlots += info.slots.int
|
||||||
update.totalSize += (info.slots * info.slotSize).int64
|
update.totalSize += (info.slots * info.slotSize).int64
|
||||||
|
update.totalPrice += info.pricePerBytePerSecond
|
||||||
else:
|
else:
|
||||||
?await c.store.remove(entry.id)
|
?await c.store.remove(entry.id)
|
||||||
return success()
|
return success()
|
||||||
@ -52,6 +56,7 @@ proc updateMetrics(c: ChainMetrics, update: Update) =
|
|||||||
c.metrics.setPendingRequests(update.numPending)
|
c.metrics.setPendingRequests(update.numPending)
|
||||||
c.metrics.setRequestSlots(update.numSlots)
|
c.metrics.setRequestSlots(update.numSlots)
|
||||||
c.metrics.setTotalSize(update.totalSize)
|
c.metrics.setTotalSize(update.totalSize)
|
||||||
|
c.metrics.setPrice(update.totalPrice.int64)
|
||||||
|
|
||||||
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
|
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
|
||||||
without update =? (await c.collectUpdate()), err:
|
without update =? (await c.collectUpdate()), err:
|
||||||
|
|||||||
@ -23,6 +23,7 @@ type
|
|||||||
pending*: bool
|
pending*: bool
|
||||||
slots*: uint64
|
slots*: uint64
|
||||||
slotSize*: uint64
|
slotSize*: uint64
|
||||||
|
pricePerBytePerSecond*: uint64
|
||||||
|
|
||||||
proc notStarted() =
|
proc notStarted() =
|
||||||
raiseAssert("MarketplaceService was called before it was started.")
|
raiseAssert("MarketplaceService was called before it was started.")
|
||||||
@ -33,8 +34,15 @@ proc fetchRequestInfo(
|
|||||||
try:
|
try:
|
||||||
let request = await market.getRequest(rid)
|
let request = await market.getRequest(rid)
|
||||||
if r =? request:
|
if r =? request:
|
||||||
return
|
let price = r.ask.pricePerBytePerSecond.truncate(uint64)
|
||||||
some(RequestInfo(pending: false, slots: r.ask.slots, slotSize: r.ask.slotSize))
|
return some(
|
||||||
|
RequestInfo(
|
||||||
|
pending: false,
|
||||||
|
slots: r.ask.slots,
|
||||||
|
slotSize: r.ask.slotSize,
|
||||||
|
pricePerBytePerSecond: price,
|
||||||
|
)
|
||||||
|
)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Failed to get request info", err = exc.msg
|
trace "Failed to get request info", err = exc.msg
|
||||||
return none(RequestInfo)
|
return none(RequestInfo)
|
||||||
|
|||||||
@ -12,6 +12,10 @@ declareGauge(requestSlotsGauge, "Marketplace active storage request slots")
|
|||||||
declareGauge(
|
declareGauge(
|
||||||
totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests"
|
totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests"
|
||||||
)
|
)
|
||||||
|
declareGauge(
|
||||||
|
totalPriceGauge,
|
||||||
|
"Marketplace total price per byte per second of all active storage requests",
|
||||||
|
)
|
||||||
|
|
||||||
type
|
type
|
||||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||||
@ -25,6 +29,7 @@ type
|
|||||||
onPending: OnUpdateMetric
|
onPending: OnUpdateMetric
|
||||||
onRequestSlots: OnUpdateMetric
|
onRequestSlots: OnUpdateMetric
|
||||||
onTotalSize: OnUpdateMetric
|
onTotalSize: OnUpdateMetric
|
||||||
|
onPrice: OnUpdateMetric
|
||||||
|
|
||||||
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
|
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
|
||||||
let metricsAddress = metricsAddress
|
let metricsAddress = metricsAddress
|
||||||
@ -58,6 +63,9 @@ method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
|||||||
method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} =
|
method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} =
|
||||||
m.onTotalSize(value.int64)
|
m.onTotalSize(value.int64)
|
||||||
|
|
||||||
|
method setPrice*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} =
|
||||||
|
m.onPrice(value.int64)
|
||||||
|
|
||||||
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||||
startServer(metricsAddress, metricsPort)
|
startServer(metricsAddress, metricsPort)
|
||||||
|
|
||||||
@ -84,6 +92,9 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
|||||||
proc onTotalSize(value: int64) =
|
proc onTotalSize(value: int64) =
|
||||||
totalStorageSizeGauge.set(value)
|
totalStorageSizeGauge.set(value)
|
||||||
|
|
||||||
|
proc onPrice(value: int64) =
|
||||||
|
totalPriceGauge.set(value)
|
||||||
|
|
||||||
return Metrics(
|
return Metrics(
|
||||||
todoNodes: onTodo,
|
todoNodes: onTodo,
|
||||||
okNodes: onOk,
|
okNodes: onOk,
|
||||||
@ -92,4 +103,5 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
|||||||
onPending: onPending,
|
onPending: onPending,
|
||||||
onRequestSlots: onRequestSlots,
|
onRequestSlots: onRequestSlots,
|
||||||
onTotalSize: onTotalSize,
|
onTotalSize: onTotalSize,
|
||||||
|
onPrice: onPrice,
|
||||||
)
|
)
|
||||||
|
|||||||
@ -103,3 +103,15 @@ suite "ChainMetrics":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
metrics.totalSize == (info.slots * info.slotSize).int
|
metrics.totalSize == (info.slots * info.slotSize).int
|
||||||
|
|
||||||
|
test "onStep should count the total active price per byte per second":
|
||||||
|
let rid = genRid()
|
||||||
|
store.iterateEntries.add(RequestEntry(id: rid))
|
||||||
|
|
||||||
|
let info = RequestInfo(slots: 12, pricePerBytePerSecond: 456.uint64)
|
||||||
|
marketplace.requestInfoReturns = some(info)
|
||||||
|
|
||||||
|
await onStep()
|
||||||
|
|
||||||
|
check:
|
||||||
|
metrics.totalPrice == info.pricePerBytePerSecond.int64
|
||||||
|
|||||||
@ -8,6 +8,7 @@ type MockMetrics* = ref object of Metrics
|
|||||||
pending*: int
|
pending*: int
|
||||||
slots*: int
|
slots*: int
|
||||||
totalSize*: int64
|
totalSize*: int64
|
||||||
|
totalPrice*: int64
|
||||||
|
|
||||||
method setTodoNodes*(m: MockMetrics, value: int) =
|
method setTodoNodes*(m: MockMetrics, value: int) =
|
||||||
m.todo = value
|
m.todo = value
|
||||||
@ -30,5 +31,8 @@ method setRequestSlots*(m: MockMetrics, value: int) =
|
|||||||
method setTotalSize*(m: MockMetrics, value: int64) =
|
method setTotalSize*(m: MockMetrics, value: int64) =
|
||||||
m.totalSize = value
|
m.totalSize = value
|
||||||
|
|
||||||
|
method setPrice*(m: MockMetrics, value: int64) =
|
||||||
|
m.totalPrice = value
|
||||||
|
|
||||||
proc createMockMetrics*(): MockMetrics =
|
proc createMockMetrics*(): MockMetrics =
|
||||||
MockMetrics()
|
MockMetrics()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user