mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: libwaku tweaks (#3233)
* make lightpush return msg hash after successful publish * libwaku avoid the use of string * library alloc.nim allocate memory when nil cstring is passed * libwaku store_request remove extra destroyShared(self)
This commit is contained in:
parent
f301c6d9db
commit
48859c4266
@ -4,6 +4,11 @@ type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
|
||||
proc alloc*(str: cstring): cstring =
|
||||
# Byte allocation from the given address.
|
||||
# There should be the corresponding manual deallocation with deallocShared !
|
||||
if str.isNil():
|
||||
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
|
||||
ret[0] = '\0' # Set the null terminator
|
||||
return ret
|
||||
|
||||
let ret = cast[cstring](allocShared(len(str) + 1))
|
||||
copyMem(ret, str, len(str) + 1)
|
||||
return ret
|
||||
|
||||
@ -313,16 +313,10 @@ proc waku_relay_publish(
|
||||
defer:
|
||||
deallocShared(pst)
|
||||
|
||||
let targetPubSubTopic =
|
||||
if len(pst) == 0:
|
||||
DefaultPubsubTopic
|
||||
else:
|
||||
$pst
|
||||
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.RELAY,
|
||||
RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), nil, wakuMessage),
|
||||
RelayRequest.createShared(RelayMsgType.PUBLISH, pst, nil, wakuMessage),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
@ -370,9 +364,7 @@ proc waku_relay_subscribe(
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.RELAY,
|
||||
RelayRequest.createShared(
|
||||
RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb)
|
||||
),
|
||||
RelayRequest.createShared(RelayMsgType.SUBSCRIBE, pst, WakuRelayHandler(cb)),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
@ -398,7 +390,7 @@ proc waku_relay_add_protected_shard(
|
||||
RelayMsgType.ADD_PROTECTED_SHARD,
|
||||
clusterId = clusterId,
|
||||
shardId = shardId,
|
||||
publicKey = $pubk,
|
||||
publicKey = pubk,
|
||||
),
|
||||
callback,
|
||||
userData,
|
||||
@ -421,9 +413,7 @@ proc waku_relay_unsubscribe(
|
||||
ctx,
|
||||
RequestType.RELAY,
|
||||
RelayRequest.createShared(
|
||||
RelayMsgType.UNSUBSCRIBE,
|
||||
PubsubTopic($pst),
|
||||
WakuRelayHandler(onReceivedMessage(ctx)),
|
||||
RelayMsgType.UNSUBSCRIBE, pst, WakuRelayHandler(onReceivedMessage(ctx))
|
||||
),
|
||||
callback,
|
||||
userData,
|
||||
@ -445,7 +435,7 @@ proc waku_relay_get_num_connected_peers(
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.RELAY,
|
||||
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)),
|
||||
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, pst),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
@ -466,7 +456,7 @@ proc waku_relay_get_num_peers_in_mesh(
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.RELAY,
|
||||
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)),
|
||||
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, pst),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
@ -557,18 +547,10 @@ proc waku_lightpush_publish(
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
let targetPubSubTopic =
|
||||
if len(pst) == 0:
|
||||
DefaultPubsubTopic
|
||||
else:
|
||||
$pst
|
||||
|
||||
handleRequest(
|
||||
ctx,
|
||||
RequestType.LIGHTPUSH,
|
||||
LightpushRequest.createShared(
|
||||
LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage
|
||||
),
|
||||
LightpushRequest.createShared(LightpushMsgType.PUBLISH, pst, wakuMessage),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
|
||||
@ -32,7 +32,7 @@ type LightpushRequest* = object
|
||||
proc createShared*(
|
||||
T: type LightpushRequest,
|
||||
op: LightpushMsgType,
|
||||
pubsubTopic: PubsubTopic,
|
||||
pubsubTopic: cstring,
|
||||
m = WakuMessage(),
|
||||
): ptr type T =
|
||||
var ret = createShared(T)
|
||||
@ -97,12 +97,12 @@ proc process*(
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
(
|
||||
let msgHashHex = (
|
||||
await waku.node.wakuLightpushClient.publish(
|
||||
pubsubTopic, msg, peer = peerOpt.get()
|
||||
)
|
||||
).isOkOr:
|
||||
).valueOr:
|
||||
error "PUBLISH failed", error = error
|
||||
return err("LightpushRequest error publishing: " & $error)
|
||||
|
||||
return ok("")
|
||||
return ok(msgHashHex)
|
||||
|
||||
@ -41,12 +41,12 @@ type RelayRequest* = object
|
||||
proc createShared*(
|
||||
T: type RelayRequest,
|
||||
op: RelayMsgType,
|
||||
pubsubTopic: PubsubTopic = "",
|
||||
pubsubTopic: cstring = nil,
|
||||
relayEventCallback: WakuRelayHandler = nil,
|
||||
m = WakuMessage(),
|
||||
clusterId: cint = 0,
|
||||
shardId: cint = 0,
|
||||
publicKey: string = "",
|
||||
publicKey: cstring = nil,
|
||||
): ptr type T =
|
||||
var ret = createShared(T)
|
||||
ret[].operation = op
|
||||
|
||||
@ -106,9 +106,6 @@ proc destroyShared(self: ptr StoreRequest) =
|
||||
proc process_remote_query(
|
||||
self: ptr StoreRequest, waku: ptr Waku
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
defer:
|
||||
destroyShared(self)
|
||||
|
||||
let jsonContentRes = catch:
|
||||
parseJson($self[].jsonQuery)
|
||||
|
||||
|
||||
@ -972,7 +972,7 @@ proc lightpushPublish*(
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo,
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||
## Returns whether relaying was successful or not.
|
||||
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||
@ -986,7 +986,7 @@ proc lightpushPublish*(
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo,
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
if not node.wakuLightpushClient.isNil():
|
||||
notice "publishing message with lightpush",
|
||||
@ -1023,7 +1023,7 @@ proc lightpushPublish*(
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc lightpushPublish*(
|
||||
node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.
|
||||
): Future[WakuLightPushResult[string]] {.
|
||||
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
|
||||
.} =
|
||||
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
|
||||
@ -1040,13 +1040,7 @@ proc lightpushPublish*(
|
||||
elif not node.wakuLightPush.isNil():
|
||||
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))
|
||||
|
||||
let publishRes =
|
||||
await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
|
||||
|
||||
if publishRes.isErr():
|
||||
error "failed to publish message", error = publishRes.error
|
||||
|
||||
return publishRes
|
||||
return await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
|
||||
|
||||
## Waku RLN Relay
|
||||
proc mountRlnRelay*(
|
||||
|
||||
@ -71,24 +71,23 @@ proc publish*(
|
||||
wl: WakuLightPushClient,
|
||||
pubSubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
peer: PeerId | RemotePeerInfo,
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
when peer is PeerId:
|
||||
info "publish",
|
||||
peerId = shortLog(peer),
|
||||
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
else:
|
||||
info "publish",
|
||||
peerId = shortLog(peer.peerId),
|
||||
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
|
||||
peer: RemotePeerInfo,
|
||||
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
## On success, returns the msg_hash of the published message
|
||||
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||
?await wl.sendPushRequest(pushRequest, peer)
|
||||
|
||||
for obs in wl.publishObservers:
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
return ok()
|
||||
notice "publishing message with lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msg_hash_hex_str
|
||||
|
||||
return ok(msg_hash_hex_str)
|
||||
|
||||
proc publishToAny*(
|
||||
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
## which spawn a full service Waku node
|
||||
## that could be used also as a lightpush client, helping testing and development.
|
||||
|
||||
import results, chronos, std/options, metrics
|
||||
import results, chronos, chronicles, std/options, metrics, stew/byteutils
|
||||
import
|
||||
../waku_core,
|
||||
./protocol,
|
||||
@ -21,9 +21,10 @@ import
|
||||
|
||||
proc handleSelfLightPushRequest*(
|
||||
self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
): Future[WakuLightPushResult[string]] {.async.} =
|
||||
## Handles the lightpush requests made by the node to itself.
|
||||
## Normally used in REST-lightpush requests
|
||||
## On success, returns the msg_hash of the published message.
|
||||
|
||||
try:
|
||||
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
|
||||
@ -45,6 +46,14 @@ proc handleSelfLightPushRequest*(
|
||||
else:
|
||||
return err("unknown failure")
|
||||
|
||||
return ok()
|
||||
let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex()
|
||||
|
||||
notice "publishing message with self hosted lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
self_peer_id = selfPeerId,
|
||||
msg_hash = msg_hash_hex_str
|
||||
|
||||
return ok(msg_hash_hex_str)
|
||||
except Exception:
|
||||
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user