From 8826e2df66ddf1daaa329650bca4df690fbd3665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rich=CE=9Brd?= Date: Thu, 9 Mar 2023 11:48:25 -0400 Subject: [PATCH] feat: rendezvous server --- README.md | 2 +- cmd/waku/flags.go | 15 + cmd/waku/main.go | 2 + examples/basic2/go.mod | 1 + examples/basic2/go.sum | 2 + examples/c-bindings/main.c | 2 +- examples/c-bindings/main.h | 2 +- examples/chat2/go.mod | 1 + examples/chat2/go.sum | 2 + examples/filter2/go.mod | 1 + examples/filter2/go.sum | 2 + examples/noise/go.mod | 1 + examples/noise/go.sum | 2 + flake.nix | 2 +- go.mod | 1 + go.sum | 2 + waku/node.go | 26 +- waku/options.go | 6 + .../postgres/migrations/bindata.go | 48 ++ .../migrations/sql/3_rendezvous.down.sql | 3 + .../migrations/sql/3_rendezvous.up.sql | 11 + waku/persistence/sqlite/migrations/bindata.go | 48 ++ .../migrations/sql/3_rendezvous.down.sql | 3 + .../sqlite/migrations/sql/3_rendezvous.up.sql | 11 + waku/persistence/store.go | 25 +- waku/persistence/store_test.go | 7 +- waku/v2/node/wakunode2.go | 14 + waku/v2/node/wakuoptions.go | 14 + waku/v2/protocol/store/waku_store_common.go | 3 +- waku/v2/protocol/store/waku_store_protocol.go | 21 +- waku/v2/rendezvous/db.go | 439 ++++++++++++++++++ waku/v2/rendezvous/rendezvous.go | 52 +++ 32 files changed, 740 insertions(+), 31 deletions(-) create mode 100644 waku/persistence/postgres/migrations/sql/3_rendezvous.down.sql create mode 100644 waku/persistence/postgres/migrations/sql/3_rendezvous.up.sql create mode 100644 waku/persistence/sqlite/migrations/sql/3_rendezvous.down.sql create mode 100644 waku/persistence/sqlite/migrations/sql/3_rendezvous.up.sql create mode 100644 waku/v2/rendezvous/db.go create mode 100644 waku/v2/rendezvous/rendezvous.go diff --git a/README.md b/README.md index 917b40c5..a87fad3e 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ If you'd like to contribute to go-waku, please fork, fix, commit and send a pull To build and test this repository, you need: - [Go](https://golang.org/) (version 1.17 or later) - [protoc](https://grpc.io/docs/protoc-installation/) - - [Protocol Buffers for Go with Gadgets](https://github.com/gogo/protobuf) + - [protoc-gen-go](https://protobuf.dev/getting-started/gotutorial/#compiling-protocol-buffers) To enable the git hooks: diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 2951932a..8b24d189 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -363,6 +363,21 @@ var ( Destination: &options.DiscV5.AutoUpdate, EnvVars: []string{"WAKUNODE2_DISCV5_ENR_AUTO_UPDATE"}, }) + + Rendezvous = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "rendezvous", + Usage: "Enable rendezvous protocol server for peer discovery", + Destination: &options.Rendezvous.Enable, + EnvVars: []string{"WAKUNODE2_RENDEZVOUS"}, + }) + RendezvousNode = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ + Name: "rendezvous-node", + Usage: "Multiaddr of a waku2 rendezvous node. Option may be repeated", + Value: &cliutils.MultiaddrSlice{ + Values: &options.Rendezvous.Nodes, + }, + EnvVars: []string{"WAKUNODE2_RENDEZVOUSNODE"}, + }) PeerExchange = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "peer-exchange", Usage: "Enable waku peer exchange protocol (responder side)", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index af68d897..b024b943 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -73,6 +73,8 @@ func main() { DNSDiscovery, DNSDiscoveryUrl, DNSDiscoveryNameServer, + Rendezvous, + RendezvousNode, MetricsServer, MetricsServerAddress, MetricsServerPort, diff --git a/examples/basic2/go.mod b/examples/basic2/go.mod index 1127583e..fa289461 100644 --- a/examples/basic2/go.mod +++ b/examples/basic2/go.mod @@ -17,6 +17,7 @@ require ( github.com/beevik/ntp v0.3.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect diff --git a/examples/basic2/go.sum b/examples/basic2/go.sum index dd7a1d6b..819bb0e1 100644 --- a/examples/basic2/go.sum +++ b/examples/basic2/go.sum @@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= +github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= diff --git a/examples/c-bindings/main.c b/examples/c-bindings/main.c index d1cb049e..00835c1d 100644 --- a/examples/c-bindings/main.c +++ b/examples/c-bindings/main.c @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) char *response; waku_set_event_callback(callBack); - char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000}"; + char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000, \"logLevel\":\"error\"}"; response = waku_new(configJSON); // configJSON can be NULL too to use defaults if (isError(response)) return 1; diff --git a/examples/c-bindings/main.h b/examples/c-bindings/main.h index 2342382f..aab99066 100644 --- a/examples/c-bindings/main.h +++ b/examples/c-bindings/main.h @@ -32,7 +32,7 @@ char *utils_extract_wakumessage_from_signal(const nx_json *signal) const char *contentTopic = nx_json_get(wakuMsgJson, "contentTopic")->text_value; int version = nx_json_get(wakuMsgJson, "version")->int_value; long long timestamp = nx_json_get(wakuMsgJson, "timestamp")->int_value; - char wakuMsg[1000]; + char wakuMsg[6000]; sprintf(wakuMsg, "{\"payload\":\"%s\",\"contentTopic\":\"%s\",\"timestamp\":%lld, \"version\":%d}", payload, contentTopic, timestamp, version); char *response = (char *)malloc(sizeof(char) * (strlen(wakuMsg) + 1)); strcpy(response, wakuMsg); diff --git a/examples/chat2/go.mod b/examples/chat2/go.mod index 7ee07e98..d8377ac2 100644 --- a/examples/chat2/go.mod +++ b/examples/chat2/go.mod @@ -30,6 +30,7 @@ require ( github.com/beevik/ntp v0.3.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index 5c5f9921..3e913ec6 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -76,6 +76,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= +github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= diff --git a/examples/filter2/go.mod b/examples/filter2/go.mod index 9ab9c178..db9572e9 100644 --- a/examples/filter2/go.mod +++ b/examples/filter2/go.mod @@ -17,6 +17,7 @@ require ( github.com/beevik/ntp v0.3.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index dd7a1d6b..819bb0e1 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= +github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= diff --git a/examples/noise/go.mod b/examples/noise/go.mod index fce1f5ad..aee40ba3 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -20,6 +20,7 @@ require ( github.com/beevik/ntp v0.3.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect diff --git a/examples/noise/go.sum b/examples/noise/go.sum index c226b545..d08eb943 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= +github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= diff --git a/flake.nix b/flake.nix index 049c98a5..b7a76be3 100644 --- a/flake.nix +++ b/flake.nix @@ -25,7 +25,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-yED55+XK/JVplsVcZQin2RBECIUt3XMr0crwt+X2S2Q="; + vendorSha256 = "sha256-6oAZkdIibYESZJhxu1aQqctj6nUBKdc6dSmM694Ustc="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 7dc35f28..9bba5911 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( ) require ( + github.com/berty/go-libp2p-rendezvous v0.4.1 github.com/go-chi/chi/v5 v5.0.0 github.com/lib/pq v1.10.4 github.com/waku-org/go-noise v0.0.4 diff --git a/go.sum b/go.sum index 93c33ed3..b6070eef 100644 --- a/go.sum +++ b/go.sum @@ -211,6 +211,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= +github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= diff --git a/waku/node.go b/waku/node.go index 74cb447c..8c720198 100644 --- a/waku/node.go +++ b/waku/node.go @@ -14,6 +14,7 @@ import ( "time" wmetrics "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -57,6 +58,10 @@ func failOnErr(err error, msg string) { } } +func requiresDB(options Options) bool { + return options.Store.Enable || options.Rendezvous.Enable +} + const dialTimeout = 7 * time.Second // Execute starts a go-waku node with settings determined by the Options parameter @@ -81,7 +86,7 @@ func Execute(options Options) { var db *sql.DB var migrationFn func(*sql.DB) error - if options.Store.Enable { + if requiresDB(options) { db, migrationFn, err = ExtractDBAndMigration(options.Store.DatabaseURL) failOnErr(err, "Could not connect to DB") } @@ -172,17 +177,22 @@ func Execute(options Options) { } } - if options.Store.Enable { - nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...)) - dbStore, err := persistence.NewDBStore(logger, + var dbStore *persistence.DBStore + if requiresDB(options) { + dbStore, err = persistence.NewDBStore(logger, persistence.WithDB(db), - persistence.WithMigrations(migrationFn), + persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime), ) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } + if options.Store.Enable { + nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...)) + nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) + } + if options.LightPush.Enable { nodeOpts = append(nodeOpts, node.WithLightPush()) } @@ -232,6 +242,11 @@ func Execute(options Options) { nodeOpts = append(nodeOpts, node.WithPeerExchange()) } + if options.Rendezvous.Enable { + rdb := rendezvous.NewDB(ctx, db, logger) + nodeOpts = append(nodeOpts, node.WithRendezvousServer(rdb)) + } + checkForRLN(logger, options, &nodeOpts) wakuNode, err := node.New(nodeOpts...) @@ -242,6 +257,7 @@ func Execute(options Options) { addPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4) addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1) + addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) if options.Filter.UseV2 { addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1) diff --git a/waku/options.go b/waku/options.go index 02efef7b..659e10b9 100644 --- a/waku/options.go +++ b/waku/options.go @@ -139,6 +139,11 @@ type PeerExchangeOptions struct { Node *multiaddr.Multiaddr } +type RendezvousOptions struct { + Enable bool + Nodes []multiaddr.Multiaddr +} + // Options contains all the available features and settings that can be // configured via flags when executing go-waku as a service. type Options struct { @@ -172,6 +177,7 @@ type Options struct { RLNRelay RLNRelayOptions DiscV5 DiscV5Options DNSDiscovery DNSDiscoveryOptions + Rendezvous RendezvousOptions Metrics MetricsOptions RPCServer RPCServerOptions RESTServer RESTServerOptions diff --git a/waku/persistence/postgres/migrations/bindata.go b/waku/persistence/postgres/migrations/bindata.go index e54604c8..3d5dc89e 100644 --- a/waku/persistence/postgres/migrations/bindata.go +++ b/waku/persistence/postgres/migrations/bindata.go @@ -4,6 +4,8 @@ // 1_messages.up.sql (452B) // 2_messages_index.down.sql (60B) // 2_messages_index.up.sql (226B) +// 3_rendezvous.down.sql (65B) +// 3_rendezvous.up.sql (181B) // doc.go (74B) package migrations @@ -153,6 +155,46 @@ func _2_messages_indexUpSql() (*asset, error) { return a, nil } +var __3_rendezvousDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x08\x4a\x4d\xcf\x2c\x2e\x29\x4a\x2c\xc9\xcc\xcf\x2b\xb6\xe6\xe2\xc2\xaa\xc8\x2f\x3f\x2f\x39\xd5\x9a\x0b\x10\x00\x00\xff\xff\x58\x3f\x49\x49\x41\x00\x00\x00") + +func _3_rendezvousDownSqlBytes() ([]byte, error) { + return bindataRead( + __3_rendezvousDownSql, + "3_rendezvous.down.sql", + ) +} + +func _3_rendezvousDownSql() (*asset, error) { + bytes, err := _3_rendezvousDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "3_rendezvous.down.sql", size: 65, mode: os.FileMode(0664), modTime: time.Unix(1678306445, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1f, 0x4b, 0xc0, 0x7d, 0x4f, 0xac, 0xc4, 0x75, 0x59, 0xcc, 0xfc, 0x1a, 0x6c, 0x18, 0x81, 0x29, 0x24, 0x33, 0x3, 0x10, 0x39, 0xd0, 0x67, 0x28, 0xa0, 0xe0, 0xfd, 0x36, 0x91, 0x25, 0x37, 0x83}} + return a, nil +} + +var __3_rendezvousUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xce\xc1\x0a\x82\x40\x10\xc6\xf1\xfb\x3e\xc5\x77\x54\xf0\x18\x5d\x3a\x8d\x32\x94\x64\x11\xd3\x12\x78\x14\x1d\xc2\xcb\x2a\xb3\x1b\xf4\xf8\x81\x66\xd0\x6d\xf8\xcd\x77\xf8\x57\xc2\xe4\x19\x9e\xca\x86\x61\xfa\x1c\x63\xb2\x2e\x8d\x53\x88\xc8\x1c\x00\xf4\xd3\x2b\x24\x35\xdc\x59\x6a\x6a\x70\x93\xfa\x42\xd2\xe2\xcc\x6d\xb1\xfc\x67\x55\xc3\x83\xa4\x3a\x91\x64\xfb\x5d\xbe\x6a\x88\x9b\x15\x58\x40\xdf\xf3\x68\x8a\xfa\xea\xf9\xc8\x1b\x76\xc3\x60\x11\x65\xeb\x99\x5c\x7e\x70\xee\x2f\x26\x4c\xa1\xd7\x6f\xc4\x7a\xff\x86\x9f\x00\x00\x00\xff\xff\x00\x70\x80\x83\xb5\x00\x00\x00") + +func _3_rendezvousUpSqlBytes() ([]byte, error) { + return bindataRead( + __3_rendezvousUpSql, + "3_rendezvous.up.sql", + ) +} + +func _3_rendezvousUpSql() (*asset, error) { + bytes, err := _3_rendezvousUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "3_rendezvous.up.sql", size: 181, mode: os.FileMode(0664), modTime: time.Unix(1678306380, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x5c, 0xb8, 0x4f, 0x88, 0xe9, 0xc6, 0xc, 0xbb, 0x2e, 0x56, 0xa2, 0xcd, 0x9, 0xfa, 0x33, 0x94, 0xd7, 0x73, 0xc1, 0xa, 0xc5, 0x69, 0xfb, 0x9f, 0x75, 0xdb, 0x75, 0x58, 0x20, 0x5e, 0xf, 0x14}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -272,6 +314,10 @@ var _bindata = map[string]func() (*asset, error){ "2_messages_index.up.sql": _2_messages_indexUpSql, + "3_rendezvous.down.sql": _3_rendezvousDownSql, + + "3_rendezvous.up.sql": _3_rendezvousUpSql, + "doc.go": docGo, } @@ -320,6 +366,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}}, "2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}}, "2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}}, + "3_rendezvous.down.sql": &bintree{_3_rendezvousDownSql, map[string]*bintree{}}, + "3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/waku/persistence/postgres/migrations/sql/3_rendezvous.down.sql b/waku/persistence/postgres/migrations/sql/3_rendezvous.down.sql new file mode 100644 index 00000000..053a98e3 --- /dev/null +++ b/waku/persistence/postgres/migrations/sql/3_rendezvous.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS Registrations; + +DROP TABLE IF EXISTS Nonce; diff --git a/waku/persistence/postgres/migrations/sql/3_rendezvous.up.sql b/waku/persistence/postgres/migrations/sql/3_rendezvous.up.sql new file mode 100644 index 00000000..fa03ce93 --- /dev/null +++ b/waku/persistence/postgres/migrations/sql/3_rendezvous.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE registrations ( + counter SERIAL PRIMARY KEY, + peer VARCHAR(64), + ns VARCHAR, + expire INTEGER, + addrs BYTEA +); + +CREATE TABLE nonce ( + nonce BYTEA +); diff --git a/waku/persistence/sqlite/migrations/bindata.go b/waku/persistence/sqlite/migrations/bindata.go index 26c595bd..ddf415f5 100644 --- a/waku/persistence/sqlite/migrations/bindata.go +++ b/waku/persistence/sqlite/migrations/bindata.go @@ -4,6 +4,8 @@ // 1_messages.up.sql (464B) // 2_messages_index.down.sql (60B) // 2_messages_index.up.sql (226B) +// 3_rendezvous.down.sql (65B) +// 3_rendezvous.up.sql (204B) // doc.go (74B) package migrations @@ -153,6 +155,46 @@ func _2_messages_indexUpSql() (*asset, error) { return a, nil } +var __3_rendezvousDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x08\x4a\x4d\xcf\x2c\x2e\x29\x4a\x2c\xc9\xcc\xcf\x2b\xb6\xe6\xe2\xc2\xaa\xc8\x2f\x3f\x2f\x39\xd5\x9a\x0b\x10\x00\x00\xff\xff\x58\x3f\x49\x49\x41\x00\x00\x00") + +func _3_rendezvousDownSqlBytes() ([]byte, error) { + return bindataRead( + __3_rendezvousDownSql, + "3_rendezvous.down.sql", + ) +} + +func _3_rendezvousDownSql() (*asset, error) { + bytes, err := _3_rendezvousDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "3_rendezvous.down.sql", size: 65, mode: os.FileMode(0664), modTime: time.Unix(1678305816, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1f, 0x4b, 0xc0, 0x7d, 0x4f, 0xac, 0xc4, 0x75, 0x59, 0xcc, 0xfc, 0x1a, 0x6c, 0x18, 0x81, 0x29, 0x24, 0x33, 0x3, 0x10, 0x39, 0xd0, 0x67, 0x28, 0xa0, 0xe0, 0xfd, 0x36, 0x91, 0x25, 0x37, 0x83}} + return a, nil +} + +var __3_rendezvousUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xce\xb1\xaa\x83\x40\x10\x85\xe1\x7e\x9f\xe2\x94\x0a\x96\x97\xdb\xa4\x1a\x65\x48\x96\xc4\x4d\x18\x36\x01\x4b\xd1\x21\xd8\xac\xb2\x1a\xc8\xe3\x07\x56\x2d\xec\x86\x8f\x9f\xe1\x54\xc2\xe4\x19\x9e\xca\x1b\x23\xea\x7b\x98\x97\xd8\x2e\xc3\x18\x66\x64\x06\x00\xba\xf1\x13\x16\x8d\xb0\xce\xf3\x99\x05\x0f\xb1\x35\x49\x83\x2b\x37\xa0\xa7\xbf\x5b\x57\x09\xd7\xec\x7c\x91\xf2\x49\x35\xe2\x45\x52\x5d\x48\xb2\xff\xbf\x7c\xd5\x30\xef\x56\x20\x81\x7e\xa7\x21\xea\xfe\x74\xc3\xb6\xef\x63\x0a\x4b\xeb\x48\x1a\x93\x9f\x8c\x39\xec\x0b\x63\xe8\x74\xdb\xb5\xde\x87\xf8\x17\x00\x00\xff\xff\xc3\x3a\x6f\xb8\xcc\x00\x00\x00") + +func _3_rendezvousUpSqlBytes() ([]byte, error) { + return bindataRead( + __3_rendezvousUpSql, + "3_rendezvous.up.sql", + ) +} + +func _3_rendezvousUpSql() (*asset, error) { + bytes, err := _3_rendezvousUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "3_rendezvous.up.sql", size: 204, mode: os.FileMode(0664), modTime: time.Unix(1678306389, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb4, 0x9e, 0xd1, 0xde, 0xd4, 0xd3, 0x7, 0xc8, 0x7e, 0xa8, 0x54, 0xb5, 0xb3, 0xa1, 0x3c, 0x56, 0xd5, 0xcd, 0x61, 0xed, 0x9c, 0x82, 0x57, 0x24, 0x1f, 0x42, 0x98, 0xf4, 0x33, 0xa4, 0xc0, 0x16}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -272,6 +314,10 @@ var _bindata = map[string]func() (*asset, error){ "2_messages_index.up.sql": _2_messages_indexUpSql, + "3_rendezvous.down.sql": _3_rendezvousDownSql, + + "3_rendezvous.up.sql": _3_rendezvousUpSql, + "doc.go": docGo, } @@ -320,6 +366,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}}, "2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}}, "2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}}, + "3_rendezvous.down.sql": &bintree{_3_rendezvousDownSql, map[string]*bintree{}}, + "3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/waku/persistence/sqlite/migrations/sql/3_rendezvous.down.sql b/waku/persistence/sqlite/migrations/sql/3_rendezvous.down.sql new file mode 100644 index 00000000..053a98e3 --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/3_rendezvous.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS Registrations; + +DROP TABLE IF EXISTS Nonce; diff --git a/waku/persistence/sqlite/migrations/sql/3_rendezvous.up.sql b/waku/persistence/sqlite/migrations/sql/3_rendezvous.up.sql new file mode 100644 index 00000000..100b546a --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/3_rendezvous.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE registrations ( + counter INTEGER PRIMARY KEY AUTOINCREMENT, + peer VARCHAR(64), + ns VARCHAR, + expire INTEGER, + addrs VARBINARY +); + +CREATE TABLE nonce ( + nonce VARBINARY +); diff --git a/waku/persistence/store.go b/waku/persistence/store.go index d010ccda..8a8a8fa9 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -1,6 +1,7 @@ package persistence import ( + "context" "database/sql" "errors" "fmt" @@ -21,7 +22,7 @@ type MessageProvider interface { Put(env *protocol.Envelope) error Query(query *pb.HistoryQuery) ([]StoredMessage, error) MostRecentTimestamp() (int64, error) - Start(timesource timesource.Timesource) error + Start(ctx context.Context, timesource timesource.Timesource) error Stop() } @@ -45,8 +46,8 @@ type DBStore struct { enableMigrations bool - wg sync.WaitGroup - quit chan struct{} + wg sync.WaitGroup + cancel context.CancelFunc } type StoredMessage struct { @@ -124,7 +125,6 @@ func DefaultOptions() []DBOption { func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { result := new(DBStore) result.log = log.Named("dbstore") - result.quit = make(chan struct{}) optList := DefaultOptions() optList = append(optList, options...) @@ -146,7 +146,10 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { return result, nil } -func (d *DBStore) Start(timesource timesource.Timesource) error { +func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error { + ctx, cancel := context.WithCancel(ctx) + + d.cancel = cancel d.timesource = timesource err := d.cleanOlderRecords() @@ -155,7 +158,7 @@ func (d *DBStore) Start(timesource timesource.Timesource) error { } d.wg.Add(1) - go d.checkForOlderRecords(60 * time.Second) + go d.checkForOlderRecords(ctx, 60*time.Second) return nil } @@ -192,7 +195,7 @@ func (d *DBStore) cleanOlderRecords() error { return nil } -func (d *DBStore) checkForOlderRecords(t time.Duration) { +func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) { defer d.wg.Done() ticker := time.NewTicker(t) @@ -200,7 +203,7 @@ func (d *DBStore) checkForOlderRecords(t time.Duration) { for { select { - case <-d.quit: + case <-ctx.Done(): return case <-ticker.C: err := d.cleanOlderRecords() @@ -213,7 +216,11 @@ func (d *DBStore) checkForOlderRecords(t time.Duration) { // Stop closes a DB connection func (d *DBStore) Stop() { - d.quit <- struct{}{} + if d.cancel == nil { + return + } + + d.cancel() d.wg.Wait() d.db.Close() } diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index def9b154..597de85b 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -1,6 +1,7 @@ package persistence import ( + "context" "database/sql" "testing" "time" @@ -40,7 +41,7 @@ func TestDbStore(t *testing.T) { store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate)) require.NoError(t, err) - err = store.Start(timesource.NewDefaultClock()) + err = store.Start(context.Background(), timesource.NewDefaultClock()) require.NoError(t, err) res, err := store.GetAll() @@ -60,7 +61,7 @@ func TestStoreRetention(t *testing.T) { store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) - err = store.Start(timesource.NewDefaultClock()) + err = store.Start(context.Background(), timesource.NewDefaultClock()) require.NoError(t, err) insertTime := time.Now() @@ -83,7 +84,7 @@ func TestStoreRetention(t *testing.T) { store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) - err = store.Start(timesource.NewDefaultClock()) + err = store.Start(context.Background(), timesource.NewDefaultClock()) require.NoError(t, err) dbResults, err = store.GetAll() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 3e9671ed..a02b7293 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -41,6 +41,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/swap" + "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -80,6 +81,7 @@ type WakuNode struct { peerConnector PeerConnectorService discoveryV5 Service peerExchange Service + rendezvous Service filter ReceptorService filterV2Full ReceptorService filterV2Light Service @@ -212,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } + w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.rendezvousDB, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) @@ -390,6 +393,13 @@ func (w *WakuNode) Start(ctx context.Context) error { } } + if w.opts.enableRendezvous { + err := w.rendezvous.Start(ctx) + if err != nil { + return err + } + } + if w.opts.enableRLN { err = w.mountRlnRelay(ctx) if err != nil { @@ -415,6 +425,10 @@ func (w *WakuNode) Stop() { defer w.identificationEventSub.Close() defer w.addressChangesSub.Close() + if w.opts.enableRendezvous { + w.rendezvous.Stop() + } + w.relay.Stop() w.lightPush.Stop() w.store.Stop() diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index c164056e..f5078156 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -29,6 +29,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filterv2" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -79,6 +80,9 @@ type WakuNodeParameters struct { resumeNodes []multiaddr.Multiaddr messageProvider store.MessageProvider + enableRendezvous bool + rendezvousDB *rendezvous.DB + swapMode int swapDisconnectThreshold int swapPaymentThreshold int @@ -433,6 +437,16 @@ func WithWebsockets(address string, port int) WakuNodeOption { } } +// WithRendezvousServer is a WakuOption used to set the node as a rendezvous +// point, using an specific storage for the peer information +func WithRendezvousServer(db *rendezvous.DB) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableRendezvous = true + params.rendezvousDB = db + return nil + } +} + // WithSecureWebsockets is a WakuNodeOption used to enable secure websockets support func WithSecureWebsockets(address string, port int, certPath string, keyPath string) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index f4bed7cf..565f3f28 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -49,6 +49,7 @@ type WakuSwap interface { type WakuStore struct { ctx context.Context + cancel context.CancelFunc timesource timesource.Timesource MsgC chan *protocol.Envelope wg *sync.WaitGroup @@ -56,7 +57,6 @@ type WakuStore struct { log *zap.Logger started bool - quit chan struct{} msgProvider MessageProvider h host.Host @@ -71,7 +71,6 @@ func NewWakuStore(host host.Host, swap WakuSwap, p MessageProvider, timesource t wakuStore.swap = swap wakuStore.wg = &sync.WaitGroup{} wakuStore.log = log.Named("store") - wakuStore.quit = make(chan struct{}) wakuStore.timesource = timesource return wakuStore diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index b1e2d5e0..a52fa094 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -79,7 +79,7 @@ type MessageProvider interface { Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error) Put(env *protocol.Envelope) error MostRecentTimestamp() (int64, error) - Start(timesource timesource.Timesource) error + Start(ctx context.Context, timesource timesource.Timesource) error Stop() Count() (int, error) } @@ -110,21 +110,21 @@ func (store *WakuStore) Start(ctx context.Context) error { return nil } - err := store.msgProvider.Start(store.timesource) + err := store.msgProvider.Start(ctx, store.timesource) // TODO: store protocol should not start a message provider if err != nil { store.log.Error("Error starting message provider", zap.Error(err)) return nil } store.started = true - store.ctx = ctx + store.ctx, store.cancel = context.WithCancel(ctx) store.MsgC = make(chan *protocol.Envelope, 1024) store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest) store.wg.Add(2) - go store.storeIncomingMessages(ctx) - go store.updateMetrics(ctx) + go store.storeIncomingMessages(store.ctx) + go store.updateMetrics(store.ctx) store.log.Info("Store protocol started") @@ -174,7 +174,7 @@ func (store *WakuStore) updateMetrics(ctx context.Context) { } else { metrics.RecordMessage(store.ctx, "stored", msgCount) } - case <-store.quit: + case <-ctx.Done(): return } } @@ -229,6 +229,12 @@ func (store *WakuStore) MessageChannel() chan *protocol.Envelope { // Stop closes the store message channel and removes the protocol stream handler func (store *WakuStore) Stop() { + if store.cancel == nil { + return + } + + store.cancel() + store.started = false if store.MsgC != nil { @@ -236,8 +242,7 @@ func (store *WakuStore) Stop() { } if store.msgProvider != nil { - store.msgProvider.Stop() - store.quit <- struct{}{} + store.msgProvider.Stop() // TODO: StoreProtocol should not stop a message provider } if store.h != nil { diff --git a/waku/v2/rendezvous/db.go b/waku/v2/rendezvous/db.go new file mode 100644 index 00000000..4152a84c --- /dev/null +++ b/waku/v2/rendezvous/db.go @@ -0,0 +1,439 @@ +package rendezvous + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "database/sql" + "encoding/binary" + "errors" + "fmt" + "time" + + dbi "github.com/berty/go-libp2p-rendezvous/db" + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +type DB struct { + db *sql.DB + logger *zap.Logger + + insertPeerRegistration *sql.Stmt + deletePeerRegistrations *sql.Stmt + deletePeerRegistrationsNs *sql.Stmt + countPeerRegistrations *sql.Stmt + selectPeerRegistrations *sql.Stmt + selectPeerRegistrationsNS *sql.Stmt + selectPeerRegistrationsC *sql.Stmt + selectPeerRegistrationsNSC *sql.Stmt + deleteExpiredRegistrations *sql.Stmt + getCounter *sql.Stmt + + nonce []byte + + cancel func() +} + +func NewDB(ctx context.Context, db *sql.DB, logger *zap.Logger) *DB { + rdb := &DB{ + db: db, + logger: logger.Named("rendezvous/db"), + } + + return rdb +} + +func (db *DB) Start(ctx context.Context) error { + err := db.loadNonce() + if err != nil { + db.Close() + return err + } + + err = db.prepareStmts() + if err != nil { + db.Close() + return err + } + + bgctx, cancel := context.WithCancel(ctx) + db.cancel = cancel + go db.background(bgctx) + + return nil +} + +func (db *DB) Close() error { + db.cancel() + return db.db.Close() +} + +func (db *DB) insertNonce() error { + nonce := make([]byte, 32) + _, err := rand.Read(nonce) + if err != nil { + return err + } + + _, err = db.db.Exec("INSERT INTO nonce VALUES (?)", nonce) + if err != nil { + return err + } + + db.nonce = nonce + return nil +} + +func (db *DB) loadNonce() error { + var nonce []byte + row := db.db.QueryRow("SELECT nonce FROM nonce") + err := row.Scan(&nonce) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return db.insertNonce() + } + return err + } + db.nonce = nonce + return nil +} + +func (db *DB) prepareStmts() error { + stmt, err := db.db.Prepare("INSERT INTO registrations VALUES (NULL, ?, ?, ?, ?)") + if err != nil { + return err + } + db.insertPeerRegistration = stmt + + stmt, err = db.db.Prepare("DELETE FROM registrations WHERE peer = ?") + if err != nil { + return err + } + db.deletePeerRegistrations = stmt + + stmt, err = db.db.Prepare("DELETE FROM registrations WHERE peer = ? AND ns = ?") + if err != nil { + return err + } + db.deletePeerRegistrationsNs = stmt + + stmt, err = db.db.Prepare("SELECT COUNT(*) FROM registrations WHERE peer = ?") + if err != nil { + return err + } + db.countPeerRegistrations = stmt + + stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE expire > ? LIMIT ?") + if err != nil { + return err + } + db.selectPeerRegistrations = stmt + + stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE ns = ? AND expire > ? LIMIT ?") + if err != nil { + return err + } + db.selectPeerRegistrationsNS = stmt + + stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE counter > ? AND expire > ? LIMIT ?") + if err != nil { + return err + } + db.selectPeerRegistrationsC = stmt + + stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE counter > ? AND ns = ? AND expire > ? LIMIT ?") + if err != nil { + return err + } + db.selectPeerRegistrationsNSC = stmt + + stmt, err = db.db.Prepare("DELETE FROM registrations WHERE expire < ?") + if err != nil { + return err + } + db.deleteExpiredRegistrations = stmt + + stmt, err = db.db.Prepare("SELECT MAX(counter) FROM registrations") + if err != nil { + return err + } + db.getCounter = stmt + + return nil +} + +func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error) { + pid := p.Pretty() + maddrs := packAddrs(addrs) + expire := time.Now().Unix() + int64(ttl) + + tx, err := db.db.Begin() + if err != nil { + return 0, err + } + + delOld := tx.Stmt(db.deletePeerRegistrationsNs) + insertNew := tx.Stmt(db.insertPeerRegistration) + getCounter := tx.Stmt(db.getCounter) + + _, err = delOld.Exec(pid, ns) + if err != nil { + _ = tx.Rollback() + return 0, err + } + + _, err = insertNew.Exec(pid, ns, expire, maddrs) + if err != nil { + _ = tx.Rollback() + return 0, err + } + + var counter uint64 + row := getCounter.QueryRow() + err = row.Scan(&counter) + if err != nil { + _ = tx.Rollback() + return 0, err + } + + err = tx.Commit() + return counter, err +} + +func (db *DB) CountRegistrations(p peer.ID) (int, error) { + pid := p.Pretty() + + row := db.countPeerRegistrations.QueryRow(pid) + + var count int + err := row.Scan(&count) + + return count, err +} + +func (db *DB) Unregister(p peer.ID, ns string) error { + pid := p.Pretty() + + var err error + + if ns == "" { + _, err = db.deletePeerRegistrations.Exec(pid) + } else { + _, err = db.deletePeerRegistrationsNs.Exec(pid, ns) + } + + return err +} + +func (db *DB) Discover(ns string, cookie []byte, limit int) ([]dbi.RegistrationRecord, []byte, error) { + now := time.Now().Unix() + + var ( + counter int64 + rows *sql.Rows + err error + ) + + if cookie != nil { + counter, err = unpackCookie(cookie) + if err != nil { + db.logger.Error("unpacking cookie", zap.Error(err)) + return nil, nil, err + } + } + + if counter > 0 { + if ns == "" { + rows, err = db.selectPeerRegistrationsC.Query(counter, now, limit) + } else { + rows, err = db.selectPeerRegistrationsNSC.Query(counter, ns, now, limit) + } + } else { + if ns == "" { + rows, err = db.selectPeerRegistrations.Query(now, limit) + } else { + rows, err = db.selectPeerRegistrationsNS.Query(ns, now, limit) + } + } + + if err != nil { + db.logger.Error("query", zap.Error(err)) + return nil, nil, err + } + + defer rows.Close() + + regs := make([]dbi.RegistrationRecord, 0, limit) + for rows.Next() { + var ( + reg dbi.RegistrationRecord + rid string + rns string + expire int64 + raddrs []byte + addrs [][]byte + p peer.ID + ) + + err = rows.Scan(&counter, &rid, &rns, &expire, &raddrs) + if err != nil { + db.logger.Error("row scan error", zap.Error(err)) + return nil, nil, err + } + + p, err = peer.Decode(rid) + if err != nil { + db.logger.Error("error decoding peer id", zap.Error(err)) + continue + } + + addrs, err := unpackAddrs(raddrs) + if err != nil { + db.logger.Error("error unpacking address", zap.Error(err)) + continue + } + + reg.Id = p + reg.Addrs = addrs + reg.Ttl = int(expire - now) + + if ns == "" { + reg.Ns = rns + } + + regs = append(regs, reg) + } + + err = rows.Err() + if err != nil { + return nil, nil, err + } + + if counter > 0 { + cookie = packCookie(counter, ns, db.nonce) + } + + return regs, cookie, nil +} + +func (db *DB) ValidCookie(ns string, cookie []byte) bool { + return validCookie(cookie, ns, db.nonce) +} + +func (db *DB) background(ctx context.Context) { + for { + db.cleanupExpired() + + select { + case <-time.After(15 * time.Minute): + case <-ctx.Done(): + return + } + } +} + +func (db *DB) cleanupExpired() { + now := time.Now().Unix() + _, err := db.deleteExpiredRegistrations.Exec(now) + if err != nil { + db.logger.Error("deleting expired registrations", zap.Error(err)) + } +} + +func packAddrs(addrs [][]byte) []byte { + packlen := 0 + for _, addr := range addrs { + packlen = packlen + 2 + len(addr) + } + + packed := make([]byte, packlen) + buf := packed + for _, addr := range addrs { + binary.BigEndian.PutUint16(buf, uint16(len(addr))) + buf = buf[2:] + copy(buf, addr) + buf = buf[len(addr):] + } + + return packed +} + +func unpackAddrs(packed []byte) ([][]byte, error) { + var addrs [][]byte + + buf := packed + for len(buf) > 1 { + l := binary.BigEndian.Uint16(buf) + buf = buf[2:] + if len(buf) < int(l) { + return nil, fmt.Errorf("bad packed address: not enough bytes %v %v", packed, buf) + } + addr := make([]byte, l) + copy(addr, buf[:l]) + buf = buf[l:] + addrs = append(addrs, addr) + } + + if len(buf) > 0 { + return nil, fmt.Errorf("bad packed address: unprocessed bytes: %v %v", packed, buf) + } + + return addrs, nil +} + +// cookie: counter:SHA256(nonce + ns + counter) +func packCookie(counter int64, ns string, nonce []byte) []byte { + cbits := make([]byte, 8) + binary.BigEndian.PutUint64(cbits, uint64(counter)) + + hash := sha256.New() + _, err := hash.Write(nonce) + if err != nil { + panic(err) + } + _, err = hash.Write([]byte(ns)) + if err != nil { + panic(err) + } + _, err = hash.Write(cbits) + if err != nil { + panic(err) + } + + return hash.Sum(cbits) +} + +func unpackCookie(cookie []byte) (int64, error) { + if len(cookie) < 8 { + return 0, fmt.Errorf("bad packed cookie: not enough bytes: %v", cookie) + } + + counter := binary.BigEndian.Uint64(cookie[:8]) + return int64(counter), nil +} + +func validCookie(cookie []byte, ns string, nonce []byte) bool { + if len(cookie) != 40 { + return false + } + + cbits := cookie[:8] + hash := sha256.New() + _, err := hash.Write(nonce) + if err != nil { + panic(err) + } + _, err = hash.Write([]byte(ns)) + if err != nil { + panic(err) + } + _, err = hash.Write(cbits) + if err != nil { + panic(err) + } + hbits := hash.Sum(nil) + + return bytes.Equal(cookie[8:], hbits) +} diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go new file mode 100644 index 00000000..4c498771 --- /dev/null +++ b/waku/v2/rendezvous/rendezvous.go @@ -0,0 +1,52 @@ +package rendezvous + +import ( + "context" + + rvs "github.com/berty/go-libp2p-rendezvous" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +const RendezvousID = rvs.RendezvousProto + +type Rendezvous struct { + host host.Host + peerConnector PeerConnector + db *DB + rendezvousSvc *rvs.RendezvousService + + log *zap.Logger +} + +type PeerConnector interface { + PeerChannel() chan<- peer.AddrInfo +} + +func NewRendezvous(host host.Host, db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { + logger := log.Named("rendezvous") + + return &Rendezvous{ + host: host, + db: db, + peerConnector: peerConnector, + log: logger, + } +} + +func (r *Rendezvous) Start(ctx context.Context) error { + err := r.db.Start(ctx) + if err != nil { + return err + } + + r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) + r.log.Info("rendezvous protocol started") + return nil +} + +func (r *Rendezvous) Stop() { + r.host.RemoveStreamHandler(rvs.RendezvousProto) + r.rendezvousSvc = nil +}