mirror of https://github.com/status-im/go-waku.git
feat: rendezvous server
This commit is contained in:
parent
63bb4509bf
commit
8826e2df66
|
@ -95,7 +95,7 @@ If you'd like to contribute to go-waku, please fork, fix, commit and send a pull
|
|||
To build and test this repository, you need:
|
||||
- [Go](https://golang.org/) (version 1.17 or later)
|
||||
- [protoc](https://grpc.io/docs/protoc-installation/)
|
||||
- [Protocol Buffers for Go with Gadgets](https://github.com/gogo/protobuf)
|
||||
- [protoc-gen-go](https://protobuf.dev/getting-started/gotutorial/#compiling-protocol-buffers)
|
||||
|
||||
To enable the git hooks:
|
||||
|
||||
|
|
|
@ -363,6 +363,21 @@ var (
|
|||
Destination: &options.DiscV5.AutoUpdate,
|
||||
EnvVars: []string{"WAKUNODE2_DISCV5_ENR_AUTO_UPDATE"},
|
||||
})
|
||||
|
||||
Rendezvous = altsrc.NewBoolFlag(&cli.BoolFlag{
|
||||
Name: "rendezvous",
|
||||
Usage: "Enable rendezvous protocol server for peer discovery",
|
||||
Destination: &options.Rendezvous.Enable,
|
||||
EnvVars: []string{"WAKUNODE2_RENDEZVOUS"},
|
||||
})
|
||||
RendezvousNode = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
|
||||
Name: "rendezvous-node",
|
||||
Usage: "Multiaddr of a waku2 rendezvous node. Option may be repeated",
|
||||
Value: &cliutils.MultiaddrSlice{
|
||||
Values: &options.Rendezvous.Nodes,
|
||||
},
|
||||
EnvVars: []string{"WAKUNODE2_RENDEZVOUSNODE"},
|
||||
})
|
||||
PeerExchange = altsrc.NewBoolFlag(&cli.BoolFlag{
|
||||
Name: "peer-exchange",
|
||||
Usage: "Enable waku peer exchange protocol (responder side)",
|
||||
|
|
|
@ -73,6 +73,8 @@ func main() {
|
|||
DNSDiscovery,
|
||||
DNSDiscoveryUrl,
|
||||
DNSDiscoveryNameServer,
|
||||
Rendezvous,
|
||||
RendezvousNode,
|
||||
MetricsServer,
|
||||
MetricsServerAddress,
|
||||
MetricsServerPort,
|
||||
|
|
|
@ -17,6 +17,7 @@ require (
|
|||
github.com/beevik/ntp v0.3.0 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect
|
||||
github.com/btcsuite/btcd v0.20.1-beta // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
|
||||
|
|
|
@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||
|
|
|
@ -66,7 +66,7 @@ int main(int argc, char *argv[])
|
|||
char *response;
|
||||
waku_set_event_callback(callBack);
|
||||
|
||||
char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000}";
|
||||
char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000, \"logLevel\":\"error\"}";
|
||||
response = waku_new(configJSON); // configJSON can be NULL too to use defaults
|
||||
if (isError(response))
|
||||
return 1;
|
||||
|
|
|
@ -32,7 +32,7 @@ char *utils_extract_wakumessage_from_signal(const nx_json *signal)
|
|||
const char *contentTopic = nx_json_get(wakuMsgJson, "contentTopic")->text_value;
|
||||
int version = nx_json_get(wakuMsgJson, "version")->int_value;
|
||||
long long timestamp = nx_json_get(wakuMsgJson, "timestamp")->int_value;
|
||||
char wakuMsg[1000];
|
||||
char wakuMsg[6000];
|
||||
sprintf(wakuMsg, "{\"payload\":\"%s\",\"contentTopic\":\"%s\",\"timestamp\":%lld, \"version\":%d}", payload, contentTopic, timestamp, version);
|
||||
char *response = (char *)malloc(sizeof(char) * (strlen(wakuMsg) + 1));
|
||||
strcpy(response, wakuMsg);
|
||||
|
|
|
@ -30,6 +30,7 @@ require (
|
|||
github.com/beevik/ntp v0.3.0 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect
|
||||
github.com/btcsuite/btcd v0.20.1-beta // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
|
||||
|
|
|
@ -76,6 +76,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||
|
|
|
@ -17,6 +17,7 @@ require (
|
|||
github.com/beevik/ntp v0.3.0 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect
|
||||
github.com/btcsuite/btcd v0.20.1-beta // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
|
||||
|
|
|
@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||
|
|
|
@ -20,6 +20,7 @@ require (
|
|||
github.com/beevik/ntp v0.3.0 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect
|
||||
github.com/btcsuite/btcd v0.20.1-beta // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
|
||||
|
|
|
@ -72,6 +72,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
];
|
||||
doCheck = false;
|
||||
# FIXME: This needs to be manually changed when updating modules.
|
||||
vendorSha256 = "sha256-yED55+XK/JVplsVcZQin2RBECIUt3XMr0crwt+X2S2Q=";
|
||||
vendorSha256 = "sha256-6oAZkdIibYESZJhxu1aQqctj6nUBKdc6dSmM694Ustc=";
|
||||
# Fix for 'nix run' trying to execute 'go-waku'.
|
||||
meta = { mainProgram = "waku"; };
|
||||
};
|
||||
|
|
1
go.mod
1
go.mod
|
@ -35,6 +35,7 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1
|
||||
github.com/go-chi/chi/v5 v5.0.0
|
||||
github.com/lib/pq v1.10.4
|
||||
github.com/waku-org/go-noise v0.0.4
|
||||
|
|
2
go.sum
2
go.sum
|
@ -211,6 +211,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
|
||||
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
|
||||
|
|
26
waku/node.go
26
waku/node.go
|
@ -14,6 +14,7 @@ import (
|
|||
"time"
|
||||
|
||||
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
|
||||
"github.com/ethereum/go-ethereum/accounts/keystore"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
@ -57,6 +58,10 @@ func failOnErr(err error, msg string) {
|
|||
}
|
||||
}
|
||||
|
||||
func requiresDB(options Options) bool {
|
||||
return options.Store.Enable || options.Rendezvous.Enable
|
||||
}
|
||||
|
||||
const dialTimeout = 7 * time.Second
|
||||
|
||||
// Execute starts a go-waku node with settings determined by the Options parameter
|
||||
|
@ -81,7 +86,7 @@ func Execute(options Options) {
|
|||
|
||||
var db *sql.DB
|
||||
var migrationFn func(*sql.DB) error
|
||||
if options.Store.Enable {
|
||||
if requiresDB(options) {
|
||||
db, migrationFn, err = ExtractDBAndMigration(options.Store.DatabaseURL)
|
||||
failOnErr(err, "Could not connect to DB")
|
||||
}
|
||||
|
@ -172,17 +177,22 @@ func Execute(options Options) {
|
|||
}
|
||||
}
|
||||
|
||||
if options.Store.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...))
|
||||
dbStore, err := persistence.NewDBStore(logger,
|
||||
var dbStore *persistence.DBStore
|
||||
if requiresDB(options) {
|
||||
dbStore, err = persistence.NewDBStore(logger,
|
||||
persistence.WithDB(db),
|
||||
persistence.WithMigrations(migrationFn),
|
||||
persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB
|
||||
persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime),
|
||||
)
|
||||
failOnErr(err, "DBStore")
|
||||
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
||||
}
|
||||
|
||||
if options.Store.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...))
|
||||
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
||||
}
|
||||
|
||||
if options.LightPush.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithLightPush())
|
||||
}
|
||||
|
@ -232,6 +242,11 @@ func Execute(options Options) {
|
|||
nodeOpts = append(nodeOpts, node.WithPeerExchange())
|
||||
}
|
||||
|
||||
if options.Rendezvous.Enable {
|
||||
rdb := rendezvous.NewDB(ctx, db, logger)
|
||||
nodeOpts = append(nodeOpts, node.WithRendezvousServer(rdb))
|
||||
}
|
||||
|
||||
checkForRLN(logger, options, &nodeOpts)
|
||||
|
||||
wakuNode, err := node.New(nodeOpts...)
|
||||
|
@ -242,6 +257,7 @@ func Execute(options Options) {
|
|||
|
||||
addPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)
|
||||
addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1)
|
||||
addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
|
||||
|
||||
if options.Filter.UseV2 {
|
||||
addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1)
|
||||
|
|
|
@ -139,6 +139,11 @@ type PeerExchangeOptions struct {
|
|||
Node *multiaddr.Multiaddr
|
||||
}
|
||||
|
||||
type RendezvousOptions struct {
|
||||
Enable bool
|
||||
Nodes []multiaddr.Multiaddr
|
||||
}
|
||||
|
||||
// Options contains all the available features and settings that can be
|
||||
// configured via flags when executing go-waku as a service.
|
||||
type Options struct {
|
||||
|
@ -172,6 +177,7 @@ type Options struct {
|
|||
RLNRelay RLNRelayOptions
|
||||
DiscV5 DiscV5Options
|
||||
DNSDiscovery DNSDiscoveryOptions
|
||||
Rendezvous RendezvousOptions
|
||||
Metrics MetricsOptions
|
||||
RPCServer RPCServerOptions
|
||||
RESTServer RESTServerOptions
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
// 1_messages.up.sql (452B)
|
||||
// 2_messages_index.down.sql (60B)
|
||||
// 2_messages_index.up.sql (226B)
|
||||
// 3_rendezvous.down.sql (65B)
|
||||
// 3_rendezvous.up.sql (181B)
|
||||
// doc.go (74B)
|
||||
|
||||
package migrations
|
||||
|
@ -153,6 +155,46 @@ func _2_messages_indexUpSql() (*asset, error) {
|
|||
return a, nil
|
||||
}
|
||||
|
||||
var __3_rendezvousDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x08\x4a\x4d\xcf\x2c\x2e\x29\x4a\x2c\xc9\xcc\xcf\x2b\xb6\xe6\xe2\xc2\xaa\xc8\x2f\x3f\x2f\x39\xd5\x9a\x0b\x10\x00\x00\xff\xff\x58\x3f\x49\x49\x41\x00\x00\x00")
|
||||
|
||||
func _3_rendezvousDownSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__3_rendezvousDownSql,
|
||||
"3_rendezvous.down.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _3_rendezvousDownSql() (*asset, error) {
|
||||
bytes, err := _3_rendezvousDownSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "3_rendezvous.down.sql", size: 65, mode: os.FileMode(0664), modTime: time.Unix(1678306445, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1f, 0x4b, 0xc0, 0x7d, 0x4f, 0xac, 0xc4, 0x75, 0x59, 0xcc, 0xfc, 0x1a, 0x6c, 0x18, 0x81, 0x29, 0x24, 0x33, 0x3, 0x10, 0x39, 0xd0, 0x67, 0x28, 0xa0, 0xe0, 0xfd, 0x36, 0x91, 0x25, 0x37, 0x83}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __3_rendezvousUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xce\xc1\x0a\x82\x40\x10\xc6\xf1\xfb\x3e\xc5\x77\x54\xf0\x18\x5d\x3a\x8d\x32\x94\x64\x11\xd3\x12\x78\x14\x1d\xc2\xcb\x2a\xb3\x1b\xf4\xf8\x81\x66\xd0\x6d\xf8\xcd\x77\xf8\x57\xc2\xe4\x19\x9e\xca\x86\x61\xfa\x1c\x63\xb2\x2e\x8d\x53\x88\xc8\x1c\x00\xf4\xd3\x2b\x24\x35\xdc\x59\x6a\x6a\x70\x93\xfa\x42\xd2\xe2\xcc\x6d\xb1\xfc\x67\x55\xc3\x83\xa4\x3a\x91\x64\xfb\x5d\xbe\x6a\x88\x9b\x15\x58\x40\xdf\xf3\x68\x8a\xfa\xea\xf9\xc8\x1b\x76\xc3\x60\x11\x65\xeb\x99\x5c\x7e\x70\xee\x2f\x26\x4c\xa1\xd7\x6f\xc4\x7a\xff\x86\x9f\x00\x00\x00\xff\xff\x00\x70\x80\x83\xb5\x00\x00\x00")
|
||||
|
||||
func _3_rendezvousUpSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__3_rendezvousUpSql,
|
||||
"3_rendezvous.up.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _3_rendezvousUpSql() (*asset, error) {
|
||||
bytes, err := _3_rendezvousUpSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "3_rendezvous.up.sql", size: 181, mode: os.FileMode(0664), modTime: time.Unix(1678306380, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x5c, 0xb8, 0x4f, 0x88, 0xe9, 0xc6, 0xc, 0xbb, 0x2e, 0x56, 0xa2, 0xcd, 0x9, 0xfa, 0x33, 0x94, 0xd7, 0x73, 0xc1, 0xa, 0xc5, 0x69, 0xfb, 0x9f, 0x75, 0xdb, 0x75, 0x58, 0x20, 0x5e, 0xf, 0x14}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
|
||||
|
||||
func docGoBytes() ([]byte, error) {
|
||||
|
@ -272,6 +314,10 @@ var _bindata = map[string]func() (*asset, error){
|
|||
|
||||
"2_messages_index.up.sql": _2_messages_indexUpSql,
|
||||
|
||||
"3_rendezvous.down.sql": _3_rendezvousDownSql,
|
||||
|
||||
"3_rendezvous.up.sql": _3_rendezvousUpSql,
|
||||
|
||||
"doc.go": docGo,
|
||||
}
|
||||
|
||||
|
@ -320,6 +366,8 @@ var _bintree = &bintree{nil, map[string]*bintree{
|
|||
"1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}},
|
||||
"2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}},
|
||||
"2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}},
|
||||
"3_rendezvous.down.sql": &bintree{_3_rendezvousDownSql, map[string]*bintree{}},
|
||||
"3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}},
|
||||
"doc.go": &bintree{docGo, map[string]*bintree{}},
|
||||
}}
|
||||
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
DROP TABLE IF EXISTS Registrations;
|
||||
|
||||
DROP TABLE IF EXISTS Nonce;
|
|
@ -0,0 +1,11 @@
|
|||
CREATE TABLE registrations (
|
||||
counter SERIAL PRIMARY KEY,
|
||||
peer VARCHAR(64),
|
||||
ns VARCHAR,
|
||||
expire INTEGER,
|
||||
addrs BYTEA
|
||||
);
|
||||
|
||||
CREATE TABLE nonce (
|
||||
nonce BYTEA
|
||||
);
|
|
@ -4,6 +4,8 @@
|
|||
// 1_messages.up.sql (464B)
|
||||
// 2_messages_index.down.sql (60B)
|
||||
// 2_messages_index.up.sql (226B)
|
||||
// 3_rendezvous.down.sql (65B)
|
||||
// 3_rendezvous.up.sql (204B)
|
||||
// doc.go (74B)
|
||||
|
||||
package migrations
|
||||
|
@ -153,6 +155,46 @@ func _2_messages_indexUpSql() (*asset, error) {
|
|||
return a, nil
|
||||
}
|
||||
|
||||
var __3_rendezvousDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x08\x4a\x4d\xcf\x2c\x2e\x29\x4a\x2c\xc9\xcc\xcf\x2b\xb6\xe6\xe2\xc2\xaa\xc8\x2f\x3f\x2f\x39\xd5\x9a\x0b\x10\x00\x00\xff\xff\x58\x3f\x49\x49\x41\x00\x00\x00")
|
||||
|
||||
func _3_rendezvousDownSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__3_rendezvousDownSql,
|
||||
"3_rendezvous.down.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _3_rendezvousDownSql() (*asset, error) {
|
||||
bytes, err := _3_rendezvousDownSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "3_rendezvous.down.sql", size: 65, mode: os.FileMode(0664), modTime: time.Unix(1678305816, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1f, 0x4b, 0xc0, 0x7d, 0x4f, 0xac, 0xc4, 0x75, 0x59, 0xcc, 0xfc, 0x1a, 0x6c, 0x18, 0x81, 0x29, 0x24, 0x33, 0x3, 0x10, 0x39, 0xd0, 0x67, 0x28, 0xa0, 0xe0, 0xfd, 0x36, 0x91, 0x25, 0x37, 0x83}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __3_rendezvousUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xce\xb1\xaa\x83\x40\x10\x85\xe1\x7e\x9f\xe2\x94\x0a\x96\x97\xdb\xa4\x1a\x65\x48\x96\xc4\x4d\x18\x36\x01\x4b\xd1\x21\xd8\xac\xb2\x1a\xc8\xe3\x07\x56\x2d\xec\x86\x8f\x9f\xe1\x54\xc2\xe4\x19\x9e\xca\x1b\x23\xea\x7b\x98\x97\xd8\x2e\xc3\x18\x66\x64\x06\x00\xba\xf1\x13\x16\x8d\xb0\xce\xf3\x99\x05\x0f\xb1\x35\x49\x83\x2b\x37\xa0\xa7\xbf\x5b\x57\x09\xd7\xec\x7c\x91\xf2\x49\x35\xe2\x45\x52\x5d\x48\xb2\xff\xbf\x7c\xd5\x30\xef\x56\x20\x81\x7e\xa7\x21\xea\xfe\x74\xc3\xb6\xef\x63\x0a\x4b\xeb\x48\x1a\x93\x9f\x8c\x39\xec\x0b\x63\xe8\x74\xdb\xb5\xde\x87\xf8\x17\x00\x00\xff\xff\xc3\x3a\x6f\xb8\xcc\x00\x00\x00")
|
||||
|
||||
func _3_rendezvousUpSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__3_rendezvousUpSql,
|
||||
"3_rendezvous.up.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _3_rendezvousUpSql() (*asset, error) {
|
||||
bytes, err := _3_rendezvousUpSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "3_rendezvous.up.sql", size: 204, mode: os.FileMode(0664), modTime: time.Unix(1678306389, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb4, 0x9e, 0xd1, 0xde, 0xd4, 0xd3, 0x7, 0xc8, 0x7e, 0xa8, 0x54, 0xb5, 0xb3, 0xa1, 0x3c, 0x56, 0xd5, 0xcd, 0x61, 0xed, 0x9c, 0x82, 0x57, 0x24, 0x1f, 0x42, 0x98, 0xf4, 0x33, 0xa4, 0xc0, 0x16}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
|
||||
|
||||
func docGoBytes() ([]byte, error) {
|
||||
|
@ -272,6 +314,10 @@ var _bindata = map[string]func() (*asset, error){
|
|||
|
||||
"2_messages_index.up.sql": _2_messages_indexUpSql,
|
||||
|
||||
"3_rendezvous.down.sql": _3_rendezvousDownSql,
|
||||
|
||||
"3_rendezvous.up.sql": _3_rendezvousUpSql,
|
||||
|
||||
"doc.go": docGo,
|
||||
}
|
||||
|
||||
|
@ -320,6 +366,8 @@ var _bintree = &bintree{nil, map[string]*bintree{
|
|||
"1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}},
|
||||
"2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}},
|
||||
"2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}},
|
||||
"3_rendezvous.down.sql": &bintree{_3_rendezvousDownSql, map[string]*bintree{}},
|
||||
"3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}},
|
||||
"doc.go": &bintree{docGo, map[string]*bintree{}},
|
||||
}}
|
||||
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
DROP TABLE IF EXISTS Registrations;
|
||||
|
||||
DROP TABLE IF EXISTS Nonce;
|
|
@ -0,0 +1,11 @@
|
|||
CREATE TABLE registrations (
|
||||
counter INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
peer VARCHAR(64),
|
||||
ns VARCHAR,
|
||||
expire INTEGER,
|
||||
addrs VARBINARY
|
||||
);
|
||||
|
||||
CREATE TABLE nonce (
|
||||
nonce VARBINARY
|
||||
);
|
|
@ -1,6 +1,7 @@
|
|||
package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -21,7 +22,7 @@ type MessageProvider interface {
|
|||
Put(env *protocol.Envelope) error
|
||||
Query(query *pb.HistoryQuery) ([]StoredMessage, error)
|
||||
MostRecentTimestamp() (int64, error)
|
||||
Start(timesource timesource.Timesource) error
|
||||
Start(ctx context.Context, timesource timesource.Timesource) error
|
||||
Stop()
|
||||
}
|
||||
|
||||
|
@ -45,8 +46,8 @@ type DBStore struct {
|
|||
|
||||
enableMigrations bool
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type StoredMessage struct {
|
||||
|
@ -124,7 +125,6 @@ func DefaultOptions() []DBOption {
|
|||
func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
|
||||
result := new(DBStore)
|
||||
result.log = log.Named("dbstore")
|
||||
result.quit = make(chan struct{})
|
||||
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, options...)
|
||||
|
@ -146,7 +146,10 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (d *DBStore) Start(timesource timesource.Timesource) error {
|
||||
func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
d.cancel = cancel
|
||||
d.timesource = timesource
|
||||
|
||||
err := d.cleanOlderRecords()
|
||||
|
@ -155,7 +158,7 @@ func (d *DBStore) Start(timesource timesource.Timesource) error {
|
|||
}
|
||||
|
||||
d.wg.Add(1)
|
||||
go d.checkForOlderRecords(60 * time.Second)
|
||||
go d.checkForOlderRecords(ctx, 60*time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -192,7 +195,7 @@ func (d *DBStore) cleanOlderRecords() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *DBStore) checkForOlderRecords(t time.Duration) {
|
||||
func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
|
||||
defer d.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(t)
|
||||
|
@ -200,7 +203,7 @@ func (d *DBStore) checkForOlderRecords(t time.Duration) {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-d.quit:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := d.cleanOlderRecords()
|
||||
|
@ -213,7 +216,11 @@ func (d *DBStore) checkForOlderRecords(t time.Duration) {
|
|||
|
||||
// Stop closes a DB connection
|
||||
func (d *DBStore) Stop() {
|
||||
d.quit <- struct{}{}
|
||||
if d.cancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
d.cancel()
|
||||
d.wg.Wait()
|
||||
d.db.Close()
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -40,7 +41,7 @@ func TestDbStore(t *testing.T) {
|
|||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = store.Start(timesource.NewDefaultClock())
|
||||
err = store.Start(context.Background(), timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := store.GetAll()
|
||||
|
@ -60,7 +61,7 @@ func TestStoreRetention(t *testing.T) {
|
|||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = store.Start(timesource.NewDefaultClock())
|
||||
err = store.Start(context.Background(), timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
insertTime := time.Now()
|
||||
|
@ -83,7 +84,7 @@ func TestStoreRetention(t *testing.T) {
|
|||
store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = store.Start(timesource.NewDefaultClock())
|
||||
err = store.Start(context.Background(), timesource.NewDefaultClock())
|
||||
require.NoError(t, err)
|
||||
|
||||
dbResults, err = store.GetAll()
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
@ -80,6 +81,7 @@ type WakuNode struct {
|
|||
peerConnector PeerConnectorService
|
||||
discoveryV5 Service
|
||||
peerExchange Service
|
||||
rendezvous Service
|
||||
filter ReceptorService
|
||||
filterV2Full ReceptorService
|
||||
filterV2Light Service
|
||||
|
@ -212,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.rendezvousDB, w.peerConnector, w.log)
|
||||
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
|
||||
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
|
||||
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)
|
||||
|
@ -390,6 +393,13 @@ func (w *WakuNode) Start(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
if w.opts.enableRendezvous {
|
||||
err := w.rendezvous.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if w.opts.enableRLN {
|
||||
err = w.mountRlnRelay(ctx)
|
||||
if err != nil {
|
||||
|
@ -415,6 +425,10 @@ func (w *WakuNode) Stop() {
|
|||
defer w.identificationEventSub.Close()
|
||||
defer w.addressChangesSub.Close()
|
||||
|
||||
if w.opts.enableRendezvous {
|
||||
w.rendezvous.Stop()
|
||||
}
|
||||
|
||||
w.relay.Stop()
|
||||
w.lightPush.Stop()
|
||||
w.store.Stop()
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
|
@ -79,6 +80,9 @@ type WakuNodeParameters struct {
|
|||
resumeNodes []multiaddr.Multiaddr
|
||||
messageProvider store.MessageProvider
|
||||
|
||||
enableRendezvous bool
|
||||
rendezvousDB *rendezvous.DB
|
||||
|
||||
swapMode int
|
||||
swapDisconnectThreshold int
|
||||
swapPaymentThreshold int
|
||||
|
@ -433,6 +437,16 @@ func WithWebsockets(address string, port int) WakuNodeOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithRendezvousServer is a WakuOption used to set the node as a rendezvous
|
||||
// point, using an specific storage for the peer information
|
||||
func WithRendezvousServer(db *rendezvous.DB) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enableRendezvous = true
|
||||
params.rendezvousDB = db
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSecureWebsockets is a WakuNodeOption used to enable secure websockets support
|
||||
func WithSecureWebsockets(address string, port int, certPath string, keyPath string) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
|
|
|
@ -49,6 +49,7 @@ type WakuSwap interface {
|
|||
|
||||
type WakuStore struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
timesource timesource.Timesource
|
||||
MsgC chan *protocol.Envelope
|
||||
wg *sync.WaitGroup
|
||||
|
@ -56,7 +57,6 @@ type WakuStore struct {
|
|||
log *zap.Logger
|
||||
|
||||
started bool
|
||||
quit chan struct{}
|
||||
|
||||
msgProvider MessageProvider
|
||||
h host.Host
|
||||
|
@ -71,7 +71,6 @@ func NewWakuStore(host host.Host, swap WakuSwap, p MessageProvider, timesource t
|
|||
wakuStore.swap = swap
|
||||
wakuStore.wg = &sync.WaitGroup{}
|
||||
wakuStore.log = log.Named("store")
|
||||
wakuStore.quit = make(chan struct{})
|
||||
wakuStore.timesource = timesource
|
||||
|
||||
return wakuStore
|
||||
|
|
|
@ -79,7 +79,7 @@ type MessageProvider interface {
|
|||
Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error)
|
||||
Put(env *protocol.Envelope) error
|
||||
MostRecentTimestamp() (int64, error)
|
||||
Start(timesource timesource.Timesource) error
|
||||
Start(ctx context.Context, timesource timesource.Timesource) error
|
||||
Stop()
|
||||
Count() (int, error)
|
||||
}
|
||||
|
@ -110,21 +110,21 @@ func (store *WakuStore) Start(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := store.msgProvider.Start(store.timesource)
|
||||
err := store.msgProvider.Start(ctx, store.timesource) // TODO: store protocol should not start a message provider
|
||||
if err != nil {
|
||||
store.log.Error("Error starting message provider", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
store.started = true
|
||||
store.ctx = ctx
|
||||
store.ctx, store.cancel = context.WithCancel(ctx)
|
||||
store.MsgC = make(chan *protocol.Envelope, 1024)
|
||||
|
||||
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
|
||||
|
||||
store.wg.Add(2)
|
||||
go store.storeIncomingMessages(ctx)
|
||||
go store.updateMetrics(ctx)
|
||||
go store.storeIncomingMessages(store.ctx)
|
||||
go store.updateMetrics(store.ctx)
|
||||
|
||||
store.log.Info("Store protocol started")
|
||||
|
||||
|
@ -174,7 +174,7 @@ func (store *WakuStore) updateMetrics(ctx context.Context) {
|
|||
} else {
|
||||
metrics.RecordMessage(store.ctx, "stored", msgCount)
|
||||
}
|
||||
case <-store.quit:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -229,6 +229,12 @@ func (store *WakuStore) MessageChannel() chan *protocol.Envelope {
|
|||
|
||||
// Stop closes the store message channel and removes the protocol stream handler
|
||||
func (store *WakuStore) Stop() {
|
||||
if store.cancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
store.cancel()
|
||||
|
||||
store.started = false
|
||||
|
||||
if store.MsgC != nil {
|
||||
|
@ -236,8 +242,7 @@ func (store *WakuStore) Stop() {
|
|||
}
|
||||
|
||||
if store.msgProvider != nil {
|
||||
store.msgProvider.Stop()
|
||||
store.quit <- struct{}{}
|
||||
store.msgProvider.Stop() // TODO: StoreProtocol should not stop a message provider
|
||||
}
|
||||
|
||||
if store.h != nil {
|
||||
|
|
|
@ -0,0 +1,439 @@
|
|||
package rendezvous
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
dbi "github.com/berty/go-libp2p-rendezvous/db"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
db *sql.DB
|
||||
logger *zap.Logger
|
||||
|
||||
insertPeerRegistration *sql.Stmt
|
||||
deletePeerRegistrations *sql.Stmt
|
||||
deletePeerRegistrationsNs *sql.Stmt
|
||||
countPeerRegistrations *sql.Stmt
|
||||
selectPeerRegistrations *sql.Stmt
|
||||
selectPeerRegistrationsNS *sql.Stmt
|
||||
selectPeerRegistrationsC *sql.Stmt
|
||||
selectPeerRegistrationsNSC *sql.Stmt
|
||||
deleteExpiredRegistrations *sql.Stmt
|
||||
getCounter *sql.Stmt
|
||||
|
||||
nonce []byte
|
||||
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func NewDB(ctx context.Context, db *sql.DB, logger *zap.Logger) *DB {
|
||||
rdb := &DB{
|
||||
db: db,
|
||||
logger: logger.Named("rendezvous/db"),
|
||||
}
|
||||
|
||||
return rdb
|
||||
}
|
||||
|
||||
func (db *DB) Start(ctx context.Context) error {
|
||||
err := db.loadNonce()
|
||||
if err != nil {
|
||||
db.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
err = db.prepareStmts()
|
||||
if err != nil {
|
||||
db.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
bgctx, cancel := context.WithCancel(ctx)
|
||||
db.cancel = cancel
|
||||
go db.background(bgctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) Close() error {
|
||||
db.cancel()
|
||||
return db.db.Close()
|
||||
}
|
||||
|
||||
func (db *DB) insertNonce() error {
|
||||
nonce := make([]byte, 32)
|
||||
_, err := rand.Read(nonce)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = db.db.Exec("INSERT INTO nonce VALUES (?)", nonce)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.nonce = nonce
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) loadNonce() error {
|
||||
var nonce []byte
|
||||
row := db.db.QueryRow("SELECT nonce FROM nonce")
|
||||
err := row.Scan(&nonce)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return db.insertNonce()
|
||||
}
|
||||
return err
|
||||
}
|
||||
db.nonce = nonce
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) prepareStmts() error {
|
||||
stmt, err := db.db.Prepare("INSERT INTO registrations VALUES (NULL, ?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.insertPeerRegistration = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("DELETE FROM registrations WHERE peer = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.deletePeerRegistrations = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("DELETE FROM registrations WHERE peer = ? AND ns = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.deletePeerRegistrationsNs = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT COUNT(*) FROM registrations WHERE peer = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.countPeerRegistrations = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE expire > ? LIMIT ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.selectPeerRegistrations = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE ns = ? AND expire > ? LIMIT ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.selectPeerRegistrationsNS = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE counter > ? AND expire > ? LIMIT ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.selectPeerRegistrationsC = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT * FROM registrations WHERE counter > ? AND ns = ? AND expire > ? LIMIT ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.selectPeerRegistrationsNSC = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("DELETE FROM registrations WHERE expire < ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.deleteExpiredRegistrations = stmt
|
||||
|
||||
stmt, err = db.db.Prepare("SELECT MAX(counter) FROM registrations")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.getCounter = stmt
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error) {
|
||||
pid := p.Pretty()
|
||||
maddrs := packAddrs(addrs)
|
||||
expire := time.Now().Unix() + int64(ttl)
|
||||
|
||||
tx, err := db.db.Begin()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
delOld := tx.Stmt(db.deletePeerRegistrationsNs)
|
||||
insertNew := tx.Stmt(db.insertPeerRegistration)
|
||||
getCounter := tx.Stmt(db.getCounter)
|
||||
|
||||
_, err = delOld.Exec(pid, ns)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
|
||||
_, err = insertNew.Exec(pid, ns, expire, maddrs)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var counter uint64
|
||||
row := getCounter.QueryRow()
|
||||
err = row.Scan(&counter)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
return counter, err
|
||||
}
|
||||
|
||||
func (db *DB) CountRegistrations(p peer.ID) (int, error) {
|
||||
pid := p.Pretty()
|
||||
|
||||
row := db.countPeerRegistrations.QueryRow(pid)
|
||||
|
||||
var count int
|
||||
err := row.Scan(&count)
|
||||
|
||||
return count, err
|
||||
}
|
||||
|
||||
func (db *DB) Unregister(p peer.ID, ns string) error {
|
||||
pid := p.Pretty()
|
||||
|
||||
var err error
|
||||
|
||||
if ns == "" {
|
||||
_, err = db.deletePeerRegistrations.Exec(pid)
|
||||
} else {
|
||||
_, err = db.deletePeerRegistrationsNs.Exec(pid, ns)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *DB) Discover(ns string, cookie []byte, limit int) ([]dbi.RegistrationRecord, []byte, error) {
|
||||
now := time.Now().Unix()
|
||||
|
||||
var (
|
||||
counter int64
|
||||
rows *sql.Rows
|
||||
err error
|
||||
)
|
||||
|
||||
if cookie != nil {
|
||||
counter, err = unpackCookie(cookie)
|
||||
if err != nil {
|
||||
db.logger.Error("unpacking cookie", zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if counter > 0 {
|
||||
if ns == "" {
|
||||
rows, err = db.selectPeerRegistrationsC.Query(counter, now, limit)
|
||||
} else {
|
||||
rows, err = db.selectPeerRegistrationsNSC.Query(counter, ns, now, limit)
|
||||
}
|
||||
} else {
|
||||
if ns == "" {
|
||||
rows, err = db.selectPeerRegistrations.Query(now, limit)
|
||||
} else {
|
||||
rows, err = db.selectPeerRegistrationsNS.Query(ns, now, limit)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
db.logger.Error("query", zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
regs := make([]dbi.RegistrationRecord, 0, limit)
|
||||
for rows.Next() {
|
||||
var (
|
||||
reg dbi.RegistrationRecord
|
||||
rid string
|
||||
rns string
|
||||
expire int64
|
||||
raddrs []byte
|
||||
addrs [][]byte
|
||||
p peer.ID
|
||||
)
|
||||
|
||||
err = rows.Scan(&counter, &rid, &rns, &expire, &raddrs)
|
||||
if err != nil {
|
||||
db.logger.Error("row scan error", zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
p, err = peer.Decode(rid)
|
||||
if err != nil {
|
||||
db.logger.Error("error decoding peer id", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
addrs, err := unpackAddrs(raddrs)
|
||||
if err != nil {
|
||||
db.logger.Error("error unpacking address", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
reg.Id = p
|
||||
reg.Addrs = addrs
|
||||
reg.Ttl = int(expire - now)
|
||||
|
||||
if ns == "" {
|
||||
reg.Ns = rns
|
||||
}
|
||||
|
||||
regs = append(regs, reg)
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if counter > 0 {
|
||||
cookie = packCookie(counter, ns, db.nonce)
|
||||
}
|
||||
|
||||
return regs, cookie, nil
|
||||
}
|
||||
|
||||
func (db *DB) ValidCookie(ns string, cookie []byte) bool {
|
||||
return validCookie(cookie, ns, db.nonce)
|
||||
}
|
||||
|
||||
func (db *DB) background(ctx context.Context) {
|
||||
for {
|
||||
db.cleanupExpired()
|
||||
|
||||
select {
|
||||
case <-time.After(15 * time.Minute):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) cleanupExpired() {
|
||||
now := time.Now().Unix()
|
||||
_, err := db.deleteExpiredRegistrations.Exec(now)
|
||||
if err != nil {
|
||||
db.logger.Error("deleting expired registrations", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func packAddrs(addrs [][]byte) []byte {
|
||||
packlen := 0
|
||||
for _, addr := range addrs {
|
||||
packlen = packlen + 2 + len(addr)
|
||||
}
|
||||
|
||||
packed := make([]byte, packlen)
|
||||
buf := packed
|
||||
for _, addr := range addrs {
|
||||
binary.BigEndian.PutUint16(buf, uint16(len(addr)))
|
||||
buf = buf[2:]
|
||||
copy(buf, addr)
|
||||
buf = buf[len(addr):]
|
||||
}
|
||||
|
||||
return packed
|
||||
}
|
||||
|
||||
func unpackAddrs(packed []byte) ([][]byte, error) {
|
||||
var addrs [][]byte
|
||||
|
||||
buf := packed
|
||||
for len(buf) > 1 {
|
||||
l := binary.BigEndian.Uint16(buf)
|
||||
buf = buf[2:]
|
||||
if len(buf) < int(l) {
|
||||
return nil, fmt.Errorf("bad packed address: not enough bytes %v %v", packed, buf)
|
||||
}
|
||||
addr := make([]byte, l)
|
||||
copy(addr, buf[:l])
|
||||
buf = buf[l:]
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
if len(buf) > 0 {
|
||||
return nil, fmt.Errorf("bad packed address: unprocessed bytes: %v %v", packed, buf)
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
// cookie: counter:SHA256(nonce + ns + counter)
|
||||
func packCookie(counter int64, ns string, nonce []byte) []byte {
|
||||
cbits := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(cbits, uint64(counter))
|
||||
|
||||
hash := sha256.New()
|
||||
_, err := hash.Write(nonce)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = hash.Write([]byte(ns))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = hash.Write(cbits)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return hash.Sum(cbits)
|
||||
}
|
||||
|
||||
func unpackCookie(cookie []byte) (int64, error) {
|
||||
if len(cookie) < 8 {
|
||||
return 0, fmt.Errorf("bad packed cookie: not enough bytes: %v", cookie)
|
||||
}
|
||||
|
||||
counter := binary.BigEndian.Uint64(cookie[:8])
|
||||
return int64(counter), nil
|
||||
}
|
||||
|
||||
func validCookie(cookie []byte, ns string, nonce []byte) bool {
|
||||
if len(cookie) != 40 {
|
||||
return false
|
||||
}
|
||||
|
||||
cbits := cookie[:8]
|
||||
hash := sha256.New()
|
||||
_, err := hash.Write(nonce)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = hash.Write([]byte(ns))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = hash.Write(cbits)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
hbits := hash.Sum(nil)
|
||||
|
||||
return bytes.Equal(cookie[8:], hbits)
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package rendezvous
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
rvs "github.com/berty/go-libp2p-rendezvous"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const RendezvousID = rvs.RendezvousProto
|
||||
|
||||
type Rendezvous struct {
|
||||
host host.Host
|
||||
peerConnector PeerConnector
|
||||
db *DB
|
||||
rendezvousSvc *rvs.RendezvousService
|
||||
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
type PeerConnector interface {
|
||||
PeerChannel() chan<- peer.AddrInfo
|
||||
}
|
||||
|
||||
func NewRendezvous(host host.Host, db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
|
||||
logger := log.Named("rendezvous")
|
||||
|
||||
return &Rendezvous{
|
||||
host: host,
|
||||
db: db,
|
||||
peerConnector: peerConnector,
|
||||
log: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Rendezvous) Start(ctx context.Context) error {
|
||||
err := r.db.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
|
||||
r.log.Info("rendezvous protocol started")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Rendezvous) Stop() {
|
||||
r.host.RemoveStreamHandler(rvs.RendezvousProto)
|
||||
r.rendezvousSvc = nil
|
||||
}
|
Loading…
Reference in New Issue