feat: add filter bindings and improve docs (#284)

This commit is contained in:
Richard Ramos 2022-08-09 09:48:23 -04:00 committed by GitHub
parent 924acf67d9
commit 2b4a2d72d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1148 additions and 405 deletions

View File

@ -7,6 +7,7 @@ import com.example.waku.events.MessageEvent
import com.example.waku.messages.Message
import com.example.waku.store.StoreQuery
import com.example.waku.store.StoreResponse
import com.example.waku.filter.FilterSubscription
import gowaku.Gowaku
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
@ -315,7 +316,7 @@ fun Node.peers(): List<Peer> {
* Query message history
* @param query Query
* @param peerID PeerID to ask the history from. Use "" to automatically select a peer
* @param ms If ms is greater than 0, the broadcast of the message must happen before the timeout
* @param ms If ms is greater than 0, response must be received before the timeout
* (in milliseconds) is reached, or an error will be returned
* @return Response containing the messages and cursor for pagination. Use the cursor in further queries to retrieve more results
*/
@ -324,3 +325,28 @@ fun Node.storeQuery(query: StoreQuery, peerID: String = "", ms: Long = 0): Store
val response = Gowaku.storeQuery(queryJSON, peerID, ms)
return handleResponse<StoreResponse>(response)
}
/**
* Creates a subscription in a lightnode for messages
* @param filter Filter criteria
* @param peerID PeerID to subscribe to. Use "" to automatically select a peer
* @param ms If ms is greater than 0, the subscription must be done before the timeout
* (in milliseconds) is reached, or an error will be returned
*/
func Node.filterSubscribe(filter: FilterSubscription, peerID: String = "", ms: Long = 0) {
val filterJSON = Json.encodeToString(filter)
val response = Gowaku.filterSubscribe(filterJSON, peerID, ms)
handleResponse(response)
}
/**
* Removes subscriptions in a light node
* @param filter Filter criteria
* @param ms If ms is greater than 0, the unsubscription must be done before the timeout
* (in milliseconds) is reached, or an error will be returned
*/
func Node.filterUnsubscribe(filter: FilterSubscription, ms: Long = 0) {
val filterJSON = Json.encodeToString(filter)
val response = Gowaku.filterUnsubscribe(filterJSON, ms)
handleResponse(response)
}

View File

@ -0,0 +1,6 @@
package com.example.waku.filter
import kotlinx.serialization.Serializable
@Serializable
data class ContentFilter(val contentTopic: String)

View File

@ -0,0 +1,9 @@
package com.example.waku.filter
import kotlinx.serialization.Serializable
@Serializable
data class FilterSubscription(
var contentFilters: List<ContentFilter>,
var topic: String?,
)

View File

@ -8,6 +8,6 @@ data class StoreQuery(
var pubsubTopic: String? = DefaultPubsubTopic(),
var startTime: Long? = null,
var endTime: Long? = null,
var contentFilter: List<ContentFilter>?,
var contentFilters: List<ContentFilter>?,
var pagingOptions: PagingOptions?
)

View File

@ -13,6 +13,7 @@ namespace Waku
public int? keepAliveInterval { get; set; }
public bool? relay { get; set; }
public int? minPeersToPublish {get; set; }
public bool? enableFilter {get; set; }
}
public enum EventType
@ -109,6 +110,12 @@ namespace Waku
public PagingOptions? pagingInfo { get; set; }
}
public class FilterSubscription
{
public IList<ContentFilter> contentFilters { get; set; } = new List<ContentFilter>();
public string? topic { get; set; }
}
public class Node
{
private bool _running;
@ -525,7 +532,7 @@ namespace Waku
/// </summary>
/// <param name="query">Query</param>
/// <param name="peerID">PeerID to ask the history from. Use NULL to automatically select a peer</param>
/// <param name="ms">If ms is greater than 0, the broadcast of the message must happen before the timeout (in milliseconds) is reached, or an error will be returned</param>
/// <param name="ms">If ms is greater than 0, the response must be received before the timeout (in milliseconds) is reached, or an error will be returned</param>
/// <returns>Response containing the messages and cursor for pagination. Use the cursor in further queries to retrieve more results</returns>
public StoreResponse StoreQuery(StoreQuery query, string? peerID = null, int ms = 0)
{
@ -535,8 +542,36 @@ namespace Waku
return Response.HandleStoreResponse(ptr, "could not extract query response");
}
[DllImport(Constants.dllName)]
internal static extern IntPtr waku_filter_subscribe(string filterJSON, string? peerID, int ms);
/// <summary>
/// Creates a subscription in a lightnode for messages
/// </summary>
/// <param name="filter">Filter criteria</param>
/// <param name="peerID">PeerID to subscribe to</param>
/// <param name="ms">If ms is greater than 0, the subscription must be done before the timeout (in milliseconds) is reached, or an error will be returned</param>
public FilterSubscribe(FilterSubscription filter, string? peerID = null, int ms = 0)
{
string filterJSON = JsonSerializer.Serialize(filter);
IntPtr ptr = waku_filter_subscribe(filterJSON, peerID, ms);
Response.HandleResponse(ptr);
}
[DllImport(Constants.dllName)]
internal static extern IntPtr waku_filter_unsubscribe(string filterJSON, int ms);
/// <summary>
/// Removes subscriptions in a light node
/// </summary>
/// <param name="filter">Filter criteria</param>
/// <param name="ms">If ms is greater than 0, the unsubscription must be done before the timeout (in milliseconds) is reached, or an error will be returned</param>
public FilterUnsubscribe(FilterSubscription filter, int ms = 0)
{
string filterJSON = JsonSerializer.Serialize(filter);
IntPtr ptr = waku_filter_unsubscribe(filterJSON, ms);
Response.HandleResponse(ptr);
}
}
}

File diff suppressed because it is too large Load Diff

44
library/api_filter.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"C"
mobile "github.com/status-im/go-waku/mobile"
)
//export waku_filter_subscribe
// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "topic": "the pubsub topic" // optional
// }
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char {
response := mobile.FilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
return C.CString(response)
}
//export waku_filter_unsubscribe
// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "topic": "the pubsub topic" // optional
// }
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
func waku_filter_unsubscribe(filterJSON *C.char, ms C.int) *C.char {
response := mobile.FilterUnsubscribe(C.GoString(filterJSON), int(ms))
return C.CString(response)
}

View File

@ -44,6 +44,7 @@ type wakuConfig struct {
NodeKey *string `json:"nodeKey,omitempty"`
KeepAliveInterval *int `json:"keepAliveInterval,omitempty"`
EnableRelay *bool `json:"relay"`
EnableFilter *bool `json:"filter"`
MinPeersToPublish *int `json:"minPeersToPublish"`
}
@ -52,6 +53,7 @@ var defaultPort = 60000
var defaultKeepAliveInterval = 20
var defaultEnableRelay = true
var defaultMinPeersToPublish = 0
var defaultEnableFilter = false
func getConfig(configJSON string) (wakuConfig, error) {
var config wakuConfig
@ -70,6 +72,10 @@ func getConfig(configJSON string) (wakuConfig, error) {
config.EnableRelay = &defaultEnableRelay
}
if config.EnableFilter == nil {
config.EnableFilter = &defaultEnableFilter
}
if config.Host == nil {
config.Host = &defaultHost
}
@ -131,6 +137,10 @@ func NewNode(configJSON string) string {
opts = append(opts, node.WithWakuRelayAndMinPeers(*config.MinPeersToPublish))
}
if *config.EnableFilter {
opts = append(opts, node.WithWakuFilter(false))
}
ctx := context.Background()
w, err := node.New(ctx, opts...)

106
mobile/api_filter.go Normal file
View File

@ -0,0 +1,106 @@
package gowaku
import (
"context"
"encoding/json"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type FilterArgument struct {
Topic string `json:"topic,omitempty"`
ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"`
}
func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
var f FilterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return filter.ContentFilter{}, err
}
result := filter.ContentFilter{
Topic: f.Topic,
}
for _, cf := range f.ContentFilters {
result.ContentTopics = append(result.ContentTopics, cf.ContentTopic)
}
return result, err
}
func FilterSubscribe(filterJSON string, peerID string, ms int) string {
cf, err := toContentFilter(filterJSON)
if err != nil {
return makeJSONResponse(err)
}
if wakuNode == nil {
return makeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return makeJSONResponse(err)
}
fOptions = append(fOptions, filter.WithPeer(p))
} else {
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
}
_, f, err := wakuNode.Filter().Subscribe(ctx, cf, fOptions...)
if err != nil {
return makeJSONResponse(err)
}
go func(f filter.Filter) {
for envelope := range f.Chan {
send("message", toSubscriptionMessage(envelope))
}
}(f)
return prepareJSONResponse(true, nil)
}
func FilterUnsubscribe(filterJSON string, ms int) string {
cf, err := toContentFilter(filterJSON)
if err != nil {
return makeJSONResponse(err)
}
if wakuNode == nil {
return makeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
err = wakuNode.Filter().UnsubscribeFilter(ctx, cf)
if err != nil {
return makeJSONResponse(err)
}
return makeJSONResponse(nil)
}

View File

@ -12,8 +12,8 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/relay"
)
var subscriptions map[string]*relay.Subscription = make(map[string]*relay.Subscription)
var mutex sync.Mutex
var relaySubscriptions map[string]*relay.Subscription = make(map[string]*relay.Subscription)
var relaySubsMutex sync.Mutex
func RelayEnoughPeers(topic string) string {
if wakuNode == nil {
@ -86,10 +86,10 @@ func RelaySubscribe(topic string) string {
topicToSubscribe := getTopic(topic)
mutex.Lock()
defer mutex.Unlock()
relaySubsMutex.Lock()
defer relaySubsMutex.Unlock()
_, ok := subscriptions[topicToSubscribe]
_, ok := relaySubscriptions[topicToSubscribe]
if ok {
return makeJSONResponse(nil)
}
@ -99,7 +99,7 @@ func RelaySubscribe(topic string) string {
return makeJSONResponse(err)
}
subscriptions[topicToSubscribe] = subscription
relaySubscriptions[topicToSubscribe] = subscription
go func(subscription *relay.Subscription) {
for envelope := range subscription.C {
@ -117,17 +117,17 @@ func RelayUnsubscribe(topic string) string {
topicToUnsubscribe := getTopic(topic)
mutex.Lock()
defer mutex.Unlock()
relaySubsMutex.Lock()
defer relaySubsMutex.Unlock()
subscription, ok := subscriptions[topicToUnsubscribe]
subscription, ok := relaySubscriptions[topicToUnsubscribe]
if ok {
return makeJSONResponse(nil)
}
subscription.Unsubscribe()
delete(subscriptions, topicToUnsubscribe)
delete(relaySubscriptions, topicToUnsubscribe)
err := wakuNode.Relay().Unsubscribe(context.Background(), topicToUnsubscribe)
if err != nil {

View File

@ -27,13 +27,16 @@ func prepareJSONResponse(result interface{}, err error) string {
func makeJSONResponse(err error) string {
var errString *string = nil
result := true
if err != nil {
errStr := err.Error()
errString = &errStr
result = false
}
out := jsonResponse{
Error: errString,
Error: errString,
Result: result,
}
outBytes, _ := json.Marshal(out)