mirror of https://github.com/waku-org/nwaku.git
287 lines
10 KiB
Nim
287 lines
10 KiB
Nim
## Waku Filter protocol for subscribing and filtering messages
|
|
|
|
# Push a module-wide `raises` pragma: an empty list on Nim 1.4 and newer,
# `[Defect]` on older compilers.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
|
|
|
|
import
|
|
std/[options,sequtils,sets,strutils,tables],
|
|
stew/byteutils,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/peerid,
|
|
libp2p/protocols/protocol
|
|
import
|
|
../node/peer_manager,
|
|
../waku_core,
|
|
./common,
|
|
./protocol_metrics,
|
|
./rpc_codec,
|
|
./rpc,
|
|
./subscriptions
|
|
|
|
# Tag log statements of this module with the "waku filter" topic.
logScope:
  topics = "waku filter"
|
|
|
|
const MaxContentTopicsPerRequest* = 100
  ## Upper bound on the number of content topics accepted in a single
  ## subscribe or unsubscribe request; larger requests are rejected as
  ## bad requests.
|
|
|
|
type WakuFilter* = ref object of LPProtocol
  ## State of the filter protocol, built on top of `LPProtocol`.
  subscriptions*: FilterSubscriptions ## a mapping of peer ids to a sequence of filter criteria
  peerManager: PeerManager ## used to check the peer store and to dial peers when pushing messages
  maintenanceTask: TimerCallback ## timer handle set by `startMaintainingSubscriptions`, cleared by `stop`
|
|
|
|
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
  ## Handles a subscriber ping: refreshes the subscription of `peerId` when
  ## it is subscribed, otherwise answers with a not-found error.
  trace "pinging subscriber", peerId=peerId

  if wf.subscriptions.isSubscribed(peerId):
    wf.subscriptions.refreshSubscription(peerId)
    return ok()

  debug "pinging peer has no subscriptions", peerId=peerId
  return err(FilterSubscribeError.notFound())
|
|
|
|
proc subscribe(wf: WakuFilter,
               peerId: PeerID,
               pubsubTopic: Option[PubsubTopic],
               contentTopics: seq[ContentTopic]): FilterSubscribeResult =
  ## Registers `peerId` for every (pubsub topic, content topic) pair built
  ## from the request.
  ##
  ## Returns a bad-request error when the pubsub topic is missing, when no
  ## content topics are given, or when more than `MaxContentTopicsPerRequest`
  ## content topics are given. A failure to add the subscription is reported
  ## as service-unavailable, carrying the underlying error.

  # TODO: check if this condition is valid???
  if pubsubTopic.isNone() or contentTopics.len == 0:
    return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

  if contentTopics.len > MaxContentTopicsPerRequest:
    return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " &
                                               $MaxContentTopicsPerRequest))

  # Pair the single pubsub topic with each requested content topic.
  let requestedTopic = pubsubTopic.get()
  let criteria = contentTopics.mapIt((requestedTopic, it)).toHashSet()

  trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=criteria

  let added = wf.subscriptions.addSubscription(peerId, criteria)
  if added.isErr():
    return err(FilterSubscribeError.serviceUnavailable(added.error))

  return ok()
|
|
|
|
proc unsubscribe(wf: WakuFilter,
                 peerId: PeerID,
                 pubsubTopic: Option[PubsubTopic],
                 contentTopics: seq[ContentTopic]): FilterSubscribeResult =
  ## Removes the (pubsub topic, content topic) pairs built from the request
  ## from the subscription of `peerId`.
  ##
  ## Returns a bad-request error when the pubsub topic is missing, when no
  ## content topics are given, or when more than `MaxContentTopicsPerRequest`
  ## content topics are given. Any failure to remove the subscription is
  ## reported as not-found.
  if pubsubTopic.isNone() or contentTopics.len == 0:
    return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

  if contentTopics.len > MaxContentTopicsPerRequest:
    return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))

  # Pair the single pubsub topic with each requested content topic.
  let requestedTopic = pubsubTopic.get()
  let criteria = contentTopics.mapIt((requestedTopic, it)).toHashSet()

  debug "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=criteria

  if wf.subscriptions.removeSubscription(peerId, criteria).isErr():
    return err(FilterSubscribeError.notFound())

  return ok()
|
|
|
|
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
  ## Drops the whole subscription of `peerId` and runs a clean-up pass on the
  ## subscription store. Answers with a not-found error when the peer has no
  ## subscription.
  if wf.subscriptions.isSubscribed(peerId):
    debug "removing peer subscription", peerId=peerId
    wf.subscriptions.removePeer(peerId)
    wf.subscriptions.cleanUp()
    return ok()

  debug "unsubscribing peer has no subscriptions", peerId=peerId
  return err(FilterSubscribeError.notFound())
|
|
|
|
proc handleSubscribeRequest*(wf: WakuFilter,
                             peerId: PeerId,
                             request: FilterSubscribeRequest): FilterSubscribeResponse =
  ## Dispatches a filter subscribe request from `peerId` to the matching
  ## operation (ping, subscribe, unsubscribe, unsubscribe-all) and converts
  ## the outcome into a `FilterSubscribeResponse` carrying the request id.
  ##
  ## Increments the request counter and records the handling duration, both
  ## labelled with the request type.
  info "received filter subscribe request", peerId=peerId, request=request
  waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])

  let startedAt = Moment.now()

  let outcome =
    case request.filterSubscribeType
    of FilterSubscribeType.SUBSCRIBER_PING:
      wf.pingSubscriber(peerId)
    of FilterSubscribeType.SUBSCRIBE:
      wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
    of FilterSubscribeType.UNSUBSCRIBE:
      wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
    of FilterSubscribeType.UNSUBSCRIBE_ALL:
      wf.unsubscribeAll(peerId)

  # Duration in seconds with millisecond precision floating point
  let elapsed = Moment.now() - startedAt
  let elapsedSec = elapsed.milliseconds.float / 1000
  waku_filter_request_duration_seconds.observe(
    elapsedSec, labelValues = [$request.filterSubscribeType])

  if outcome.isOk():
    return FilterSubscribeResponse.ok(request.requestId)

  return FilterSubscribeResponse(
    requestId: request.requestId,
    statusCode: outcome.error.kind.uint32,
    statusDesc: some($outcome.error)
  )
|
|
|
|
proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
  ## Dials `peer` on the filter push codec and writes `buffer` to the
  ## resulting connection as a length-prefixed message. Does nothing (apart
  ## from a trace log) when the peer is not in the peer store or cannot be
  ## dialed.
  trace "pushing message to subscribed peer", peer=peer

  # Check that peer has not been removed from peer store
  let peerKnown = wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec)
  if not peerKnown:
    trace "no addresses for peer", peer=peer
    return

  ## TODO: Check if dial is necessary always???
  let dialed = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
  if dialed.isSome():
    await dialed.get().writeLp(buffer)
  else:
    ## We do not remove this peer, but allow the underlying peer manager
    ## to do so if it is deemed necessary
    trace "no connection to peer", peer=peer
|
|
|
|
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
  ## Encodes `messagePush` once and pushes it concurrently to every peer in
  ## `peers`, completing when all individual pushes have finished.
  ##
  ## A failing push to one peer does not fail this proc; each failure is
  ## logged together with the affected peer.
  debug "pushing message to subscribed peers",
    pubsubTopic=messagePush.pubsubTopic,
    contentTopic=messagePush.wakuMessage.contentTopic,
    peers=peers,
    hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

  let bufferToPublish = messagePush.encode().buffer

  var pushFuts = newSeqOfCap[Future[void]](peers.len)
  for peerId in peers:
    pushFuts.add(wf.pushToPeer(peerId, bufferToPublish))

  # `allFutures` completes without raising even when individual pushes fail,
  # so each future has to be inspected afterwards or failures go unnoticed.
  await allFutures(pushFuts)

  # `pushFuts` was filled in the order of `peers`, so indices line up.
  for idx, pushFut in pushFuts:
    if pushFut.failed():
      debug "failed to push message to peer", peer=peers[idx],
        error=pushFut.error.msg
|
|
|
|
proc maintainSubscriptions*(wf: WakuFilter) =
  ## One maintenance pass over the subscription store: drops subscriptions of
  ## peers that are no longer in the peer store, runs the store's clean-up
  ## and publishes the current number of subscribed peers as a metric.
  trace "maintaining subscriptions"

  ## Remove subscriptions for peers that have been removed from peer store
  # Collect first and remove afterwards, so the table is not modified while
  # it is being iterated.
  var stalePeers: seq[PeerId]
  for peerId in wf.subscriptions.peersSubscribed.keys:
    if wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
      continue

    debug "peer has been removed from peer store, removing subscription", peerId=peerId
    stalePeers.add(peerId)

  if stalePeers.len != 0:
    wf.subscriptions.removePeers(stalePeers)

  wf.subscriptions.cleanUp()

  ## Periodic report of number of subscriptions
  let subscribedCount = wf.subscriptions.peersSubscribed.len
  waku_filter_subscriptions.set(subscribedCount.float64)
|
|
|
|
# Upper bound on how long `handleMessage` waits for a message to be pushed
# to all subscribed peers before reporting a push timeout.
const MessagePushTimeout = 20.seconds
|
|
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
  ## Pushes `message` to every peer subscribed to its pubsub/content topic
  ## pair, waiting at most `MessagePushTimeout` for the pushes to finish.
  ##
  ## A timeout increments the error counter with the push-timeout label.
  ## The handling duration metric is only recorded when at least one
  ## subscribed peer was found.
  trace "handling message", pubsubTopic=pubsubTopic, message=message

  let startedAt = Moment.now()

  ## Find subscribers and push message to them
  let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
  if subscribedPeers.len == 0:
    trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
    return

  let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message)

  let pushedInTime =
    await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout)

  if pushedInTime:
    debug "pushed message succesfully to all subscribers",
      pubsubTopic=pubsubTopic,
      contentTopic=message.contentTopic,
      hash=pubsubTopic.computeMessageHash(message).to0xHex()
  else:
    debug "timed out pushing message to peers",
      pubsubTopic=pubsubTopic,
      contentTopic=message.contentTopic,
      hash=pubsubTopic.computeMessageHash(message).to0xHex()
    waku_filter_errors.inc(labelValues = [pushTimeoutFailure])

  # Duration in seconds with millisecond precision floating point
  let elapsed = Moment.now() - startedAt
  let elapsedSec = elapsed.milliseconds.float / 1000
  waku_filter_handle_message_duration_seconds.observe(elapsedSec)
|
|
|
|
proc initProtocolHandler(wf: WakuFilter) =
  ## Installs the filter subscribe stream handler on `wf` and sets its codec
  ## to `WakuFilterSubscribeCodec`.

  proc handler(conn: Connection, proto: string) {.async.} =
    ## Reads one length-prefixed subscribe request from `conn`, handles it
    ## and writes the response back. A request that cannot be decoded is
    ## logged and counted, and no response is sent.
    trace "filter subscribe request handler triggered", peerId=conn.peerId

    let payload = await conn.readLp(int(MaxSubscribeSize))
    let decoded = FilterSubscribeRequest.decode(payload)

    if decoded.isOk():
      #TODO: toAPI() split here
      let response = wf.handleSubscribeRequest(conn.peerId, decoded.value)

      debug "sending filter subscribe response", peerId=conn.peerId, response=response

      #TODO: toRPC() separation here
      await conn.writeLp(response.encode().buffer)
    else:
      error "Failed to decode filter subscribe request", peerId=conn.peerId, err=decoded.error
      waku_filter_errors.inc(labelValues = [decodeRpcFailure])

  wf.handler = handler
  wf.codec = WakuFilterSubscribeCodec
|
|
|
|
proc new*(T: type WakuFilter,
          peerManager: PeerManager,
          subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
          maxFilterPeers: uint32 = MaxFilterPeers,
          maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): T =
  ## Creates a `WakuFilter` that uses `peerManager`, with a subscription
  ## store initialised from `subscriptionTimeout`, `maxFilterPeers` and
  ## `maxFilterCriteriaPerPeer`, and installs its protocol handler.
  let subs = FilterSubscriptions.init(
    subscriptionTimeout,
    maxFilterPeers,
    maxFilterCriteriaPerPeer
  )

  result = WakuFilter(subscriptions: subs, peerManager: peerManager)
  result.initProtocolHandler()
|
|
|
|
# Interval handed to `startMaintainingSubscriptions` when the protocol is
# started.
const MaintainSubscriptionsInterval* = 1.minutes
|
|
|
|
proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) =
  ## Schedules `maintainSubscriptions` to run after `interval` and, from
  ## then on, again after every `interval`. The handle of the currently
  ## pending timer is kept in `wf.maintenanceTask`.
  trace "starting to maintain subscriptions"

  # Declared before it is assigned so the closure can re-arm itself.
  var onTick: CallbackFunc
  onTick = CallbackFunc(
    proc(udata: pointer) {.gcsafe.} =
      wf.maintainSubscriptions()
      # Re-arm the timer for the next maintenance pass.
      wf.maintenanceTask = setTimer(Moment.fromNow(interval), onTick)
  )

  # Arm the timer for the first maintenance pass.
  wf.maintenanceTask = setTimer(Moment.fromNow(interval), onTick)
|
|
|
|
method start*(wf: WakuFilter) {.async.} =
  ## Starts periodic subscription maintenance with
  ## `MaintainSubscriptionsInterval`, then runs the `LPProtocol` start logic.
  debug "starting filter protocol"

  startMaintainingSubscriptions(wf, MaintainSubscriptionsInterval)

  await procCall LPProtocol(wf).start()
|
|
|
|
method stop*(wf: WakuFilter) {.async.} =
  ## Clears the pending maintenance timer, if one was set, then runs the
  ## `LPProtocol` stop logic.
  debug "stopping filter protocol"

  let pendingTask = wf.maintenanceTask
  if pendingTask != nil:
    clearTimer(pendingTask)

  await procCall LPProtocol(wf).stop()
|