chore: Filter in libwaku (#3177)

This commit is contained in:
Ivan FB 2024-11-29 15:31:08 +01:00 committed by GitHub
parent 2ab9c3d363
commit f856298caa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 197 additions and 7 deletions

View File

@ -92,6 +92,22 @@ int waku_relay_unsubscribe(void* ctx,
WakuCallBack callback,
void* userData);
int waku_filter_subscribe(void* ctx,
const char* pubSubTopic,
const char* contentTopics,
WakuCallBack callback,
void* userData);
int waku_filter_unsubscribe(void* ctx,
const char* pubSubTopic,
const char* contentTopics,
WakuCallBack callback,
void* userData);
int waku_filter_unsubscribe_all(void* ctx,
WakuCallBack callback,
void* userData);
int waku_relay_get_num_connected_peers(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,

View File

@ -12,6 +12,7 @@ import
waku/waku_core/message/message,
waku/node/waku_node,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
@ -20,6 +21,7 @@ import
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/requests/protocols/lightpush_request,
./waku_thread/inter_thread_communication/requests/protocols/filter_request,
./waku_thread/inter_thread_communication/requests/debug_node_request,
./waku_thread/inter_thread_communication/requests/discovery_request,
./waku_thread/inter_thread_communication/requests/ping_request,
@ -72,7 +74,7 @@ proc handleRes[T: string | void](
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler =
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
return proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[system.void] {.async.} =
@ -300,7 +302,7 @@ proc waku_relay_publish(
RelayRequest.createShared(
RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
wakuMessage,
),
)
@ -343,7 +345,7 @@ proc waku_relay_subscribe(
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
var cb = relayEventCallback(ctx)
var cb = onReceivedMessage(ctx)
waku_thread
.sendRequestToWakuThread(
@ -374,7 +376,7 @@ proc waku_relay_unsubscribe(
RelayRequest.createShared(
RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)
@ -419,6 +421,56 @@ proc waku_relay_get_num_peers_in_mesh(
)
.handleRes(callback, userData)
proc waku_filter_subscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx,
RequestType.FILTER,
FilterRequest.createShared(
FilterMsgType.SUBSCRIBE,
pubSubTopic,
contentTopics,
FilterPushHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)
proc waku_filter_unsubscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics),
)
.handleRes(callback, userData)
proc waku_filter_unsubscribe_all(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL)
)
.handleRes(callback, userData)
proc waku_lightpush_publish(
ctx: ptr WakuContext,
pubSubTopic: cstring,

View File

@ -0,0 +1,105 @@
import options, std/[strutils, sequtils]
import chronicles, chronos, results
import
../../../../../waku/waku_filter_v2/client,
../../../../../waku/waku_core/message/message,
../../../../../waku/factory/waku,
../../../../../waku/waku_filter_v2/common,
../../../../../waku/waku_core/subscription/push_handler,
../../../../../waku/node/peer_manager/peer_manager,
../../../../../waku/node/waku_node,
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_core/topics/content_topic,
../../../../alloc
type FilterMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
UNSUBSCRIBE_ALL
type FilterRequest* = object
operation: FilterMsgType
pubsubTopic: cstring
contentTopics: cstring ## comma-separated list of content-topics
filterPushEventCallback: FilterPushHandler ## handles incoming filter pushed msgs
proc createShared*(
T: type FilterRequest,
op: FilterMsgType,
pubsubTopic: cstring = "",
contentTopics: cstring = "",
filterPushEventCallback: FilterPushHandler = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].contentTopics = contentTopics.alloc()
ret[].filterPushEventCallback = filterPushEventCallback
return ret
proc destroyShared(self: ptr FilterRequest) =
deallocShared(self[].pubsubTopic)
deallocShared(self[].contentTopics)
deallocShared(self)
proc process*(
self: ptr FilterRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
const FilterOpTimeout = 5.seconds
if waku.node.wakuFilterClient.isNil():
let errorMsg = "FilterRequest waku.node.wakuFilterClient is nil"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
case self.operation
of SUBSCRIBE:
waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback)
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))
let subFut = waku.node.filterSubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))
let subFut = waku.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE_ALL:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let unsubFut = waku.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
return ok("")

View File

@ -11,6 +11,7 @@ import
./requests/protocols/relay_request,
./requests/protocols/store_request,
./requests/protocols/lightpush_request,
./requests/protocols/filter_request,
./requests/debug_node_request,
./requests/discovery_request,
./requests/ping_request
@ -24,6 +25,7 @@ type RequestType* {.pure.} = enum
DEBUG
DISCOVERY
LIGHTPUSH
FILTER
type InterThreadRequest* = object
reqType: RequestType
@ -64,6 +66,8 @@ proc process*(
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
of LIGHTPUSH:
cast[ptr LightpushRequest](request[].reqContent).process(waku)
of FILTER:
cast[ptr FilterRequest](request[].reqContent).process(waku)
return await retFut

View File

@ -320,10 +320,10 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
mountLightPushClient(node)
if conf.lightpushnode != "":
let lightPushNode = parsePeerInfo(conf.lightpushnode)
if lightPushNode.isOk():
mountLightPushClient(node)
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
else:
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
@ -341,11 +341,11 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
await node.mountFilterClient()
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
try:
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
except CatchableError:
return err(

View File

@ -1,7 +1,9 @@
{.push raises: [].}
import std/options, results, chronicles, chronos, metrics, bearssl/rand
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
@ -71,6 +73,15 @@ proc publish*(
message: WakuMessage,
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
when peer is PeerId:
info "publish",
peerId = shortLog(peer),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
else:
info "publish",
peerId = shortLog(peer.peerId),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
@ -85,6 +96,8 @@ proc publishToAny*(
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")