chore: Cbindings allow mounting the Store protocol from libwaku (#2276)

* libwaku: add changes to mount store in self-node
* libwaku: remove unnecessary code for store
This commit is contained in:
Ivan FB 2023-12-11 08:49:13 +01:00 committed by GitHub
parent 281c13a429
commit 28142f40b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 300 additions and 16 deletions

View File

@ -28,6 +28,13 @@ struct ConfigNode {
char key[128]; char key[128];
int relay; int relay;
char peers[2048]; char peers[2048];
int store;
char storeNode[2048];
char storeRetentionPolicy[64];
char storeDbUrl[256];
int storeVacuum;
int storeDbMigration;
int storeMaxNumDbConnections;
}; };
// libwaku Context // libwaku Context
@ -247,6 +254,14 @@ int main(int argc, char** argv) {
cfgNode.port = 60000; cfgNode.port = 60000;
cfgNode.relay = 1; cfgNode.relay = 1;
cfgNode.store = 1;
snprintf(cfgNode.storeNode, 2048, "");
snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000");
snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres");
cfgNode.storeVacuum = 0;
cfgNode.storeDbMigration = 0;
cfgNode.storeMaxNumDbConnections = 30;
if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode)
== ARGP_ERR_UNKNOWN) { == ARGP_ERR_UNKNOWN) {
show_help_and_exit(); show_help_and_exit();
@ -254,16 +269,24 @@ int main(int argc, char** argv) {
ctx = waku_init(event_handler, userData); ctx = waku_init(event_handler, userData);
char jsonConfig[1024]; char jsonConfig[2048];
snprintf(jsonConfig, 1024, "{ \ snprintf(jsonConfig, 2048, "{ \
\"host\": \"%s\", \ \"host\": \"%s\", \
\"port\": %d, \ \"port\": %d, \
\"key\": \"%s\", \ \"key\": \"%s\", \
\"relay\": %s \ \"relay\": %s, \
\"store\": %s, \
\"storeDbUrl\": \"%s\", \
\"storeRetentionPolicy\": \"%s\", \
\"storeMaxNumDbConnections\": %d \
}", cfgNode.host, }", cfgNode.host,
cfgNode.port, cfgNode.port,
cfgNode.key, cfgNode.key,
cfgNode.relay ? "true":"false"); cfgNode.relay ? "true":"false",
cfgNode.store ? "true":"false",
cfgNode.storeDbUrl,
cfgNode.storeRetentionPolicy,
cfgNode.storeMaxNumDbConnections);
WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) ); WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) );
WAKU_CALL( waku_version(&ctx, print_waku_version, userData) ); WAKU_CALL( waku_version(&ctx, print_waku_version, userData) );
@ -272,7 +295,6 @@ int main(int argc, char** argv) {
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");
WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) ); WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) );
waku_set_event_callback(event_handler, userData); waku_set_event_callback(event_handler, userData);
waku_start(&ctx, event_handler, userData); waku_start(&ctx, event_handler, userData);
@ -288,6 +310,7 @@ int main(int argc, char** argv) {
"/waku/2/default-waku/proto", "/waku/2/default-waku/proto",
event_handler, event_handler,
userData) ); userData) );
show_main_menu(); show_main_menu();
while(1) { while(1) {
handle_user_input(); handle_user_input();

5
library/callback.nim Normal file
View File

@ -0,0 +1,5 @@
type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}

View File

@ -18,8 +18,10 @@ import
./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request, ./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/waku_thread_request, ./waku_thread/inter_thread_communication/waku_thread_request,
./alloc ./alloc,
./callback
################################################################################ ################################################################################
### Wrapper around the waku node ### Wrapper around the waku node
@ -32,11 +34,6 @@ const RET_OK: cint = 0
const RET_ERR: cint = 1 const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2 const RET_MISSING_CALLBACK: cint = 2
type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}
### End of exported types ### End of exported types
################################################################################ ################################################################################
@ -348,5 +345,24 @@ proc waku_connect(ctx: ptr ptr Context,
return RET_OK return RET_OK
proc waku_store_query(ctx: ptr ptr Context,
queryJson: cstring,
peerId: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =
ctx[][].userData = userData
## TODO: implement the logic that make the "self" node to act as a Store client
# if sendReqRes.isErr():
# let msg = $sendReqRes.error
# callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
# return RET_ERR
return RET_OK
### End of exported procs ### End of exported procs
################################################################################ ################################################################################

View File

@ -91,6 +91,71 @@ proc parseRelay(jsonNode: JsonNode,
return true return true
proc parseStore(jsonNode: JsonNode,
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool =
if not jsonNode.contains("store"):
## the store parameter is not required. By default is is disabled
store = false
return true
if jsonNode["store"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The store config param should be a boolean");
return false
store = jsonNode["store"].getBool()
if jsonNode.contains("storeNode"):
if jsonNode["storeNode"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeNode config param should be a string");
return false
storeNode = jsonNode["storeNode"].getStr()
if jsonNode.contains("storeRetentionPolicy"):
if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string");
return false
storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr()
if jsonNode.contains("storeDbUrl"):
if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string");
return false
storeDbUrl = jsonNode["storeDbUrl"].getStr()
if jsonNode.contains("storeVacuum"):
if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool");
return false
storeVacuum = jsonNode["storeVacuum"].getBool()
if jsonNode.contains("storeDbMigration"):
if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool");
return false
storeDbMigration = jsonNode["storeDbMigration"].getBool()
if jsonNode.contains("storeMaxNumDbConnections"):
if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt:
jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int");
return false
storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt()
return true
proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) = proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) =
if jsonNode.contains("topics"): if jsonNode.contains("topics"):
for topic in jsonNode["topics"].items: for topic in jsonNode["topics"].items:
@ -103,6 +168,13 @@ proc parseConfig*(configNodeJson: string,
netConfig: var NetConfig, netConfig: var NetConfig,
relay: var bool, relay: var bool,
topics: var seq[string], topics: var seq[string],
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool = jsonResp: var JsonEvent): bool =
if configNodeJson.len == 0: if configNodeJson.len == 0:
@ -110,7 +182,6 @@ proc parseConfig*(configNodeJson: string,
return false return false
var jsonNode: JsonNode var jsonNode: JsonNode
try: try:
jsonNode = parseJson(configNodeJson) jsonNode = parseJson(configNodeJson)
except JsonParsingError: except JsonParsingError:
@ -152,6 +223,11 @@ proc parseConfig*(configNodeJson: string,
# topics # topics
parseTopics(jsonNode, topics) parseTopics(jsonNode, topics)
# store
if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl,
storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp):
return false
let wakuFlags = CapabilitiesBitfield.init( let wakuFlags = CapabilitiesBitfield.init(
lightpush = false, lightpush = false,
filter = false, filter = false,

View File

@ -3,6 +3,7 @@ import
std/options std/options
import import
chronos, chronos,
chronicles,
stew/results, stew/results,
stew/shims/net stew/shims/net
import import
@ -17,7 +18,12 @@ import
../../../../waku/node/waku_node, ../../../../waku/node/waku_node,
../../../../waku/node/builder, ../../../../waku/node/builder,
../../../../waku/node/config, ../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
../../../../waku/waku_archive/retention_policy/builder,
../../../../waku/waku_archive/retention_policy,
../../../../waku/waku_relay/protocol, ../../../../waku/waku_relay/protocol,
../../../../waku/waku_store,
../../../events/[json_error_event,json_message_event,json_base_event], ../../../events/[json_error_event,json_message_event,json_base_event],
../../../alloc, ../../../alloc,
../../config ../../config
@ -46,14 +52,77 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson) deallocShared(self[].configJson)
deallocShared(self) deallocShared(self)
proc configureStore(node: WakuNode,
storeNode: string,
storeRetentionPolicy: string,
storeDbUrl: string,
storeVacuum: bool,
storeDbMigration: bool,
storeMaxNumDbConnections: int):
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
## TODO: use a callback given as a parameter
discard
# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)
let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)
let mountArcRes = node.mountArchive(archiveDriverRes.get(),
retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)
# Store setup
try:
await mountStore(node)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
mountStoreClient(node)
if storeNode != "":
let storeNodeInfo = parsePeerInfo(storeNode)
if storeNodeInfo.isOk():
node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNodeInfo.error)
return ok()
proc createNode(configJson: cstring): proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} = Future[Result[WakuNode, string]] {.async.} =
var privateKey: PrivateKey var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value Port(60000'u16)).value
## relay
var relay: bool var relay: bool
var topics = @[""] var topics = @[""]
## store
var store: bool
var storeNode: string
var storeRetentionPolicy: string
var storeDbUrl: string
var storeVacuum: bool
var storeDbMigration: bool
var storeMaxNumDbConnections: int
var jsonResp: JsonEvent var jsonResp: JsonEvent
if not parseConfig($configJson, if not parseConfig($configJson,
@ -61,6 +130,13 @@ proc createNode(configJson: cstring):
netConfig, netConfig,
relay, relay,
topics, topics,
store,
storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
jsonResp): jsonResp):
return err($jsonResp) return err($jsonResp)
@ -113,6 +189,15 @@ proc createNode(configJson: cstring):
await newNode.mountRelay() await newNode.mountRelay()
newNode.peerManager.start() newNode.peerManager.start()
if store:
(await newNode.configureStore(storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections)).isOkOr:
return err("error configuring store: " & $error)
return ok(newNode) return ok(newNode)
proc process*(self: ptr NodeLifecycleRequest, proc process*(self: ptr NodeLifecycleRequest,

View File

@ -0,0 +1,75 @@
import
std/[options,sequtils,strutils]
import
chronos,
stew/results,
stew/shims/net
import
../../../../../waku/node/waku_node,
../../../../../waku/waku_archive/driver/builder,
../../../../../waku/waku_archive/driver,
../../../../../waku/waku_archive/retention_policy/builder,
../../../../../waku/waku_archive/retention_policy,
../../../../alloc,
../../../../callback
type
StoreReqType* = enum
REMOTE_QUERY ## to perform a query to another Store node
LOCAL_QUERY ## to retrieve the data from 'self' node
type
StoreQueryRequest* = object
queryJson: cstring
peerAddr: cstring
timeoutMs: cint
storeCallback: WakuCallBack
type
StoreRequest* = object
operation: StoreReqType
storeReq: pointer
proc createShared*(T: type StoreRequest,
operation: StoreReqType,
request: pointer): ptr type T =
var ret = createShared(T)
ret[].request = request
return ret
proc createShared*(T: type StoreQueryRequest,
queryJson: cstring,
peerAddr: cstring,
timeoutMs: cint,
storeCallback: WakuCallBack = nil): ptr type T =
var ret = createShared(T)
ret[].timeoutMs = timeoutMs
ret[].queryJson = queryJson.alloc()
ret[].peerAddr = peerAddr.alloc()
ret[].storeCallback = storeCallback
return ret
proc destroyShared(self: ptr StoreQueryRequest) =
deallocShared(self[].queryJson)
deallocShared(self[].peerAddr)
deallocShared(self)
proc process(self: ptr StoreQueryRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
defer: destroyShared(self)
proc process*(self: ptr StoreRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
defer: deallocShared(self)
case self.operation:
of REMOTE_QUERY:
return await cast[ptr StoreQueryRequest](self[].storeReq).process(node)
of LOCAL_QUERY:
discard
# cast[ptr StoreQueryRequest](request[].reqContent).process(node)
return ok("")

View File

@ -12,13 +12,15 @@ import
../../../waku/node/waku_node, ../../../waku/node/waku_node,
./requests/node_lifecycle_request, ./requests/node_lifecycle_request,
./requests/peer_manager_request, ./requests/peer_manager_request,
./requests/protocols/relay_request ./requests/protocols/relay_request,
./requests/protocols/store_request
type type
RequestType* {.pure.} = enum RequestType* {.pure.} = enum
LIFECYCLE, LIFECYCLE,
PEER_MANAGER, PEER_MANAGER,
RELAY, RELAY,
STORE,
type type
InterThreadRequest* = object InterThreadRequest* = object
@ -50,6 +52,8 @@ proc process*(T: type InterThreadRequest,
cast[ptr PeerManagementRequest](request[].reqContent).process(node[]) cast[ptr PeerManagementRequest](request[].reqContent).process(node[])
of RELAY: of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(node) cast[ptr RelayRequest](request[].reqContent).process(node)
of STORE:
cast[ptr StoreRequest](request[].reqContent).process(node)
return await retFut return await retFut