feat(c-bindings): support creating multiple instances (#929)

This commit is contained in:
richΛrd 2023-12-15 10:46:21 -04:00 committed by GitHub
parent 097123a30e
commit 5e3c9fdfa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 972 additions and 523 deletions

View File

@ -25,7 +25,7 @@ void on_error(int ret, const char *result, void *user_data)
return;
}
printf("function execution failed. Returned code: %d\n", ret);
printf("function execution failed. Returned code: %d, %s\n", ret, result);
exit(1);
}
@ -33,7 +33,7 @@ void on_response(int ret, const char *result, void *user_data)
{
if (ret != 0)
{
printf("function execution failed. Returned code: %d\n", ret);
printf("function execution failed. Returned code: %d, %s\n", ret, result);
exit(1);
}
@ -119,26 +119,26 @@ void callBack(int ret, const char *signal, void *user_data)
int main(int argc, char *argv[])
{
// Set callback to be executed each time a message is received
waku_set_event_callback(callBack);
// configJSON can be NULL too to use defaults. Any value not defined will have
// a default set
char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000, "
"\"logLevel\":\"error\", \"store\":true}";
waku_new(configJSON, on_error, NULL);
void* ctx = waku_new(configJSON, on_error, NULL);
// Set callback to be executed each time a message is received
waku_set_event_callback(ctx, callBack);
// Start the node, enabling the waku protocols
waku_start(on_error, NULL);
waku_start(ctx, on_error, NULL);
// Obtain the node's peerID
char *peerID = NULL;
waku_peerid(on_response, (void *)&peerID);
waku_peerid(ctx, on_response, (void *)&peerID);
printf("PeerID: %s\n", peerID);
// Obtain the node's multiaddresses
char *addresses = NULL;
waku_listen_addresses(on_response, (void *)&addresses);
waku_listen_addresses(ctx, on_response, (void *)&addresses);
printf("Addresses: %s\n", addresses);
// Build a content topic
@ -154,12 +154,12 @@ int main(int argc, char *argv[])
// To use dns discovery, and retrieve nodes from a enrtree url
char *discoveredNodes = NULL;
waku_dns_discovery("enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im",
waku_dns_discovery(ctx, "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im",
"", 0, on_response, (void *)&discoveredNodes);
printf("Discovered nodes: %s\n", discoveredNodes);
// Connect to a node
waku_connect("/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/"
waku_connect(ctx, "/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/"
"p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ",
0, on_response, NULL);
@ -176,7 +176,7 @@ int main(int argc, char *argv[])
sprintf(contentFilter,
"{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}",
defaultPubsubTopic, contentTopic);
waku_relay_subscribe(contentFilter, on_error, NULL);
waku_relay_subscribe(ctx, contentFilter, on_error, NULL);
int i = 0;
int version = 1;
@ -202,7 +202,7 @@ int main(int argc, char *argv[])
// Broadcast via waku relay
char *messageID = NULL;
waku_relay_publish(encodedMessage, defaultPubsubTopic, 0, on_response,
waku_relay_publish(ctx, encodedMessage, defaultPubsubTopic, 0, on_response,
(void *)&messageID);
printf("MessageID: %s\n", messageID);
@ -221,7 +221,10 @@ int main(int argc, char *argv[])
// printf("%s\n", local_result);
// Stop the node's execution
waku_stop(on_response, NULL);
waku_stop(ctx, on_response, NULL);
// Release resources allocated to waku
waku_free(ctx, on_response, NULL);
// TODO: free all char*

View File

@ -479,16 +479,53 @@ If a key is `undefined`, or `null`, a default value will be set. If using `secur
- `certPath`: secure websocket certificate path
- `keyPath`: secure websocket key path
### `extern void* waku_init()`
### `extern int waku_new(char* jsonConfig, WakuCallBack onErrCb void* userData)`
Allocate memory for a waku node.
**Returns**
The result of this function must be passed to all waku_* functions that require a `void* ctx`
### `extern int waku_new(void* ctx, char* jsonConfig, WakuCallBack onErrCb void* userData)`
Instantiates a Waku node.
**Parameters**
1. `char* jsonConfig`: JSON string containing the options used to initialize a go-waku node.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* jsonConfig`: JSON string containing the options used to initialize a go-waku node.
Type [`JsonConfig`](#jsonconfig-type).
It can be `NULL` to use defaults.
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_start(void* ctx, WakuCallBack onErrCb void* userData)`
Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
**Parameters**
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_stop(void* ctx, WakuCallBack onErrCb void* userData)`
Stops a Waku node.
**Parameters**
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
@ -496,42 +533,30 @@ Instantiates a Waku node.
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_start(WakuCallBack onErrCb void* userData)`
### `extern int waku_free(void* ctx, WakuCallBack onErrCb void* userData)`
Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
Release the resources allocated to a waku node (stopping the node first if necessary)
**Parameters**
1. `WakuCallBack onErrCb`: callback to be executed if the function fails
2. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_stop(WakuCallBack onErrCb void* userData)`
Stops a Waku node.
**Parameters**
1. `WakuCallBack onErrCb`: callback to be executed if the function fails
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_peerid(WakuCallBack cb, void* userData)`
### `extern int waku_peerid(void* ctx, WakuCallBack cb, void* userData)`
Get the peer ID of the waku node.
**Parameters**
1. `WakuCallBack cb`: callback to be executed.
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack cb`: callback to be executed.
3. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -539,14 +564,15 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive the base58 encoded peer ID, for example `QmWjHKUrXDHPCwoWXpUZ77E8o6UbAoTTZwf1AD1tDC4KNP`.
### `extern int waku_listen_addresses(WakuCallBack cb, void* userData)`
### `extern int waku_listen_addresses(void* ctx, WakuCallBack cb, void* userData)`
Get the multiaddresses the Waku node is listening to.
**Parameters**
1. `WakuCallBack cb`: callback to be executed
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack cb`: callback to be executed
3. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -567,16 +593,17 @@ For example:
## Connecting to peers
### `extern int waku_add_peer(char* address, char* protocolId, WakuCallBack cb, void* userData)`
### `extern int waku_add_peer(void* ctx, char* address, char* protocolId, WakuCallBack cb, void* userData)`
Add a node multiaddress and protocol to the waku node's peerstore.
**Parameters**
1. `char* address`: A multiaddress (with peer id) to reach the peer being added.
2. `char* protocolId`: A protocol we expect the peer to support.
3. `WakuCallBack cb`: callback to be executed
4. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* address`: A multiaddress (with peer id) to reach the peer being added.
3. `char* protocolId`: A protocol we expect the peer to support.
4. `WakuCallBack cb`: callback to be executed
5. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -585,14 +612,15 @@ If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive the base 58 peer ID of the peer that was added.
For example: `QmWjHKUrXDHPCwoWXpUZ77E8o6UbAoTTZwf1AD1tDC4KNP`
### `extern int waku_connect(char* address, int timeoutMs, WakuCallBack onErrCb void* userData)`
### `extern int waku_connect(void* ctx, char* address, int timeoutMs, WakuCallBack onErrCb void* userData)`
Dial peer using a multiaddress.
**Parameters**
1. `char* address`: A multiaddress to reach the peer being dialed.
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* address`: A multiaddress to reach the peer being dialed.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
@ -603,20 +631,36 @@ Dial peer using a multiaddress.
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_connect_peerid(char* peerId, int timeoutMs, WakuCallBack onErrCb void* userData)`
### `extern int waku_connect_peerid(void* ctx, char* peerId, int timeoutMs, WakuCallBack onErrCb void* userData)`
Dial peer using its peer ID.
**Parameters**
1`char* peerID`: Peer ID to dial.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* peerID`: Peer ID to dial.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect`](#extern-char-waku_connectchar-address-int-timeoutms).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack onErrCb`: callback to be executed if the function fails
5. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_disconnect(void* ctx, char* peerId, WakuCallBack onErrCb void* userData)`
Disconnect a peer using its peerID
**Parameters**
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* peerID`: Peer ID to disconnect.
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
@ -624,27 +668,15 @@ Dial peer using its peer ID.
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_disconnect(char* peerId, WakuCallBack onErrCb void* userData)`
Disconnect a peer using its peerID
**Parameters**
1. `char* peerID`: Peer ID to disconnect.
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_peer_cnt(WakuCallBack cb, void* userData)`
### `extern int waku_peer_cnt(void* ctx, WakuCallBack cb, void* userData)`
Get number of connected peers.
**Parameters**
1. `WakuCallBack cb`: callback to be executed
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack cb`: callback to be executed
3. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -652,13 +684,15 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive the number of connected peers.
### `extern int waku_peers(WakuCallBack cb, void* userData)`
### `extern int waku_peers(void* ctx, WakuCallBack cb, void* userData)`
Retrieve the list of peers known by the Waku node.
**Parameters**
1. `WakuCallBack cb`: callback to be executed if the function is succesful
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack cb`: callback to be executed if the function is succesful
3. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -748,21 +782,22 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
If the function is executed succesfully, `onOkCb` will receive the default pubsub topic: `/waku/2/default-waku/proto`
### `extern int waku_relay_publish(char* messageJson, char* pubsubTopic, int timeoutMs, WakuCallBack cb, void* userData)`
### `extern int waku_relay_publish(void* ctx, char* messageJson, char* pubsubTopic, int timeoutMs, WakuCallBack cb, void* userData)`
Publish a message using Waku Relay.
**Parameters**
1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
3. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack cb`: callback to be executed
5. `void* userData`: used to pass custom information to the callback function
5. `WakuCallBack cb`: callback to be executed
6. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -770,16 +805,17 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `onOkCb` will receive the message ID.
### `extern int waku_relay_enough_peers(char* pubsubTopic, WakuCallBack cb, void* userData)`
### `extern int waku_relay_enough_peers(void* ctx, char* pubsubTopic, WakuCallBack cb, void* userData)`
Determine if there are enough peers to publish a message on a given pubsub topic.
**Parameters**
1. `char* pubsubTopic`: Pubsub topic to verify.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* pubsubTopic`: Pubsub topic to verify.
If `NULL`, it verifies the number of peers in the default pubsub topic.
2. `WakuCallBack cb`: callback to be executed
3. `void* userData`: used to pass custom information to the callback function
3. `WakuCallBack cb`: callback to be executed
4. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -787,27 +823,30 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `onOkCb` will receive a string `boolean` indicating whether there are enough peers, i.e. `true` or `false`
### `extern int waku_relay_subscribe(char* filterJSON, WakuCallBack onErrCb, void* userData)`
### `extern int waku_relay_subscribe(void* ctx, char* filterJSON, WakuCallBack onErrCb, void* userData)`
Subscribe to a Waku Relay pubsub topic to receive messages.
**Parameters**
1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_relay_topics(WakuCallBack cb, void* userData)`
### `extern int waku_relay_topics(void* ctx, WakuCallBack cb, void* userData)`
Get the list of subscribed pubsub topics in Waku Relay.
**Parameters**
1. `WakuCallBack cb`: callback to be executed
2. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `WakuCallBack cb`: callback to be executed
3. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -824,7 +863,7 @@ For example:
**Events**
When a message is received, a ``"message"` event` is emitted containing the message, pubsub topic, and node ID in which
When a message is received, a `"message"` event` is emitted containing the message, pubsub topic, and node ID in which
the message was received.
The `event` type is [`JsonMessageEvent`](#jsonmessageevent-type).
@ -847,16 +886,17 @@ For Example:
}
```
### `extern int waku_relay_unsubscribe(char* filterJSON, WakuCallBack onErrCb, void* userData)`
### `extern int waku_relay_unsubscribe(void* ctx, char* filterJSON, WakuCallBack onErrCb, void* userData)`
Closes the pubsub subscription to pubsub topic matching a criteria. No more messages will be received
from this pubsub topic.
**Parameters**
1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to unsubscribe from
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to unsubscribe from
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -865,24 +905,25 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
## Waku Filter
### `extern int waku_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
### `extern int waku_filter_subscribe(void* ctx, char* filterJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
Creates a subscription to a filter full node matching a content filter..
**Parameters**
1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive
2. `char* peerID`: Peer ID to subscribe to.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive
3. `char* peerID`: Peer ID to subscribe to.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
Use `NULL` to automatically select a node.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack cb`: callback to be executed
5. `void* userData`: used to pass custom information to the callback function
5. `WakuCallBack cb`: callback to be executed
6. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -932,35 +973,14 @@ For Example:
```
### `extern int waku_filter_ping(char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
### `extern int waku_filter_ping(void* ctx, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
Used to know if a service node has an active subscription for this client
**Parameters**
1. `char* peerID`: Peer ID to check for an active subscription
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_filter_unsubscribe(filterJSON *C.char, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
Sends a requests to a service node to stop pushing messages matching this filter to this client. It might be used to modify an existing subscription by providing a subset of the original filter criteria
**Parameters**
1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from
2. `char* peerID`: Peer ID to unsubscribe from
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* peerID`: Peer ID to check for an active subscription
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
@ -976,7 +996,30 @@ Sends a requests to a service node to stop pushing messages matching this filter
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_filter_unsubscribe_all(char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
### `extern int waku_filter_unsubscribe(void* ctx, filterJSON *C.char, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
Sends a requests to a service node to stop pushing messages matching this filter to this client. It might be used to modify an existing subscription by providing a subset of the original filter criteria
**Parameters**
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from
3. `char* peerID`: Peer ID to unsubscribe from
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
5. `WakuCallBack onErrCb`: callback to be executed if the function fails
6. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
### `extern int waku_filter_unsubscribe_all(void* ctx, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
Sends a requests to a service node (or all service nodes) to stop pushing messages
@ -1015,24 +1058,25 @@ For example:
## Waku Legacy Filter
### `extern int waku_legacy_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
### `extern int waku_legacy_filter_subscribe(void* ctx, char* filterJSON, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)`
Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor).
**Parameters**
1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to.
2. `char* peerID`: Peer ID to subscribe to.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to.
3. `char* peerID`: Peer ID to subscribe to.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
Use `NULL` to automatically select a node.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack onErrCb`: callback to be executed if the function fails
5. `void* userData`: used to pass custom information to the callback function
5. `WakuCallBack onErrCb`: callback to be executed if the function fails
6. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -1063,19 +1107,20 @@ For Example:
}
```
### `extern int waku_legacy_filter_unsubscribe(char* filterJSON, int timeoutMs, WakuCallBack onErrCb, void* userData)`
### `extern int waku_legacy_filter_unsubscribe(void* ctx, char* filterJSON, int timeoutMs, WakuCallBack onErrCb, void* userData)`
Removes subscriptions in a light node matching a content filter and, optionally, a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor).
**Parameters**
1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type).
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
4. `WakuCallBack onErrCb`: callback to be executed if the function fails
5. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -1083,20 +1128,52 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible
## Waku Lightpush
### `extern int waku_lightpush_publish(char* messageJSON, char* topic, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
### `extern int waku_lightpush_publish(void* ctx, char* messageJSON, char* topic, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
Publish a message using Waku Lightpush.
**Parameters**
1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
3. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `char* peerID`: Peer ID supporting the lightpush protocol.
4. `char* peerID`: Peer ID supporting the lightpush protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
Use `NULL` to automatically select a node.
5. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
6. `WakuCallBack cb`: callback to be executed
7. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive the message ID.
## Waku Store
### `extern int waku_store_query(void* ctx, char* queryJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
Retrieves historical messages on specific content topics. This method may be called with [`PagingOptions`](#pagingoptions-type),
to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](#pagingoptions-type), the node
must return messages on a per-page basis and include [`PagingOptions`](#pagingoptions-type) in the response. These [`PagingOptions`](#pagingoptions-type)
must contain a cursor pointing to the Index from which a new page can be requested.
**Parameters**
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type).
3. `char* peerID`: Peer ID supporting the store protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
@ -1106,41 +1183,11 @@ Publish a message using Waku Lightpush.
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive the message ID.
## Waku Store
### `extern int waku_store_query(char* queryJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)`
Retrieves historical messages on specific content topics. This method may be called with [`PagingOptions`](#pagingoptions-type),
to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](#pagingoptions-type), the node
must return messages on a per-page basis and include [`PagingOptions`](#pagingoptions-type) in the response. These [`PagingOptions`](#pagingoptions-type)
must contain a cursor pointing to the Index from which a new page can be requested.
**Parameters**
1. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type).
2. `char* peerID`: Peer ID supporting the store protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack cb`: callback to be executed
2. `void* userData`: used to pass custom information to the callback function
**Returns**
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
If the function execution fails, `cb` will receive a string containing an error.
If the function is executed succesfully, `cb` will receive a [`StoreResponse`](#storeresponse-type).
### `extern int waku_store_local_query(char* queryJSON, WakuCallBack cb, void* userData)`
### `extern int waku_store_local_query(void* ctx, char* queryJSON, WakuCallBack cb, void* userData)`
Retrieves locally stored historical messages on specific content topics. This method may be called with [`PagingOptions`](#pagingoptions-type),
to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](#pagingoptions-type), the node
@ -1149,13 +1196,14 @@ must contain a cursor pointing to the Index from which a new page can be request
**Parameters**
1. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type).
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
3. `WakuCallBack cb`: callback to be executed
2. `void* userData`: used to pass custom information to the callback function
4. `WakuCallBack cb`: callback to be executed
5. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -1265,20 +1313,21 @@ If the function is executed succesfully, `cb` will receive the decoded payload a
## DNS Discovery
### `extern int waku_dns_discovery(char* url, char* nameserver, int timeoutMs, WakuCallBack cb, void* userData)`
### `extern int waku_dns_discovery(void* ctx, char* url, char* nameserver, int timeoutMs, WakuCallBack cb, void* userData)`
Returns a list of multiaddress and enrs given a url to a DNS discoverable ENR tree
**Parameters**
1. `char* url`: URL containing a discoverable ENR tree
2. `char* nameserver`: The nameserver to resolve the ENR tree url.
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* url`: URL containing a discoverable ENR tree
3. `char* nameserver`: The nameserver to resolve the ENR tree url.
If `NULL` or empty, it will automatically use the default system dns.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
4. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
4. `WakuCallBack cb`: callback to be executed
3. `void* userData`: used to pass custom information to the callback function
5. `WakuCallBack cb`: callback to be executed
6. `void* userData`: used to pass custom information to the callback function
**Returns**
@ -1302,14 +1351,15 @@ If the function is executed succesfully, `onOkCb` will receive an array objects
## DiscoveryV5
### `extern int waku_discv5_update_bootnodes(char* bootnodes, WakuCallBack onErrCb, void* userData)`
### `extern int waku_discv5_update_bootnodes(void* ctx, char* bootnodes, WakuCallBack onErrCb, void* userData)`
Update the bootnode list used for discovering new peers via DiscoveryV5
**Parameters**
1. `char* bootnodes`: JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
2. `WakuCallBack onErrCb`: callback to be executed if the function fails
3. `void* userData`: used to pass custom information to the callback function
1. `void* ctx`: waku node instance, returned by `waku_init`.
2. `char* bootnodes`: JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
3. `WakuCallBack onErrCb`: callback to be executed if the function fails
4. `void* userData`: used to pass custom information to the callback function
**Returns**

View File

@ -88,41 +88,106 @@ func main() {}
// - dns4DomainName: the domain name resolving to the node's public IPv4 address.
//
//export waku_new
func waku_new(configJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.NewNode(C.GoString(configJSON))
return onError(err, cb, userData)
func waku_new(configJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) unsafe.Pointer {
if cb == nil {
panic("error: missing callback in waku_new")
}
cid := C.malloc(C.size_t(unsafe.Sizeof(uintptr(0))))
pid := (*uint)(cid)
instance := library.Init()
*pid = instance.ID
err := library.NewNode(instance, C.GoString(configJSON))
if err != nil {
onError(err, cb, userData)
return nil
}
return cid
}
// Starts the waku node
//
//export waku_start
func waku_start(onErr C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.Start()
func waku_start(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, onErr, userData)
}
err = library.Start(instance)
return onError(err, onErr, userData)
}
// Stops a waku node
//
//export waku_stop
func waku_stop(onErr C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.Stop()
func waku_stop(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, onErr, userData)
}
err = library.Stop(instance)
return onError(err, onErr, userData)
}
// Release the resources allocated to a waku node (stopping the node first if necessary)
//
//export waku_free
func waku_free(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
return onError(err, onErr, userData)
}
err = library.Stop(instance)
if err != nil {
return onError(err, onErr, userData)
}
err = library.Free(instance)
if err == nil {
C.free(ctx)
}
return onError(err, onErr, userData)
}
// Determine is a node is started or not
//
//export waku_is_started
func waku_is_started() C.int {
started := library.IsStarted()
func waku_is_started(ctx unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
return 0
}
started := library.IsStarted(instance)
if started {
return 1
}
return 0
}
type fn func() (string, error)
type fn func(instance *library.WakuInstance) (string, error)
type fnNoInstance func() (string, error)
func singleFnExec(f fn, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
func singleFnExec(f fn, ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
result, err := f(instance)
if err != nil {
return onError(err, cb, userData)
}
return onSuccesfulResponse(result, cb, userData)
}
func singleFnExecNoCtx(f fnNoInstance, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
result, err := f()
if err != nil {
return onError(err, cb, userData)
@ -133,62 +198,77 @@ func singleFnExec(f fn, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
// Obtain the peer ID of the waku node
//
//export waku_peerid
func waku_peerid(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.PeerID()
}, cb, userData)
func waku_peerid(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.PeerID(instance)
}, ctx, cb, userData)
}
// Obtain the multiaddresses the wakunode is listening to
//
//export waku_listen_addresses
func waku_listen_addresses(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.ListenAddresses()
}, cb, userData)
func waku_listen_addresses(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.ListenAddresses(instance)
}, ctx, cb, userData)
}
// Add node multiaddress and protocol to the wakunode peerstore
//
//export waku_add_peer
func waku_add_peer(address *C.char, protocolID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.AddPeer(C.GoString(address), C.GoString(protocolID))
}, cb, userData)
func waku_add_peer(ctx unsafe.Pointer, address *C.char, protocolID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.AddPeer(instance, C.GoString(address), C.GoString(protocolID))
}, ctx, cb, userData)
}
// Connect to peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds
//
//export waku_connect
func waku_connect(address *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.Connect(C.GoString(address), int(ms))
func waku_connect(ctx unsafe.Pointer, address *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.Connect(instance, C.GoString(address), int(ms))
return onError(err, cb, userData)
}
// Connect to known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds
//
//export waku_connect_peerid
func waku_connect_peerid(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.ConnectPeerID(C.GoString(peerID), int(ms))
func waku_connect_peerid(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.ConnectPeerID(instance, C.GoString(peerID), int(ms))
return onError(err, cb, userData)
}
// Close connection to a known peer by peerID
//
//export waku_disconnect
func waku_disconnect(peerID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.Disconnect(C.GoString(peerID))
func waku_disconnect(ctx unsafe.Pointer, peerID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.Disconnect(instance, C.GoString(peerID))
return onError(err, cb, userData)
}
// Get number of connected peers
//
//export waku_peer_cnt
func waku_peer_cnt(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
peerCnt, err := library.PeerCnt()
func waku_peer_cnt(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
peerCnt, err := library.PeerCnt(instance)
return fmt.Sprintf("%d", peerCnt), err
}, cb, userData)
}, ctx, cb, userData)
}
// Create a content topic string according to RFC 23
@ -211,15 +291,20 @@ func waku_default_pubsub_topic(cb C.WakuCallBack, userData unsafe.Pointer) C.int
// signature for the callback should be `void myCallback(char* signalJSON)`
//
//export waku_set_event_callback
func waku_set_event_callback(cb C.WakuCallBack) {
library.SetEventCallback(unsafe.Pointer(cb))
func waku_set_event_callback(ctx unsafe.Pointer, cb C.WakuCallBack) {
instance, err := getInstance(ctx)
if err != nil {
panic(err.Error()) // TODO: refactor to return an error instead of panic
}
library.SetEventCallback(instance, unsafe.Pointer(cb))
}
// Retrieve the list of peers known by the waku node
//
//export waku_peers
func waku_peers(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.Peers()
}, cb, userData)
func waku_peers(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.Peers(instance)
}, ctx, cb, userData)
}

View File

@ -21,17 +21,22 @@ import (
// (in milliseconds) is reached, or an error will be returned
//
//export waku_dns_discovery
func waku_dns_discovery(url *C.char, nameserver *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
func waku_dns_discovery(ctx unsafe.Pointer, url *C.char, nameserver *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.DNSDiscovery(C.GoString(url), C.GoString(nameserver), int(ms))
}, cb, userData)
}, ctx, cb, userData)
}
// Update the bootnode list used for discovering new peers via DiscoveryV5
// The bootnodes param should contain a JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
//
//export waku_discv5_update_bootnodes
func waku_discv5_update_bootnodes(bootnodes *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.SetBootnodes(C.GoString(bootnodes))
func waku_discv5_update_bootnodes(ctx unsafe.Pointer, bootnodes *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.SetBootnodes(instance, C.GoString(bootnodes))
return onError(err, cb, userData)
}

View File

@ -14,7 +14,7 @@ import (
//
//export waku_decode_symmetric
func waku_decode_symmetric(messageJSON *C.char, symmetricKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return singleFnExecNoCtx(func() (string, error) {
return library.DecodeSymmetric(C.GoString(messageJSON), C.GoString(symmetricKey))
}, cb, userData)
}
@ -23,7 +23,7 @@ func waku_decode_symmetric(messageJSON *C.char, symmetricKey *C.char, cb C.WakuC
//
//export waku_decode_asymmetric
func waku_decode_asymmetric(messageJSON *C.char, privateKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return singleFnExecNoCtx(func() (string, error) {
return library.DecodeAsymmetric(C.GoString(messageJSON), C.GoString(privateKey))
}, cb, userData)
}
@ -35,7 +35,7 @@ func waku_decode_asymmetric(messageJSON *C.char, privateKey *C.char, cb C.WakuCa
//
//export waku_encode_asymmetric
func waku_encode_asymmetric(messageJSON *C.char, publicKey *C.char, optionalSigningKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return singleFnExecNoCtx(func() (string, error) {
return library.EncodeAsymmetric(C.GoString(messageJSON), C.GoString(publicKey), C.GoString(optionalSigningKey))
}, cb, userData)
}
@ -47,7 +47,7 @@ func waku_encode_asymmetric(messageJSON *C.char, publicKey *C.char, optionalSign
//
//export waku_encode_symmetric
func waku_encode_symmetric(messageJSON *C.char, symmetricKey *C.char, optionalSigningKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return singleFnExecNoCtx(func() (string, error) {
return library.EncodeSymmetric(C.GoString(messageJSON), C.GoString(symmetricKey), C.GoString(optionalSigningKey))
}, cb, userData)
}

View File

@ -24,10 +24,10 @@ import (
// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures
//
//export waku_filter_subscribe
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.FilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
}, cb, userData)
func waku_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.FilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms))
}, ctx, cb, userData)
}
// Used to know if a service node has an active subscription for this client
@ -36,8 +36,13 @@ func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.Wa
// (in milliseconds) is reached, or an error will be returned
//
//export waku_filter_ping
func waku_filter_ping(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.FilterPing(C.GoString(peerID), int(ms))
func waku_filter_ping(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.FilterPing(instance, C.GoString(peerID), int(ms))
return onError(err, cb, userData)
}
@ -55,8 +60,13 @@ func waku_filter_ping(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsa
// (in milliseconds) is reached, or an error will be returned
//
//export waku_filter_unsubscribe
func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.FilterUnsubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
func waku_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.FilterUnsubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms))
return onError(err, cb, userData)
}
@ -67,8 +77,8 @@ func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.
// (in milliseconds) is reached, or an error will be returned
//
//export waku_filter_unsubscribe_all
func waku_filter_unsubscribe_all(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.FilterUnsubscribeAll(C.GoString(peerID), int(ms))
}, cb, userData)
func waku_filter_unsubscribe_all(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.FilterUnsubscribeAll(instance, C.GoString(peerID), int(ms))
}, ctx, cb, userData)
}

View File

@ -27,8 +27,13 @@ import (
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_subscribe
func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.LegacyFilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
func waku_legacy_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.LegacyFilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms))
return onError(err, cb, userData)
}
@ -48,7 +53,12 @@ func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int,
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_unsubscribe
func waku_legacy_filter_unsubscribe(filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.LegacyFilterUnsubscribe(C.GoString(filterJSON), int(ms))
func waku_legacy_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.LegacyFilterUnsubscribe(instance, C.GoString(filterJSON), int(ms))
return onError(err, cb, userData)
}

View File

@ -16,8 +16,8 @@ import (
// (in milliseconds) is reached, or an error will be returned
//
//export waku_lightpush_publish
func waku_lightpush_publish(messageJSON *C.char, topic *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.LightpushPublish(C.GoString(messageJSON), C.GoString(topic), C.GoString(peerID), int(ms))
}, cb, userData)
func waku_lightpush_publish(ctx unsafe.Pointer, messageJSON *C.char, topic *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.LightpushPublish(instance, C.GoString(messageJSON), C.GoString(topic), C.GoString(peerID), int(ms))
}, ctx, cb, userData)
}

View File

@ -14,14 +14,14 @@ import (
// to verify the number of peers in the default pubsub topic
//
//export waku_relay_enough_peers
func waku_relay_enough_peers(topic *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
result, err := library.RelayEnoughPeers(C.GoString(topic))
func waku_relay_enough_peers(ctx unsafe.Pointer, topic *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
result, err := library.RelayEnoughPeers(instance, C.GoString(topic))
if result {
return "true", err
}
return "false", err
}, cb, userData)
}, ctx, cb, userData)
}
// Publish a message using waku relay and returns the message ID. Use NULL for topic to derive the pubsub topic from the contentTopic.
@ -29,10 +29,10 @@ func waku_relay_enough_peers(topic *C.char, cb C.WakuCallBack, userData unsafe.P
// (in milliseconds) is reached, or an error will be returned.
//
//export waku_relay_publish
func waku_relay_publish(messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.RelayPublish(C.GoString(messageJSON), C.GoString(topic), int(ms))
}, cb, userData)
func waku_relay_publish(ctx unsafe.Pointer, messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.RelayPublish(instance, C.GoString(messageJSON), C.GoString(topic), int(ms))
}, ctx, cb, userData)
}
// Subscribe to WakuRelay to receive messages matching a content filter.
@ -47,8 +47,13 @@ func waku_relay_publish(messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuC
// the message was received
//
//export waku_relay_subscribe
func waku_relay_subscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.RelaySubscribe(C.GoString(filterJSON))
func waku_relay_subscribe(ctx unsafe.Pointer, filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.RelaySubscribe(instance, C.GoString(filterJSON))
return onError(err, cb, userData)
}
@ -56,10 +61,10 @@ func waku_relay_subscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe
// is subscribed to in WakuRelay
//
//export waku_relay_topics
func waku_relay_topics(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.RelayTopics()
}, cb, userData)
func waku_relay_topics(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.RelayTopics(instance)
}, ctx, cb, userData)
}
// Closes the pubsub subscription to stop receiving messages matching a content filter
@ -71,7 +76,12 @@ func waku_relay_topics(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
// }
//
//export waku_relay_unsubscribe
func waku_relay_unsubscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
err := library.RelayUnsubscribe(C.GoString(filterJSON))
func waku_relay_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
onError(err, cb, userData)
}
err = library.RelayUnsubscribe(instance, C.GoString(filterJSON))
return onError(err, cb, userData)
}

View File

@ -39,10 +39,10 @@ import (
// (in milliseconds) is reached, or an error will be returned
//
//export waku_store_query
func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.StoreQuery(C.GoString(queryJSON), C.GoString(peerID), int(ms))
}, cb, userData)
func waku_store_query(ctx unsafe.Pointer, queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.StoreQuery(instance, C.GoString(queryJSON), C.GoString(peerID), int(ms))
}, ctx, cb, userData)
}
// Query historic messages stored in the localDB using waku store protocol.
@ -72,8 +72,8 @@ func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCall
// Requires the `store` option to be passed when setting up the initial configuration
//
//export waku_store_local_query
func waku_store_local_query(queryJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func() (string, error) {
return library.StoreLocalQuery(C.GoString(queryJSON))
}, cb, userData)
func waku_store_local_query(ctx unsafe.Pointer, queryJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
return singleFnExec(func(instance *library.WakuInstance) (string, error) {
return library.StoreLocalQuery(instance, C.GoString(queryJSON))
}, ctx, cb, userData)
}

View File

@ -9,6 +9,8 @@ import "C"
import (
"errors"
"unsafe"
"github.com/waku-org/go-waku/library"
)
const ret_ok = 0
@ -51,3 +53,12 @@ func onError(err error, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
C._waku_execCB(cb, C.int(retCode), nil, userData)
return ret_ok
}
func getInstance(wakuCtx unsafe.Pointer) (*library.WakuInstance, error) {
pid := (*uint)(wakuCtx)
if pid == nil {
return nil, errors.New("invalid waku context")
}
return library.GetInstance(*pid)
}

View File

@ -58,34 +58,37 @@ func DNSDiscovery(url string, nameserver string, ms int) (string, error) {
}
// StartDiscoveryV5 starts discv5 discovery
func StartDiscoveryV5() error {
if wakuState.node == nil {
return errWakuNodeNotReady
func StartDiscoveryV5(instance *WakuInstance) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
if wakuState.node.DiscV5() == nil {
if instance.node.DiscV5() == nil {
return errors.New("DiscV5 is not mounted")
}
return wakuState.node.DiscV5().Start(context.Background())
return instance.node.DiscV5().Start(instance.ctx)
}
// StopDiscoveryV5 stops discv5 discovery
func StopDiscoveryV5() error {
if wakuState.node == nil {
return errWakuNodeNotReady
func StopDiscoveryV5(instance *WakuInstance) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
if wakuState.node.DiscV5() == nil {
if instance.node.DiscV5() == nil {
return errors.New("DiscV5 is not mounted")
}
wakuState.node.DiscV5().Stop()
instance.node.DiscV5().Stop()
return nil
}
// SetBootnodes is used to update the bootnodes receiving a JSON array of ENRs
func SetBootnodes(bootnodes string) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func SetBootnodes(instance *WakuInstance, bootnodes string) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
if wakuState.node.DiscV5() == nil {
if instance.node.DiscV5() == nil {
return errors.New("DiscV5 is not mounted")
}
@ -112,5 +115,5 @@ func SetBootnodes(bootnodes string) error {
nodes = append(nodes, node)
}
return wakuState.node.DiscV5().SetBootnodes(nodes)
return instance.node.DiscV5().SetBootnodes(nodes)
}

View File

@ -36,24 +36,24 @@ type subscribeResult struct {
}
// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) (string, error) {
cf, err := toContentFilter(filterJSON)
if err != nil {
return "", err
}
if wakuState.node == nil {
return "", errWakuNodeNotReady
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
@ -67,7 +67,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
}
subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
subscriptions, err := instance.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil && subscriptions == nil {
return "", err
}
@ -75,7 +75,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
send(instance, "message", toSubscriptionMessage(envelope))
}
}(subscriptionDetails)
}
@ -89,19 +89,19 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}
// FilterPing is used to determine if a peer has an active subscription
func FilterPing(peerID string, ms int) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func FilterPing(instance *WakuInstance, peerID string, ms int) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var pID peer.ID
@ -115,28 +115,28 @@ func FilterPing(peerID string, ms int) error {
return errors.New("peerID is required")
}
return wakuState.node.FilterLightnode().Ping(ctx, pID)
return instance.node.FilterLightnode().Ping(ctx, pID)
}
// FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
cf, err := toContentFilter(filterJSON)
if err != nil {
return err
}
if wakuState.node == nil {
return errWakuNodeNotReady
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
@ -150,7 +150,7 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
return errors.New("peerID is required")
}
result, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
result, err := instance.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
if err != nil {
return err
}
@ -168,19 +168,19 @@ type unsubscribeAllResult struct {
}
// FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions
func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
@ -194,7 +194,7 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
fOptions = append(fOptions, filter.UnsubscribeAll())
}
result, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
result, err := instance.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
if err != nil {
return "", err
}

View File

@ -34,24 +34,24 @@ func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, erro
// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error {
func LegacyFilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return err
}
if wakuState.node == nil {
return errWakuNodeNotReady
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var fOptions []legacy_filter.FilterSubscribeOption
@ -65,14 +65,14 @@ func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error {
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
}
_, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
_, f, err := instance.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
if err != nil {
return err
}
go func(f legacy_filter.Filter) {
for envelope := range f.Chan {
send("message", toSubscriptionMessage(envelope))
send(instance, "message", toSubscriptionMessage(envelope))
}
}(f)
@ -81,25 +81,25 @@ func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error {
// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(filterJSON string, ms int) error {
func LegacyFilterUnsubscribe(instance *WakuInstance, filterJSON string, ms int) error {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return err
}
if wakuState.node == nil {
return errWakuNodeNotReady
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
return wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
return instance.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
}

View File

@ -11,19 +11,19 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
)
func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms int) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func lightpushPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic string, peerID string, ms int) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
var lpOptions []lightpush.Option
@ -41,16 +41,16 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}
hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...)
hash, err := instance.node.Lightpush().Publish(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err
}
// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
func LightpushPublish(instance *WakuInstance, messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON)
if err != nil {
return "", err
}
return lightpushPublish(msg, pubsubTopic, peerID, ms)
return lightpushPublish(instance, msg, pubsubTopic, peerID, ms)
}

View File

@ -7,68 +7,135 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// NewNode initializes a waku node. Receives a JSON string containing the configuration, and use default values for those config items not specified
func NewNode(configJSON string) string {
err := library.NewNode(configJSON)
return makeJSONResponse(err)
// NewNode initializes a waku node.
// Receives a JSON string containing the configuration, and use default values for those config items not specified
// Returns an instance id
func NewNode(instanceID uint, configJSON string) string {
instance := library.Init()
err := library.NewNode(instance, configJSON)
if err != nil {
_ = library.Free(instance)
}
return prepareJSONResponse(instance.ID, err)
}
// Start starts the waku node
func Start() string {
err := library.Start()
func Start(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.Start(instance)
return makeJSONResponse(err)
}
// Stop stops a waku node
func Stop() string {
err := library.Stop()
func Stop(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.Stop(instance)
return makeJSONResponse(err)
}
// Release resources allocated to a waku node
func Free(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.Free(instance)
return makeJSONResponse(err)
}
// IsStarted is used to determine is a node is started or not
func IsStarted() string {
return prepareJSONResponse(library.IsStarted(), nil)
func IsStarted(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
return prepareJSONResponse(library.IsStarted(instance), nil)
}
// PeerID is used to obtain the peer ID of the waku node
func PeerID() string {
peerID, err := library.PeerID()
func PeerID(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
peerID, err := library.PeerID(instance)
return prepareJSONResponse(peerID, err)
}
// ListenAddresses returns the multiaddresses the wakunode is listening to
func ListenAddresses() string {
addresses, err := library.ListenAddresses()
func ListenAddresses(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
addresses, err := library.ListenAddresses(instance)
return prepareJSONResponse(addresses, err)
}
// AddPeer adds a node multiaddress and protocol to the wakunode peerstore
func AddPeer(address string, protocolID string) string {
peerID, err := library.AddPeer(address, protocolID)
func AddPeer(instanceID uint, address string, protocolID string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
peerID, err := library.AddPeer(instance, address, protocolID)
return prepareJSONResponse(peerID, err)
}
// Connect is used to connect to a peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds
func Connect(address string, ms int) string {
err := library.Connect(address, ms)
func Connect(instanceID uint, address string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.Connect(instance, address, ms)
return makeJSONResponse(err)
}
// ConnectPeerID is usedd to connect to a known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds
func ConnectPeerID(peerID string, ms int) string {
err := library.ConnectPeerID(peerID, ms)
func ConnectPeerID(instanceID uint, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.ConnectPeerID(instance, peerID, ms)
return makeJSONResponse(err)
}
// Disconnect closes a connection to a known peer by peerID
func Disconnect(peerID string) string {
err := library.Disconnect(peerID)
func Disconnect(instanceID uint, peerID string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.Disconnect(instance, peerID)
return makeJSONResponse(err)
}
// PeerCnt returns the number of connected peers
func PeerCnt() string {
peerCnt, err := library.PeerCnt()
func PeerCnt(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
peerCnt, err := library.PeerCnt(instance)
return prepareJSONResponse(peerCnt, err)
}
@ -84,7 +151,12 @@ func DefaultPubsubTopic() string {
}
// Peers retrieves the list of peers known by the waku node
func Peers() string {
peers, err := library.Peers()
func Peers(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
peers, err := library.Peers(instance)
return prepareJSONResponse(peers, err)
}

View File

@ -11,19 +11,34 @@ func DNSDiscovery(url string, nameserver string, ms int) string {
}
// StartDiscoveryV5 starts discv5 discovery
func StartDiscoveryV5() string {
err := library.StartDiscoveryV5()
func StartDiscoveryV5(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.StartDiscoveryV5(instance)
return makeJSONResponse(err)
}
// StopDiscoveryV5 stops discv5 discovery
func StopDiscoveryV5() string {
err := library.StopDiscoveryV5()
func StopDiscoveryV5(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.StopDiscoveryV5(instance)
return makeJSONResponse(err)
}
// SetBootnodes is used to update the bootnodes receiving a JSON array of ENRs
func SetBootnodes(bootnodes string) string {
err := library.SetBootnodes(bootnodes)
func SetBootnodes(instanceID uint, bootnodes string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.SetBootnodes(instance, bootnodes)
return makeJSONResponse(err)
}

View File

@ -5,25 +5,45 @@ import (
)
// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(filterJSON string, peerID string, ms int) string {
response, err := library.FilterSubscribe(filterJSON, peerID, ms)
func FilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.FilterSubscribe(instance, filterJSON, peerID, ms)
return prepareJSONResponse(response, err)
}
// FilterPing is used to determine if a peer has an active subscription
func FilterPing(peerID string, ms int) string {
err := library.FilterPing(peerID, ms)
func FilterPing(instanceID uint, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.FilterPing(instance, peerID, ms)
return makeJSONResponse(err)
}
// FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
func FilterUnsubscribe(filterJSON string, peerID string, ms int) string {
err := library.FilterUnsubscribe(filterJSON, peerID, ms)
func FilterUnsubscribe(instanceID uint, filterJSON string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.FilterUnsubscribe(instance, filterJSON, peerID, ms)
return makeJSONResponse(err)
}
// FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions
func FilterUnsubscribeAll(peerID string, ms int) string {
response, err := library.FilterUnsubscribeAll(peerID, ms)
func FilterUnsubscribeAll(instanceID uint, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.FilterUnsubscribeAll(instance, peerID, ms)
return prepareJSONResponse(response, err)
}

View File

@ -6,14 +6,24 @@ import (
// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) string {
err := library.LegacyFilterSubscribe(filterJSON, peerID, ms)
func LegacyFilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.LegacyFilterSubscribe(instance, filterJSON, peerID, ms)
return makeJSONResponse(err)
}
// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(filterJSON string, ms int) string {
err := library.LegacyFilterUnsubscribe(filterJSON, ms)
func LegacyFilterUnsubscribe(instanceID uint, filterJSON string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.LegacyFilterUnsubscribe(instance, filterJSON, ms)
return makeJSONResponse(err)
}

View File

@ -3,7 +3,12 @@ package gowaku
import "github.com/waku-org/go-waku/library"
// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, topic string, peerID string, ms int) string {
response, err := library.LightpushPublish(messageJSON, topic, peerID, ms)
func LightpushPublish(instanceID uint, messageJSON string, topic string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.LightpushPublish(instance, messageJSON, topic, peerID, ms)
return prepareJSONResponse(response, err)
}

View File

@ -5,31 +5,56 @@ import (
)
// RelayEnoughPeers determines if there are enough peers to publish a message on a topic
func RelayEnoughPeers(topic string) string {
response, err := library.RelayEnoughPeers(topic)
func RelayEnoughPeers(instanceID uint, topic string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.RelayEnoughPeers(instance, topic)
return prepareJSONResponse(response, err)
}
// RelayPublish publishes a message using waku relay and returns the message ID
func RelayPublish(messageJSON string, topic string, ms int) string {
hash, err := library.RelayPublish(messageJSON, topic, ms)
func RelayPublish(instanceID uint, messageJSON string, topic string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
hash, err := library.RelayPublish(instance, messageJSON, topic, ms)
return prepareJSONResponse(hash, err)
}
// RelaySubscribe subscribes to a WakuRelay topic.
func RelaySubscribe(topic string) string {
err := library.RelaySubscribe(topic)
func RelaySubscribe(instanceID uint, topic string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.RelaySubscribe(instance, topic)
return makeJSONResponse(err)
}
// RelayTopics returns a list of pubsub topics the node is subscribed to in WakuRelay
func RelayTopics() string {
topics, err := library.RelayTopics()
func RelayTopics(instanceID uint) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
topics, err := library.RelayTopics(instance)
return prepareJSONResponse(topics, err)
}
// RelayUnsubscribe closes the pubsub subscription to a pubsub topic
func RelayUnsubscribe(topic string) string {
err := library.RelayUnsubscribe(topic)
func RelayUnsubscribe(instanceID uint, topic string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.RelayUnsubscribe(instance, topic)
return makeJSONResponse(err)
}

View File

@ -5,13 +5,23 @@ import (
)
// StoreQuery is used to retrieve historic messages using waku store protocol.
func StoreQuery(queryJSON string, peerID string, ms int) string {
response, err := library.StoreQuery(queryJSON, peerID, ms)
func StoreQuery(instanceID uint, queryJSON string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.StoreQuery(instance, queryJSON, peerID, ms)
return prepareJSONResponse(response, err)
}
// StoreLocalQuery is used to retrieve historic messages stored in the localDB using waku store protocol.
func StoreLocalQuery(queryJSON string) string {
response, err := library.StoreLocalQuery(queryJSON)
func StoreLocalQuery(instanceID uint, queryJSON string) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
response, err := library.StoreLocalQuery(instance, queryJSON)
return prepareJSONResponse(response, err)
}

View File

@ -12,8 +12,13 @@ type SignalHandler interface {
// SetMobileSignalHandler setup geth callback to notify about new signal
// used for gomobile builds
// nolint
func SetMobileSignalHandler(handler SignalHandler) {
library.SetMobileSignalHandler(func(data []byte) {
func SetMobileSignalHandler(instanceID uint, handler SignalHandler) {
instance, err := library.GetInstance(instanceID)
if err != nil {
panic(err.Error()) // TODO: refactor to return an error instead
}
library.SetMobileSignalHandler(instance, func(data []byte) {
if len(data) > 0 {
handler.HandleSignal(string(data))
}

View File

@ -9,7 +9,9 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"
"unsafe"
"go.uber.org/zap/zapcore"
@ -29,19 +31,31 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)
// WakuState represents the state of the waku node
type WakuState struct {
// WakuInstance represents the state of the waku node
type WakuInstance struct {
ctx context.Context
cancel context.CancelFunc
ID uint
node *node.WakuNode
node *node.WakuNode
cb unsafe.Pointer
mobileSignalHandler MobileSignalHandler
relayTopics []string
}
var wakuState WakuState
var wakuInstances map[uint]*WakuInstance
var wakuInstancesMutex sync.RWMutex
var errWakuNodeNotReady = errors.New("go-waku not initialized")
var errWakuNodeNotReady = errors.New("not initialized")
var errWakuNodeAlreadyConfigured = errors.New("already configured")
var errWakuNodeNotConfigured = errors.New("not configured")
var errWakuAlreadyStarted = errors.New("already started")
var errWakuNodeNotStarted = errors.New("not started")
func init() {
wakuInstances = make(map[uint]*WakuInstance)
}
func randomHex(n int) (string, error) {
bytes := make([]byte, n)
@ -51,10 +65,72 @@ func randomHex(n int) (string, error) {
return hex.EncodeToString(bytes), nil
}
func Init() *WakuInstance {
wakuInstancesMutex.Lock()
defer wakuInstancesMutex.Unlock()
id := uint(len(wakuInstances))
instance := &WakuInstance{
ID: id,
}
wakuInstances[id] = instance
return instance
}
func GetInstance(id uint) (*WakuInstance, error) {
wakuInstancesMutex.RLock()
defer wakuInstancesMutex.RUnlock()
instance, ok := wakuInstances[id]
if !ok {
return nil, errors.New("instance not found")
}
return instance, nil
}
type ValidationType int64
const (
None ValidationType = iota
MustBeStarted ValidationType = iota
MustBeStopped
NotConfigured
)
func validateInstance(instance *WakuInstance, validationType ValidationType) error {
if instance == nil {
return errWakuNodeNotReady
}
switch validationType {
case NotConfigured:
if instance.node != nil {
return errWakuNodeAlreadyConfigured
}
case MustBeStarted:
if instance.node == nil {
return errWakuNodeNotConfigured
}
if instance.ctx == nil {
return errWakuNodeNotStarted
}
case MustBeStopped:
if instance.node == nil {
return errWakuNodeNotConfigured
}
if instance.ctx != nil {
return errWakuAlreadyStarted
}
}
return nil
}
// NewNode initializes a waku node. Receives a JSON string containing the configuration, and use default values for those config items not specified
func NewNode(configJSON string) error {
if wakuState.node != nil {
return errors.New("go-waku already initialized. stop it first")
func NewNode(instance *WakuInstance, configJSON string) error {
if err := validateInstance(instance, NotConfigured); err != nil {
return err
}
config, err := getConfig(configJSON)
@ -152,7 +228,7 @@ func NewNode(configJSON string) error {
opts = append(opts, discv5Opts)
}
wakuState.relayTopics = config.RelayTopics
instance.relayTopics = config.RelayTopics
lvl, err := zapcore.ParseLevel(*config.LogLevel)
if err != nil {
@ -167,34 +243,43 @@ func NewNode(configJSON string) error {
return err
}
wakuState.node = w
instance.node = w
return nil
}
// Start starts the waku node
func Start() error {
if wakuState.node == nil {
return errWakuNodeNotReady
func stop(instance *WakuInstance) {
if instance.cancel != nil {
instance.node.Stop()
instance.cancel()
instance.cancel = nil
instance.ctx = nil
}
}
wakuState.ctx, wakuState.cancel = context.WithCancel(context.Background())
if err := wakuState.node.Start(wakuState.ctx); err != nil {
// Start starts the waku node
func Start(instance *WakuInstance) error {
if err := validateInstance(instance, MustBeStopped); err != nil {
return err
}
if wakuState.node.DiscV5() != nil {
if err := wakuState.node.DiscV5().Start(context.Background()); err != nil {
wakuState.node.Stop()
instance.ctx, instance.cancel = context.WithCancel(context.Background())
if err := instance.node.Start(instance.ctx); err != nil {
return err
}
if instance.node.DiscV5() != nil {
if err := instance.node.DiscV5().Start(context.Background()); err != nil {
stop(instance)
return err
}
}
for _, topic := range wakuState.relayTopics {
err := relaySubscribe(topic)
for _, topic := range instance.relayTopics {
err := relaySubscribe(instance, topic)
if err != nil {
wakuState.node.Stop()
stop(instance)
return err
}
}
@ -203,42 +288,55 @@ func Start() error {
}
// Stop stops a waku node
func Stop() error {
if wakuState.node == nil {
return errWakuNodeNotReady
func Stop(instance *WakuInstance) error {
if err := validateInstance(instance, None); err != nil {
return err
}
wakuState.node.Stop()
stop(instance)
wakuState.cancel()
return nil
}
wakuState.node = nil
// Free stops a waku instance and frees the resources allocated to a waku node
func Free(instance *WakuInstance) error {
if err := validateInstance(instance, None); err != nil {
return err
}
if instance.cancel != nil {
stop(instance)
}
wakuInstancesMutex.Lock()
defer wakuInstancesMutex.Unlock()
delete(wakuInstances, instance.ID)
return nil
}
// IsStarted is used to determine is a node is started or not
func IsStarted() bool {
return wakuState.node != nil
func IsStarted(instance *WakuInstance) bool {
return instance != nil && instance.ctx != nil
}
// PeerID is used to obtain the peer ID of the waku node
func PeerID() (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func PeerID(instance *WakuInstance) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
return wakuState.node.ID(), nil
return instance.node.ID(), nil
}
// ListenAddresses returns the multiaddresses the wakunode is listening to
func ListenAddresses() (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func ListenAddresses(instance *WakuInstance) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var addresses []string
for _, addr := range wakuState.node.ListenAddresses() {
for _, addr := range instance.node.ListenAddresses() {
addresses = append(addresses, addr.String())
}
@ -246,9 +344,9 @@ func ListenAddresses() (string, error) {
}
// AddPeer adds a node multiaddress and protocol to the wakunode peerstore
func AddPeer(address string, protocolID string) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func AddPeer(instance *WakuInstance, address string, protocolID string) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
ma, err := multiaddr.NewMultiaddr(address)
@ -256,7 +354,7 @@ func AddPeer(address string, protocolID string) (string, error) {
return "", err
}
peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, wakuState.relayTopics, libp2pProtocol.ID(protocolID))
peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
if err != nil {
return "", err
}
@ -265,28 +363,28 @@ func AddPeer(address string, protocolID string) (string, error) {
}
// Connect is used to connect to a peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds
func Connect(address string, ms int) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func Connect(instance *WakuInstance, address string, ms int) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
return wakuState.node.DialPeer(ctx, address)
return instance.node.DialPeer(ctx, address)
}
// ConnectPeerID is usedd to connect to a known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds
func ConnectPeerID(peerID string, ms int) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func ConnectPeerID(instance *WakuInstance, peerID string, ms int) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
@ -298,19 +396,19 @@ func ConnectPeerID(peerID string, ms int) error {
}
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
return wakuState.node.DialPeerByID(ctx, pID)
return instance.node.DialPeerByID(ctx, pID)
}
// Disconnect closes a connection to a known peer by peerID
func Disconnect(peerID string) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func Disconnect(instance *WakuInstance, peerID string) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
pID, err := peer.Decode(peerID)
@ -318,16 +416,16 @@ func Disconnect(peerID string) error {
return err
}
return wakuState.node.ClosePeerById(pID)
return instance.node.ClosePeerById(pID)
}
// PeerCnt returns the number of connected peers
func PeerCnt() (int, error) {
if wakuState.node == nil {
return 0, errWakuNodeNotReady
func PeerCnt(instance *WakuInstance) (int, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return 0, err
}
return wakuState.node.PeerCount(), nil
return instance.node.PeerCount(), nil
}
// ContentTopic creates a content topic string according to RFC 23
@ -356,12 +454,12 @@ func toSubscriptionMessage(msg *protocol.Envelope) *subscriptionMsg {
}
// Peers retrieves the list of peers known by the waku node
func Peers() (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func Peers(instance *WakuInstance) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
peers, err := wakuState.node.Peers()
peers, err := instance.node.Peers()
if err != nil {
return "", err
}

View File

@ -11,9 +11,9 @@ import (
)
// RelayEnoughPeers determines if there are enough peers to publish a message on a topic
func RelayEnoughPeers(topic string) (bool, error) {
if wakuState.node == nil {
return false, errWakuNodeNotReady
func RelayEnoughPeers(instance *WakuInstance, topic string) (bool, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return false, err
}
topicToCheck := protocol.DefaultPubsubTopic{}.String()
@ -21,45 +21,45 @@ func RelayEnoughPeers(topic string) (bool, error) {
topicToCheck = topic
}
return wakuState.node.Relay().EnoughPeersToPublishToTopic(topicToCheck), nil
return instance.node.Relay().EnoughPeersToPublishToTopic(topicToCheck), nil
}
func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func relayPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic string, ms int) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic))
hash, err := instance.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic))
return hexutil.Encode(hash), err
}
// RelayPublish publishes a message using waku relay and returns the message ID
func RelayPublish(messageJSON string, topic string, ms int) (string, error) {
func RelayPublish(instance *WakuInstance, messageJSON string, topic string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON)
if err != nil {
return "", err
}
return relayPublish(msg, topic, int(ms))
return relayPublish(instance, msg, topic, int(ms))
}
func relaySubscribe(filterJSON string) error {
func relaySubscribe(instance *WakuInstance, filterJSON string) error {
cf, err := toContentFilter(filterJSON)
if err != nil {
return err
}
subscriptions, err := wakuState.node.Relay().Subscribe(context.Background(), cf)
subscriptions, err := instance.node.Relay().Subscribe(context.Background(), cf)
if err != nil {
return err
}
@ -67,7 +67,7 @@ func relaySubscribe(filterJSON string) error {
for _, sub := range subscriptions {
go func(subscription *relay.Subscription) {
for envelope := range subscription.Ch {
send("message", toSubscriptionMessage(envelope))
send(instance, "message", toSubscriptionMessage(envelope))
}
}(sub)
}
@ -76,33 +76,33 @@ func relaySubscribe(filterJSON string) error {
}
// RelaySubscribe subscribes to a WakuRelay topic.
func RelaySubscribe(contentFilterJSON string) error {
if wakuState.node == nil {
return errWakuNodeNotReady
func RelaySubscribe(instance *WakuInstance, contentFilterJSON string) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
return relaySubscribe(contentFilterJSON)
return relaySubscribe(instance, contentFilterJSON)
}
// RelayTopics returns a list of pubsub topics the node is subscribed to in WakuRelay
func RelayTopics() (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func RelayTopics(instance *WakuInstance) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
return marshalJSON(wakuState.node.Relay().Topics())
return marshalJSON(instance.node.Relay().Topics())
}
// RelayUnsubscribe closes the pubsub subscription to a pubsub topic
func RelayUnsubscribe(contentFilterJSON string) error {
func RelayUnsubscribe(instance *WakuInstance, contentFilterJSON string) error {
cf, err := toContentFilter(contentFilterJSON)
if err != nil {
return err
}
if wakuState.node == nil {
return errWakuNodeNotReady
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
return wakuState.node.Relay().Unsubscribe(context.Background(), cf)
return instance.node.Relay().Unsubscribe(context.Background(), cf)
}

View File

@ -8,16 +8,11 @@
#include "_cgo_export.h"
typedef void (*callback)(int retCode, const char *jsonEvent, void* userData);
callback gCallback = 0;
bool ServiceSignalEvent(const char *jsonEvent) {
if (gCallback) {
gCallback(0, jsonEvent, NULL);
bool ServiceSignalEvent(void *cb, const char *jsonEvent) {
if (cb) {
((callback)cb)(0, jsonEvent, NULL);
}
return true;
}
void SetEventCallback(void *cb) {
gCallback = (callback)cb;
}

View File

@ -4,8 +4,7 @@ package library
#include <stddef.h>
#include <stdbool.h>
#include <stdlib.h>
extern bool ServiceSignalEvent(const char *jsonEvent);
extern void SetEventCallback(void *cb);
extern bool ServiceSignalEvent(void *cb, const char *jsonEvent);
*/
import "C"
@ -36,7 +35,7 @@ func newEnvelope(signalType string, event interface{}) *signalEnvelope {
}
// send sends application signal (in JSON) upwards to application (via default notification handler)
func send(signalType string, event interface{}) {
func send(instance *WakuInstance, signalType string, event interface{}) {
signal := newEnvelope(signalType, event)
data, err := json.Marshal(&signal)
@ -45,25 +44,33 @@ func send(signalType string, event interface{}) {
return
}
// If a Go implementation of signal handler is set, let's use it.
if mobileSignalHandler != nil {
mobileSignalHandler(data)
if instance.mobileSignalHandler != nil {
instance.mobileSignalHandler(data)
} else {
// ...and fallback to C implementation otherwise.
dataStr := string(data)
str := C.CString(dataStr)
C.ServiceSignalEvent(str)
C.ServiceSignalEvent(instance.cb, str)
C.free(unsafe.Pointer(str))
}
}
// SetEventCallback is to set a callback in order to receive application
// signals which are used to react to asynchronous events in waku.
func SetEventCallback(cb unsafe.Pointer) {
C.SetEventCallback(cb)
func SetEventCallback(instance *WakuInstance, cb unsafe.Pointer) {
if err := validateInstance(instance, None); err != nil {
panic(err.Error())
}
instance.cb = cb
}
// SetMobileSignalHandler sets the callback to be executed when a signal
// is received in a mobile device
func SetMobileSignalHandler(m MobileSignalHandler) {
mobileSignalHandler = m
func SetMobileSignalHandler(instance *WakuInstance, m MobileSignalHandler) {
if err := validateInstance(instance, None); err != nil {
panic(err.Error())
}
instance.mobileSignalHandler = m
}

View File

@ -35,8 +35,8 @@ type storeMessagesReply struct {
Error string `json:"error,omitempty"`
}
func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) {
res, err := wakuState.node.Store().Query(
func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) {
res, err := instance.node.Store().Query(
ctx,
store.Query{
PubsubTopic: args.Topic,
@ -64,9 +64,9 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.
}
// StoreQuery is used to retrieve historic messages using waku store protocol.
func StoreQuery(queryJSON string, peerID string, ms int) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func StoreQuery(instance *WakuInstance, queryJSON string, peerID string, ms int) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var args storeMessagesArgs
@ -95,19 +95,19 @@ func StoreQuery(queryJSON string, peerID string, ms int) (string, error) {
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
ctx = instance.ctx
}
return queryResponse(ctx, args, options)
return queryResponse(ctx, instance, args, options)
}
// StoreLocalQuery is used to retrieve historic messages stored in the localDB using waku store protocol.
func StoreLocalQuery(queryJSON string) (string, error) {
if wakuState.node == nil {
return "", errWakuNodeNotReady
func StoreLocalQuery(instance *WakuInstance, queryJSON string) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var args storeMessagesArgs
@ -123,5 +123,5 @@ func StoreLocalQuery(queryJSON string) (string, error) {
store.WithLocalQuery(),
}
return queryResponse(context.TODO(), args, options)
return queryResponse(instance.ctx, instance, args, options)
}