Adapt to latest libp2p pubslih design changes. publish returns an outcome as Result error.

This commit is contained in:
NagyZoltanPeter 2025-02-19 11:07:22 +01:00
parent 48fbb98ba4
commit 18c470b52c
No known key found for this signature in database
GPG Key ID: 16EADB9673B65368
7 changed files with 38 additions and 29 deletions

View File

@ -116,7 +116,7 @@ proc process*(
let publishRes = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if publishRes.isErr():
let errorMsg = "Message not sent."
error "PUBLISH failed", error = errorMsg, reason = publishRes.error.msg
error "PUBLISH failed", error = errorMsg, reason = $publishRes.error
return err(errorMsg)
let numPeers = publishRes.get()
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 12cbd26c7bdd7e910f155343ef625bb1ae313cc6
Subproject commit 313d0fc53b468e5ef68031de7f5126e9c24b11d7

View File

@ -174,7 +174,7 @@ proc processMessages(self: SendMonitor) {.async.} =
let ret = await self.wakuRelay.publish(pubsubTopic, msg)
if ret.isErr():
error "could not publish with wakuRelay.publish",
msgHash, pubsubTopic, reason = ret.error.msg
msgHash, pubsubTopic, reason = $ret.error
continue
if not self.wakuLegacyLightpushClient.isNil():

View File

@ -55,7 +55,7 @@ proc getRelayPushHandler*(
if publishedResult.isErr():
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash, reason = publishedResult.error.msg
msg_hash = msgHash, reason = $publishedResult.error
return mapPubishingErrorToPushResult(publishedResult.error)
return lightpushSuccessResult(publishedResult.get().uint32)

View File

@ -53,20 +53,30 @@ func lighpushErrorResult*(
return err((statusCode, some(desc)))
func mapPubishingErrorToPushResult*(
resultException: ref PublishingError
publishOutcome: PublishOutcome
): WakuLightPushResult =
if resultException of NoTopicSpecifiedError:
return err((LightpushStatusCode.INVALID_MESSAGE_ERROR, some(resultException.msg)))
elif resultException of PayloadIsEmptyError:
return err((LightpushStatusCode.INVALID_MESSAGE_ERROR, some(resultException.msg)))
elif resultException of DuplicateMessageError:
return err((LightpushStatusCode.INVALID_MESSAGE_ERROR, some(resultException.msg)))
elif resultException of NotSubscribedToTopicError:
return
err((LightpushStatusCode.UNSUPPORTED_PUBSUB_TOPIC, some(resultException.msg)))
elif resultException of NoPeersToPublishError:
return err((LightpushStatusCode.NO_PEERS_TO_RELAY, some(resultException.msg)))
elif resultException of GeneratingMessageIdError:
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(resultException.msg)))
case publishOutcome
of NoTopicSpecified:
return err(
(LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Empty topic, skipping publish"))
)
of DuplicateMessage:
return err(
(LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Dropping already-seen message"))
)
of NoPeersToPublish:
return err(
(
LightpushStatusCode.NO_PEERS_TO_RELAY,
some("No peers for topic, skipping publish"),
)
)
of CannotGenerateMessageId:
return err(
(
LightpushStatusCode.INTERNAL_SERVER_ERROR,
some("Error generating message id, skipping publish"),
)
)
else:
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(resultException.msg)))
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, none[string]()))

View File

@ -54,7 +54,7 @@ proc getRelayPushHandler*(
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash, reason = publishResult.error.msg
msg_hash = msgHash, reason = $publishResult.error
# for legacy lightpush we do not detail the reason towards clients. All error during publish result in not-published-to-any-peer
# this let client of the legacy protocol to react as they did so far.
return err(protocol_metrics.notPublishedAnyPeer)

View File

@ -513,22 +513,21 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[Result[int, ref PublishingError]] {.async.} =
): Future[Result[int, PublishOutcome]] {.async.} =
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
try:
let relayedPeerCount = await procCall GossipSub(w).publishEx(pubsubTopic, data)
let publishRes = await procCall GossipSub(w).doPublish(pubsubTopic, data)
if relayedPeerCount > 0:
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
publishRes.isOkOr:
return err(error)
return ok(relayedPeerCount)
except PublishingError as ex:
return err(ex)
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok(publishRes.get())
proc getNumConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic