mirror of https://github.com/waku-org/nwaku.git
142 lines
5.0 KiB
Nim
142 lines
5.0 KiB
Nim
## Waku Filter client for subscribing and receiving filtered messages
|
|
|
|
# Module-wide exception tracking: from Nim 1.4 on nothing may be raised
# implicitly; older compilers still have to list `Defect` explicitly.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
|
|
|
|
import
|
|
std/options,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/protocols/protocol,
|
|
bearssl/rand
|
|
import
|
|
../node/peer_manager,
|
|
../waku_message,
|
|
./common,
|
|
./protocol_metrics,
|
|
./rpc_codec,
|
|
./rpc
|
|
|
|
# Every log statement emitted from this module carries this topic.
logScope:
  topics = "waku filter client"
|
|
|
|
type
  # Callback invoked by the client for every message push it manages to decode.
  MessagePushHandler* =
    proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}

  WakuFilterClient* = ref object of LPProtocol
    ## Client side of the filter protocol: issues subscribe-style requests to
    ## a service peer and receives the messages that peer pushes back.
    rng: ref HmacDrbgContext ## randomness source for request ids
    messagePushHandler: MessagePushHandler ## called once per decoded push
    peerManager: PeerManager ## used to dial the service peer
|
|
|
|
func generateRequestId(rng: ref HmacDrbgContext): string =
  ## Fills a 10-byte buffer from `rng` and returns it converted with `toHex`.
  var randomBytes: array[10, byte]
  hmacDrbgGenerate(rng[], randomBytes)
  toHex(randomBytes)
|
|
|
|
proc sendSubscribeRequest(
    wfc: WakuFilterClient,
    servicePeer: RemotePeerInfo,
    filterSubscribeRequest: FilterSubscribeRequest
): Future[FilterSubscribeResult] {.async.} =
  ## Sends `filterSubscribeRequest` to `servicePeer` over the filter-subscribe
  ## codec and validates the response.
  ##
  ## Returns `ok()` when the peer answers with the same request id and status
  ## code 200. Every other outcome is reported as an `err`:
  ## - the peer could not be dialed,
  ## - writing the request or reading the response raised,
  ## - the response could not be decoded,
  ## - the response carries a different request id,
  ## - the response status code is not 200.
  ##
  ## Each failure also increments `waku_filter_errors`. Cancellation
  ## (`CancelledError`) is re-raised rather than converted into an `err`.
  trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest

  let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
  if connOpt.isNone():
    trace "Failed to dial filter service peer", servicePeer
    waku_filter_errors.inc(labelValues = [dialFailure])
    return err(FilterSubscribeError.peerDialFailure($servicePeer))

  let connection = connOpt.get()

  # Fixed label values keep the metric's cardinality bounded; the exception
  # text only goes into the log line and the returned error.
  const
    writeRpcFailure = "write_rpc_failure"
    readRpcFailure = "read_rpc_failure"

  # Stream writes can raise (e.g. the remote closed the stream); surface that
  # through the Result instead of letting the exception escape.
  try:
    await connection.writeLP(filterSubscribeRequest.encode().buffer)
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    trace "Failed to write filter subscribe request", servicePeer, error = exc.msg
    waku_filter_errors.inc(labelValues = [writeRpcFailure])
    return err(FilterSubscribeError.badResponse(writeRpcFailure & ": " & exc.msg))

  # Same treatment for the read side of the exchange.
  var respBuf: seq[byte]
  try:
    respBuf = await connection.readLp(MaxSubscribeResponseSize)
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    trace "Failed to read filter subscribe response", servicePeer, error = exc.msg
    waku_filter_errors.inc(labelValues = [readRpcFailure])
    return err(FilterSubscribeError.badResponse(readRpcFailure & ": " & exc.msg))

  let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
  if respDecodeRes.isErr():
    trace "Failed to decode filter subscribe response", servicePeer
    waku_filter_errors.inc(labelValues = [decodeRpcFailure])
    return err(FilterSubscribeError.badResponse(decodeRpcFailure))

  let response = respDecodeRes.get()

  # A response that echoes a different id is not the answer to this request.
  if response.requestId != filterSubscribeRequest.requestId:
    trace "Filter subscribe response requestId mismatch", servicePeer, response
    waku_filter_errors.inc(labelValues = [requestIdMismatch])
    return err(FilterSubscribeError.badResponse(requestIdMismatch))

  if response.statusCode != 200:
    trace "Filter subscribe error response", servicePeer, response
    waku_filter_errors.inc(labelValues = [errorResponse])
    # Fall back to a generic description when the peer supplied none.
    let cause = if response.statusDesc.isSome(): response.statusDesc.get()
                else: "filter subscribe error"
    return err(FilterSubscribeError.parse(response.statusCode, cause=cause))

  return ok()
|
|
|
|
proc ping*(
    wfc: WakuFilterClient,
    servicePeer: RemotePeerInfo
): Future[FilterSubscribeResult] {.async.} =
  ## Sends a ping request with a freshly generated request id to
  ## `servicePeer` and returns the outcome of that exchange.
  let pingRequest = FilterSubscribeRequest.ping(generateRequestId(wfc.rng))
  let outcome = await wfc.sendSubscribeRequest(servicePeer, pingRequest)
  return outcome
|
|
|
|
proc subscribe*(
    wfc: WakuFilterClient,
    servicePeer: RemotePeerInfo,
    pubsubTopic: PubsubTopic,
    contentTopics: seq[ContentTopic]
): Future[FilterSubscribeResult] {.async.} =
  ## Builds a subscribe request for `contentTopics` on `pubsubTopic`, tagged
  ## with a freshly generated request id, and sends it to `servicePeer`.
  let subscribeRequest = FilterSubscribeRequest.subscribe(
    requestId = generateRequestId(wfc.rng),
    pubsubTopic = pubsubTopic,
    contentTopics = contentTopics
  )
  let outcome = await wfc.sendSubscribeRequest(servicePeer, subscribeRequest)
  return outcome
|
|
|
|
proc unsubscribe*(
    wfc: WakuFilterClient,
    servicePeer: RemotePeerInfo,
    pubsubTopic: PubsubTopic,
    contentTopics: seq[ContentTopic]
): Future[FilterSubscribeResult] {.async.} =
  ## Builds an unsubscribe request for `contentTopics` on `pubsubTopic`,
  ## tagged with a freshly generated request id, and sends it to
  ## `servicePeer`.
  let unsubscribeRequest = FilterSubscribeRequest.unsubscribe(
    requestId = generateRequestId(wfc.rng),
    pubsubTopic = pubsubTopic,
    contentTopics = contentTopics
  )
  let outcome = await wfc.sendSubscribeRequest(servicePeer, unsubscribeRequest)
  return outcome
|
|
|
|
proc unsubscribeAll*(
    wfc: WakuFilterClient,
    servicePeer: RemotePeerInfo
): Future[FilterSubscribeResult] {.async.} =
  ## Sends an unsubscribe-all request, tagged with a freshly generated
  ## request id, to `servicePeer`.
  let unsubscribeAllRequest = FilterSubscribeRequest.unsubscribeAll(
    requestId = generateRequestId(wfc.rng)
  )
  let outcome = await wfc.sendSubscribeRequest(servicePeer, unsubscribeAllRequest)
  return outcome
|
|
|
|
proc initProtocolHandler(wfc: WakuFilterClient) =
  ## Registers the message-push stream handler and the push codec on `wfc`.

  proc handler(conn: Connection, proto: string) {.async.} =
    # Read one length-prefixed frame, bounded by MaxPushSize.
    let payload = await conn.readLp(MaxPushSize)

    let pushRes = MessagePush.decode(payload)
    if pushRes.isOk():
      let messagePush = pushRes.value #TODO: toAPI() split here
      trace "Received message push", peerId=conn.peerId, messagePush

      # Hand the decoded message over to the user-supplied callback.
      wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage)
      # Protocol specifies no response for now
    else:
      # Undecodable pushes are counted and dropped.
      error "Failed to decode message push", peerId=conn.peerId
      waku_filter_errors.inc(labelValues = [decodeRpcFailure])

  wfc.handler = handler
  wfc.codec = WakuFilterPushCodec
|
|
|
|
proc new*(T: type WakuFilterClient,
          rng: ref HmacDrbgContext,
          messagePushHandler: MessagePushHandler,
          peerManager: PeerManager): T =
  ## Creates a filter client from the given randomness source, push callback
  ## and peer manager, with its protocol handler already installed.
  result = WakuFilterClient(
    rng: rng,
    messagePushHandler: messagePushHandler,
    peerManager: peerManager
  )
  result.initProtocolHandler()
|