Fix redundant use of content topics (#528)

made resulting changes to waku_filter

Made changes to wakunode2, filter_api and waku_filter

Update waku v2 test scripts referencing Content Topics

Update ContentFilter in chat example

Remove unneccesary loops from filter api
closes #496

Apply keep-alive for chat2 (#525)

Makes the arguments of the store jsonrpc api optional (#526)

* makes pubsubTopic filter optional

* makes contentFilter optional

* append Option

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

Update Changelog with changes to ContentFilter

Fix indentation and code semantics

Enables perssist-message flag in the store protocol for wakunode2 (#519)

* enables perssistmessage flag

* disables in memory storage when persist-messages is false

* adds the persistMessages input to the mountStore

* defaults the store flag to true

* adds the missing argument

* persists messages in memory conditioned to the persistMessages flag

* adds persistmessages flag to the config_bridge

* defaults persistmessages to true

* defaults the store flag to true and persist-messages to false

* updates store.md

* updates chat2 instructions about --store flag

* removes --store flag from chat2 command execution

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

Fix: light-mode relay for all light protocols (#529)

* Fix: light-mode relay for all light protocols

* Clear up confusing use of overloaded concepts

Fix ContentFilter Schema in wakunode test script

Enables perssist-message flag in the store protocol for wakunode2 (#519)

* enables perssistmessage flag

* disables in memory storage when persist-messages is false

* adds the persistMessages input to the mountStore

* defaults the store flag to true

* adds the missing argument

* persists messages in memory conditioned to the persistMessages flag

* adds persistmessages flag to the config_bridge

* defaults persistmessages to true

* defaults the store flag to true and persist-messages to false

* updates store.md

* updates chat2 instructions about --store flag

* removes --store flag from chat2 command execution

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

Fix: light-mode relay for all light protocols (#529)

* Fix: light-mode relay for all light protocols

* Clear up confusing use of overloaded concepts

Fix resulting issues after merge
This commit is contained in:
Ebube Sered Ud 2021-05-05 09:34:40 +01:00 committed by GitHub
parent 824b621a50
commit 533fba9874
11 changed files with 576 additions and 586 deletions

View File

@ -3,6 +3,7 @@
## Next version ## Next version
- Refactor: Split out `waku_types` types into right place; create utils folder. - Refactor: Split out `waku_types` types into right place; create utils folder.
- Refactor: Replace sequence of ContentTopics in ContentFilter with a single ContentTopic.
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) - PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers. - Added a peer manager for `relay`, `filter`, `store` and `swap` peers.

View File

@ -318,7 +318,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
info "Hit filter handler" info "Hit filter handler"
await node.subscribe( await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[DefaultContentTopic])], pubSubTopic: DefaultTopic, subscribe: true), FilterRequest(contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
filterHandler filterHandler
) )

View File

@ -291,8 +291,11 @@ procSuite "Waku v2 JSON-RPC API":
# Light node has not yet subscribed to any filters # Light node has not yet subscribed to any filters
node.filters.len() == 0 node.filters.len() == 0
let contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic, ContentTopic("2")]), let contentFilters = @[ContentFilter(contentTopic: defaultContentTopic),
ContentFilter(contentTopics: @[ContentTopic("3"), ContentTopic("4")])] ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
check: check:
@ -330,7 +333,7 @@ procSuite "Waku v2 JSON-RPC API":
# First ensure subscription exists # First ensure subscription exists
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic])], topic = some(defaultTopic)) let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: defaultContentTopic)], topic = some(defaultTopic))
check: check:
sub sub

View File

@ -39,7 +39,7 @@ procSuite "Waku Filter":
let let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto) dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -88,7 +88,7 @@ procSuite "Waku Filter":
let let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto) dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -118,7 +118,7 @@ procSuite "Waku Filter":
responseCompletionFuture = newFuture[bool]() responseCompletionFuture = newFuture[bool]()
let let
rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: false) rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: false)
await proto.unsubscribe(rpcU) await proto.unsubscribe(rpcU)
@ -145,7 +145,7 @@ procSuite "Waku Filter":
let let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto) dialSwitch.mount(proto)

View File

@ -30,7 +30,7 @@ procSuite "WakuNode":
Port(60000)) Port(60000))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = ContentTopic("/waku/2/default-content/proto") contentTopic = ContentTopic("/waku/2/default-content/proto")
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -82,7 +82,7 @@ procSuite "WakuNode":
Port(60002)) Port(60002))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = ContentTopic("/waku/2/default-content/proto") contentTopic = ContentTopic("/waku/2/default-content/proto")
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -149,8 +149,8 @@ procSuite "WakuNode":
otherPayload = @[byte 9] otherPayload = @[byte 9]
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic) defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic) otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[defaultContentTopic])], subscribe: true) defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: defaultContentTopic)], subscribe: true)
otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[otherContentTopic])], subscribe: true) otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: otherContentTopic)], subscribe: true)
await node1.start() await node1.start()
node1.mountRelay() node1.mountRelay()
@ -221,7 +221,7 @@ procSuite "WakuNode":
contentTopic = "defaultCT" contentTopic = "defaultCT"
payload = @[byte 1] payload = @[byte 1]
message = WakuMessage(payload: payload, contentTopic: contentTopic) message = WakuMessage(payload: payload, contentTopic: contentTopic)
filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
await node1.start() await node1.start()
node1.mountRelay() node1.mountRelay()
@ -322,7 +322,7 @@ procSuite "WakuNode":
msg == message msg == message
completionFut.complete(true) completionFut.complete(true)
await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true), handler) await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), handler)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)

View File

@ -63,7 +63,7 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
# Successfully subscribed to all content filters # Successfully subscribed to all content filters
for cTopic in concat(contentFilters.mapIt(it.contentTopics)): for cTopic in contentFilters.mapIt(it.contentTopic):
# Create message cache for each subscribed content topic # Create message cache for each subscribed content topic
messageCache[cTopic] = @[] messageCache[cTopic] = @[]
@ -83,7 +83,7 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
if (await node.unsubscribe(fReq).withTimeout(futTimeout)): if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
# Successfully unsubscribed from all content filters # Successfully unsubscribed from all content filters
for cTopic in concat(contentFilters.mapIt(it.contentTopics)): for cTopic in contentFilters.mapIt(it.contentTopic):
# Remove message cache for each unsubscribed content topic # Remove message cache for each unsubscribed content topic
messageCache.del(cTopic) messageCache.del(cTopic)

View File

@ -81,21 +81,17 @@ func asEthKey*(key: PrivateKey): keys.PrivateKey =
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
# Flatten all unsubscribe topics into single seq # Flatten all unsubscribe topics into single seq
var unsubscribeTopics: seq[ContentTopic] let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
for cf in contentFilters:
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
var rIdToRemove: seq[string] = @[] var rIdToRemove: seq[string] = @[]
for rId, f in filters.mpairs: for rId, f in filters.mpairs:
# Iterate filter entries to remove matching content topics # Iterate filter entries to remove matching content topics
for cf in f.contentFilters.mitems:
# Iterate content filters in filter entry
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
# make sure we delete the content filter # make sure we delete the content filter
# if no more topics are left # if no more topics are left
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
if f.contentFilters.len == 0: if f.contentFilters.len == 0:
rIdToRemove.add(rId) rIdToRemove.add(rId)

View File

@ -48,30 +48,21 @@ proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
# TODO: In case of no topics we should either trigger here for all messages, # TODO: In case of no topics we should either trigger here for all messages,
# or we should not allow such filter to exist in the first place. # or we should not allow such filter to exist in the first place.
for contentFilter in filter.contentFilters: for contentFilter in filter.contentFilters:
if contentFilter.contentTopics.len > 0: if msg.contentTopic == contentFilter.contentTopic:
if msg.contentTopic in contentFilter.contentTopics: filter.handler(msg)
filter.handler(msg) break
break
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
# Flatten all unsubscribe topics into single seq # Flatten all unsubscribe topics into single seq
var unsubscribeTopics: seq[ContentTopic] let unsubscribeTopics = request.contentFilters.mapIt(it.contentTopic)
for cf in request.contentFilters:
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
for subscriber in subscribers.mitems: for subscriber in subscribers.mitems:
if subscriber.peer.peerId != peerId: continue if subscriber.peer.peerId != peerId: continue
# Iterate through subscriber entries matching peer ID to remove matching content topics
for cf in subscriber.filter.contentFilters.mitems:
# Iterate content filters in filter entry
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
# make sure we delete the content filter # make sure we delete the content filter
# if no more topics are left # if no more topics are left
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
# make sure we delete the subscriber # make sure we delete the subscriber
# if no more content filters left # if no more content filters left
@ -83,8 +74,7 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest
proc encode*(filter: ContentFilter): ProtoBuffer = proc encode*(filter: ContentFilter): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
for contentTopic in filter.contentTopics: result.write(1, filter.contentTopic)
result.write(1, contentTopic)
proc encode*(rpc: FilterRequest): ProtoBuffer = proc encode*(rpc: FilterRequest): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
@ -99,10 +89,10 @@ proc encode*(rpc: FilterRequest): ProtoBuffer =
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var contentTopics: seq[ContentTopic] var contentTopic: ContentTopic
discard ? pb.getRepeatedField(1, contentTopics) discard ? pb.getField(1, contentTopic)
ok(ContentFilter(contentTopics: contentTopics)) ok(ContentFilter(contentTopic: contentTopic))
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
@ -213,7 +203,7 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
continue continue
for filter in subscriber.filter.contentFilters: for filter in subscriber.filter.contentFilters:
if msg.contentTopic in filter.contentTopics: if msg.contentTopic == filter.contentTopic:
trace "Found matching contentTopic", filter=filter, msg=msg trace "Found matching contentTopic", filter=filter, msg=msg
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))

View File

@ -10,7 +10,7 @@ export waku_message
type type
ContentFilter* = object ContentFilter* = object
contentTopics*: seq[ContentTopic] contentTopic*: ContentTopic
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}