mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-28 02:33:10 +00:00
Fix multi propagate event emit, fix fail send test case
This commit is contained in:
parent
5af834b639
commit
fc8e9f6726
@ -116,9 +116,9 @@ proc validate(
|
||||
for requestId in manager.errorRequestIds:
|
||||
check requestId == expectedRequestId
|
||||
|
||||
proc createApiNodeConf(): NodeConfig =
|
||||
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
|
||||
result = NodeConfig.init(
|
||||
mode = WakuMode.Core,
|
||||
mode = mode,
|
||||
protocolsConfig = ProtocolsConfig.init(
|
||||
entryNodes = @[],
|
||||
clusterId = 1,
|
||||
@ -403,7 +403,7 @@ suite "Waku API - Send":
|
||||
|
||||
var node: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
node = (await createNode(createApiNodeConf(WakuMode.Edge))).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
@ -421,8 +421,9 @@ suite "Waku API - Send":
|
||||
let requestId = (await node.send(envelope)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
echo "Sent message with requestId=", requestId
|
||||
# Wait for events with timeout
|
||||
const eventTimeout = 10.seconds
|
||||
const eventTimeout = 62.seconds
|
||||
discard await eventManager.waitForEvents(eventTimeout)
|
||||
|
||||
eventManager.validate({SendEventOutcome.Error}, requestId)
|
||||
|
||||
@ -18,6 +18,7 @@ type DeliveryTask* = ref object
|
||||
tryCount*: int
|
||||
state*: DeliveryState
|
||||
deliveryTime*: Moment
|
||||
propagateEventEmitted*: bool
|
||||
errorDesc*: string
|
||||
|
||||
proc create*(
|
||||
|
||||
@ -66,8 +66,8 @@ method sendImpl*(
|
||||
return
|
||||
|
||||
if pushResult.isOk and pushResult.get() > 0:
|
||||
info "Message propagated via Relay",
|
||||
requestId = task.requestId, msgHash = task.msgHash
|
||||
info "Message propagated via Lightpush",
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
task.state = DeliveryState.SuccessfullyPropagated
|
||||
task.deliveryTime = Moment.now()
|
||||
# TODO: with a simple retry processor it might be more accurate to say `Sent`
|
||||
|
||||
@ -60,7 +60,7 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.a
|
||||
let noOfPublishedPeers = (await self.publishProc(task.pubsubTopic, task.msg)).valueOr:
|
||||
let errorMessage = error.desc.get($error.code)
|
||||
error "Failed to publish message with relay",
|
||||
request = task.requestId, msgHash = task.msgHash, error = errorMessage
|
||||
request = task.requestId, msgHash = task.msgHash.to0xHex(), error = errorMessage
|
||||
if error.code != LightPushErrorCode.NO_PEERS_TO_RELAY:
|
||||
task.state = DeliveryState.FailedToDeliver
|
||||
task.errorDesc = errorMessage
|
||||
|
||||
@ -173,9 +173,13 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
|
||||
case task.state
|
||||
of DeliveryState.SuccessfullyPropagated:
|
||||
# TODO: in case of of unable to strore check messages shall we report success instead?
|
||||
info "Message successfully propagated",
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex())
|
||||
if not task.propagateEventEmitted:
|
||||
info "Message successfully propagated",
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
MessagePropagatedEvent.emit(
|
||||
self.brokerCtx, task.requestId, task.msgHash.to0xHex()
|
||||
)
|
||||
task.propagateEventEmitted = true
|
||||
return
|
||||
of DeliveryState.SuccessfullyValidated:
|
||||
info "Message successfully sent",
|
||||
@ -249,7 +253,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
|
||||
assert(not task.isNil(), "task for send must not be nil")
|
||||
|
||||
info "SendService.send: processing delivery task",
|
||||
requestId = task.requestId, msgHash = task.msgHash
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
|
||||
self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr:
|
||||
error "SendService.send: failed to subscribe to content topic",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user