Fabiana Cecin 549834203d
Bump to nim-libp2p 2.0.0
* bump libp2p pin to release/v2.0.0 (c43199378)
* pin nimble.lock: lsquic/websock/boringssl/protobuf_serialization/npeg/jwt
* add libp2p_mix dep and point libp2p/protocols/mix -> libp2p_mix
* migrate rng to libp2p Rng type (prod, channels, noise, tests)
* noise: take Rng, extract bearSslDrbg internally
* waku_switch: TransportConfig factory; withMaxInOut; local MaxConnections
* waku_relay/rendezvous/discv5/kademlia: v2.0.0 API (rng, config, ServiceDiscovery)
* tests: newStandardSwitch shim; PeerId.random(rng); common.rng()/crypto.newRng()
* drop libp2p/utils/semaphore (use chronos AsyncSemaphore)
* add waku/compat/option_valueor shim where needed
* add std/options where transitive re-export dropped
2026-06-02 15:42:58 -03:00

76 lines
2.2 KiB
Nim

import waku/compat/option_valueor
import std/[options, times], chronos
import brokers/broker_context
import waku/waku_core, waku/api/types, waku/requests/node_requests
type DeliveryState* {.pure.} = enum
Entry
SuccessfullyPropagated
# message is known to be sent to the network but not yet validated
SuccessfullyValidated
# message is known to be stored at least on one store node, thus validated
FallbackRetry # retry sending with fallback processor if available
NextRoundRetry # try sending in next loop
FailedToDeliver # final state of failed delivery
type DeliveryTask* = ref object
requestId*: RequestId
pubsubTopic*: PubsubTopic
msg*: WakuMessage
msgHash*: WakuMessageHash
tryCount*: int
state*: DeliveryState
deliveryTime*: Moment
propagateEventEmitted*: bool
errorDesc*: string
proc new*(
T: typedesc[DeliveryTask],
requestId: RequestId,
envelop: MessageEnvelope,
brokerCtx: BrokerContext,
): Result[T, string] =
let msg = envelop.toWakuMessage()
# TODO: use sync request for such as soon as available
let relayShardRes = (
RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic)
).valueOr:
error "RequestRelayShard.request failed", error = error
return err("Failed create DeliveryTask: " & $error)
let pubsubTopic = relayShardRes.relayShard.toPubsubTopic()
let msgHash = computeMessageHash(pubsubTopic, msg)
return ok(
T(
requestId: requestId,
pubsubTopic: pubsubTopic,
msg: msg,
msgHash: msgHash,
tryCount: 0,
state: DeliveryState.Entry,
)
)
func `==`*(r, l: DeliveryTask): bool =
if r.isNil() == l.isNil():
r.isNil() or r.msgHash == l.msgHash
else:
false
proc messageAge*(self: DeliveryTask): timer.Duration =
let actual = getNanosecondTime(getTime().toUnixFloat())
if self.msg.timestamp >= 0 and self.msg.timestamp < actual:
nanoseconds(actual - self.msg.timestamp)
else:
ZeroDuration
proc deliveryAge*(self: DeliveryTask): timer.Duration =
if self.state == DeliveryState.SuccessfullyPropagated:
timer.Moment.now() - self.deliveryTime
else:
ZeroDuration
proc isEphemeral*(self: DeliveryTask): bool =
return self.msg.ephemeral