adapt event types and values to messaging-api spec

This commit is contained in:
Ivan FB 2026-04-30 00:55:50 +02:00
parent 300f584efc
commit bc902812fa
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
5 changed files with 37 additions and 37 deletions

View File

@ -16,26 +16,26 @@ proc periodicSender(w: Waku): Future[void] {.async.} =
echo "Failed to listen to message sent event: ", error
return
let errorListener = MessageErrorEvent.listen(
proc(event: MessageErrorEvent) {.async: (raises: []).} =
let errorListener = MessageSendErrorEvent.listen(
proc(event: MessageSendErrorEvent) {.async: (raises: []).} =
echo "Message failed to send with request ID: ",
event.requestId, " error: ", event.error
).valueOr:
echo "Failed to listen to message error event: ", error
echo "Failed to listen to message send error event: ", error
return
let propagatedListener = MessagePropagatedEvent.listen(
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
let propagatedListener = MessageSendPropagatedEvent.listen(
proc(event: MessageSendPropagatedEvent) {.async: (raises: []).} =
echo "Message propagated with request ID: ",
event.requestId, " hash: ", event.messageHash
).valueOr:
echo "Failed to listen to message propagated event: ", error
echo "Failed to listen to message send propagated event: ", error
return
defer:
MessageSentEvent.dropListener(sentListener)
MessageErrorEvent.dropListener(errorListener)
MessagePropagatedEvent.dropListener(propagatedListener)
MessageSendErrorEvent.dropListener(errorListener)
MessageSendPropagatedEvent.dropListener(propagatedListener)
## Periodically sends a Waku message every 30 seconds
var counter = 0

View File

@ -131,34 +131,34 @@ proc logosdelivery_start_node(
ctx.myLib[].brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageSent"):
$newJsonEvent("message_sent", event),
$newJsonEvent("message:sent", event),
).valueOr:
chronicles.error "MessageSentEvent.listen failed", err = $error
return err("MessageSentEvent.listen failed: " & $error)
let errorListener = MessageErrorEvent.listen(
let errorListener = MessageSendErrorEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageError"):
$newJsonEvent("message_error", event),
proc(event: MessageSendErrorEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageSendError"):
$newJsonEvent("message:send-error", event),
).valueOr:
chronicles.error "MessageErrorEvent.listen failed", err = $error
return err("MessageErrorEvent.listen failed: " & $error)
chronicles.error "MessageSendErrorEvent.listen failed", err = $error
return err("MessageSendErrorEvent.listen failed: " & $error)
let propagatedListener = MessagePropagatedEvent.listen(
let propagatedListener = MessageSendPropagatedEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessagePropagated"):
$newJsonEvent("message_propagated", event),
proc(event: MessageSendPropagatedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageSendPropagated"):
$newJsonEvent("message:send-propagated", event),
).valueOr:
chronicles.error "MessagePropagatedEvent.listen failed", err = $error
return err("MessagePropagatedEvent.listen failed: " & $error)
chronicles.error "MessageSendPropagatedEvent.listen failed", err = $error
return err("MessageSendPropagatedEvent.listen failed: " & $error)
let receivedListener = MessageReceivedEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageReceived"):
$newJsonEvent("message_received", event),
$newJsonEvent("message:received", event),
).valueOr:
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
@ -184,9 +184,9 @@ proc logosdelivery_stop_node(
requireInitializedNode(ctx, "STOP_NODE"):
return err(errMsg)
MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageSendErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageSendPropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)

View File

@ -17,8 +17,8 @@ type SendEventOutcome {.pure.} = enum
type SendEventListenerManager = ref object
brokerCtx: BrokerContext
sentListener: MessageSentEventListener
errorListener: MessageErrorEventListener
propagatedListener: MessagePropagatedEventListener
errorListener: MessageSendErrorEventListener
propagatedListener: MessageSendPropagatedEventListener
sentFuture: Future[void]
errorFuture: Future[void]
propagatedFuture: Future[void]
@ -48,9 +48,9 @@ proc newSendEventListenerManager(brokerCtx: BrokerContext): SendEventListenerMan
).valueOr:
raiseAssert error
manager.errorListener = MessageErrorEvent.listen(
manager.errorListener = MessageSendErrorEvent.listen(
brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
proc(event: MessageSendErrorEvent) {.async: (raises: []).} =
inc manager.errorCount
manager.errorRequestIds.add(event.requestId)
echo "ERROR EVENT TRIGGERED (#", manager.errorCount, "): ", event.error
@ -62,9 +62,9 @@ proc newSendEventListenerManager(brokerCtx: BrokerContext): SendEventListenerMan
).valueOr:
raiseAssert error
manager.propagatedListener = MessagePropagatedEvent.listen(
manager.propagatedListener = MessageSendPropagatedEvent.listen(
brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
proc(event: MessageSendPropagatedEvent) {.async: (raises: []).} =
inc manager.propagatedCount
manager.propagatedRequestIds.add(event.requestId)
echo "PROPAGATED EVENT TRIGGERED (#",
@ -79,8 +79,8 @@ proc newSendEventListenerManager(brokerCtx: BrokerContext): SendEventListenerMan
proc teardown(manager: SendEventListenerManager) =
MessageSentEvent.dropListener(manager.brokerCtx, manager.sentListener)
MessageErrorEvent.dropListener(manager.brokerCtx, manager.errorListener)
MessagePropagatedEvent.dropListener(manager.brokerCtx, manager.propagatedListener)
MessageSendErrorEvent.dropListener(manager.brokerCtx, manager.errorListener)
MessageSendPropagatedEvent.dropListener(manager.brokerCtx, manager.propagatedListener)
proc waitForEvents(
manager: SendEventListenerManager, timeout: Duration

View File

@ -10,14 +10,14 @@ EventBroker:
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
type MessageSendErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
type MessageSendPropagatedEvent* = object
requestId*: RequestId
messageHash*: string

View File

@ -181,7 +181,7 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
if not task.propagateEventEmitted:
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
MessagePropagatedEvent.emit(
MessageSendPropagatedEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.to0xHex()
)
task.propagateEventEmitted = true
@ -196,7 +196,7 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
error = task.errorDesc
MessageErrorEvent.emit(
MessageSendErrorEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.to0xHex(), task.errorDesc
)
return
@ -211,7 +211,7 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
error = "Message too old",
age = task.messageAge()
task.state = DeliveryState.FailedToDeliver
MessageErrorEvent.emit(
MessageSendErrorEvent.emit(
self.brokerCtx,
task.requestId,
task.msgHash.to0xHex(),