feat_: flag to enable/disable missing message verification (#5497)

This commit is contained in:
richΛrd 2024-07-09 09:42:34 -04:00 committed by GitHub
parent b2e5e7a81c
commit c0a7a1ee9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 104 additions and 142 deletions

View File

@ -114,10 +114,8 @@
// 1716385243_no_discovery.up.sql (44B)
// 1718785164_max_delivery_attempts_update.up.sql (60B)
// 1718978062_nodeconfig_add_connector.up.sql (76B)
// 1720466921_missing_message_verification.up.sql (167B)
// doc.go (94B)
// exit_code_1.txt (4B)
// test.log (79B)
// test_1.log (0)
package migrations
@ -2465,6 +2463,26 @@ func _1718978062_nodeconfig_add_connectorUpSql() (*asset, error) {
return a, nil
}
var __1720466921_missing_message_verificationUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\xcc\xb1\x0e\x82\x30\x10\x00\xd0\x9d\xaf\xb8\x6f\x70\x35\x0e\x87\xad\x53\x05\xa3\xd7\xb9\x39\xc9\xb5\xb9\x08\x25\xe1\x00\x7f\xdf\xb8\x3a\xf9\x03\x0f\x03\xf9\x3b\x10\xb6\xc1\xc3\x9b\x5f\xdb\x7e\x48\xc3\x5c\xb3\x16\x40\xe7\xe0\xdc\x87\x78\xed\x40\x2a\x3f\x47\x49\x93\x9a\x69\x2d\x69\x12\x33\x2e\x92\x76\x59\x34\xeb\xc0\xab\xce\x15\xda\xbe\x0f\x1e\x3b\x70\xfe\x82\x31\x10\x64\x1e\x4d\x8e\x4d\x13\x6f\x0e\xe9\xd7\x7e\x78\xfa\x0b\x3d\xc1\xba\x6c\x5f\xe5\x13\x00\x00\xff\xff\x08\xc7\x32\xed\xa7\x00\x00\x00")
func _1720466921_missing_message_verificationUpSqlBytes() ([]byte, error) {
return bindataRead(
__1720466921_missing_message_verificationUpSql,
"1720466921_missing_message_verification.up.sql",
)
}
func _1720466921_missing_message_verificationUpSql() (*asset, error) {
bytes, err := _1720466921_missing_message_verificationUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1720466921_missing_message_verification.up.sql", size: 167, mode: os.FileMode(0644), modTime: time.Unix(1700000000, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4c, 0x84, 0x55, 0x43, 0x82, 0x17, 0x7b, 0x71, 0x2a, 0xc0, 0xb6, 0x53, 0xbc, 0xc0, 0x9f, 0x70, 0xa3, 0x0, 0x32, 0x5b, 0xaa, 0xa, 0x9a, 0x20, 0xa, 0x40, 0x97, 0xc3, 0xfe, 0x35, 0xb3, 0xfa}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xcb\x41\x0e\x02\x31\x08\x05\xd0\x7d\x4f\xf1\x2f\x00\xe8\xca\xc4\xc4\xc3\xa0\x43\x08\x19\x5b\xc6\x96\xfb\xc7\x4d\xdf\xfe\x5d\xfa\x39\xd5\x0d\xeb\xf7\x6d\x4d\xc4\xf3\xe9\x36\x6c\x6a\x19\x3c\xe9\x1d\xe3\xd0\x52\x50\xcf\xa3\xa2\xdb\xeb\xfe\xb8\x6d\xa0\xeb\x74\xf4\xf0\xa9\x15\x39\x16\x28\xc1\x2c\x7b\xb0\x27\x58\xda\x3f\x00\x00\xff\xff\x57\xd4\xd5\x90\x5e\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -2485,66 +2503,6 @@ func docGo() (*asset, error) {
return a, nil
}
var _exit_code_1Txt = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x32\x34\x32\xe7\x02\x04\x00\x00\xff\xff\x0c\x38\xee\x3e\x04\x00\x00\x00")
func exit_code_1TxtBytes() ([]byte, error) {
return bindataRead(
_exit_code_1Txt,
"exit_code_1.txt",
)
}
func exit_code_1Txt() (*asset, error) {
bytes, err := exit_code_1TxtBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "exit_code_1.txt", size: 4, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x74, 0x3c, 0x78, 0x50, 0xcc, 0xcf, 0xba, 0x5e, 0x53, 0xa9, 0x0, 0x26, 0x63, 0xec, 0x1d, 0xdd, 0x10, 0x79, 0x31, 0x5a, 0x98, 0xbd, 0xbf, 0xdd, 0xe1, 0xe, 0x60, 0x44, 0xf5, 0x6a, 0xbe, 0xfe}}
return a, nil
}
var _testLog = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x3c\xc6\xb1\x11\x80\x20\x0c\x05\xd0\x1a\xa7\xc8\x02\x9a\x11\x1c\xc4\xb3\x08\x8a\x98\x3b\x20\x68\x3e\xfb\xdb\xd9\xbd\x95\x88\x42\x56\xdc\x23\x2e\x87\x55\x76\x08\x86\xcf\xfa\x2b\x1b\x4b\xef\xa7\x40\xa2\x78\xe2\xaa\xf9\x15\xa8\x35\x67\x7f\x4a\xd8\x9a\x11\x92\x83\x2e\x2d\xc9\xf7\xe9\x0b\x00\x00\xff\xff\xff\xeb\x51\xe5\x4f\x00\x00\x00")
func testLogBytes() ([]byte, error) {
return bindataRead(
_testLog,
"test.log",
)
}
func testLog() (*asset, error) {
bytes, err := testLogBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "test.log", size: 79, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa4, 0x44, 0x38, 0xbc, 0xb4, 0xe5, 0x83, 0xfd, 0x7e, 0x19, 0xb0, 0x78, 0x70, 0x98, 0xc5, 0x9a, 0x6c, 0xeb, 0x5f, 0x68, 0x5, 0x7a, 0x77, 0x28, 0x8d, 0xb2, 0xff, 0x86, 0x47, 0x91, 0x10, 0x11}}
return a, nil
}
var _test_1Log = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x01\x00\x00\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00")
func test_1LogBytes() ([]byte, error) {
return bindataRead(
_test_1Log,
"test_1.log",
)
}
func test_1Log() (*asset, error) {
bytes, err := test_1LogBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "test_1.log", size: 0, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
@ -2750,10 +2708,8 @@ var _bindata = map[string]func() (*asset, error){
"1716385243_no_discovery.up.sql": _1716385243_no_discoveryUpSql,
"1718785164_max_delivery_attempts_update.up.sql": _1718785164_max_delivery_attempts_updateUpSql,
"1718978062_nodeconfig_add_connector.up.sql": _1718978062_nodeconfig_add_connectorUpSql,
"doc.go": docGo,
"exit_code_1.txt": exit_code_1Txt,
"test.log": testLog,
"test_1.log": test_1Log,
"1720466921_missing_message_verification.up.sql": _1720466921_missing_message_verificationUpSql,
"doc.go": docGo,
}
// AssetDebug is true if the assets were built with the debug flag enabled.
@ -2916,10 +2872,8 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1716385243_no_discovery.up.sql": {_1716385243_no_discoveryUpSql, map[string]*bintree{}},
"1718785164_max_delivery_attempts_update.up.sql": {_1718785164_max_delivery_attempts_updateUpSql, map[string]*bintree{}},
"1718978062_nodeconfig_add_connector.up.sql": {_1718978062_nodeconfig_add_connectorUpSql, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
"exit_code_1.txt": {exit_code_1Txt, map[string]*bintree{}},
"test.log": {testLog, map[string]*bintree{}},
"test_1.log": {test_1Log, map[string]*bintree{}},
"1720466921_missing_message_verification.up.sql": {_1720466921_missing_message_verificationUpSql, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -0,0 +1,4 @@
ALTER TABLE wakuv2_config ADD COLUMN enable_missing_message_verification BOOLEAN DEFAULT false;
UPDATE wakuv2_config SET enable_missing_message_verification = true;

View File

@ -319,25 +319,26 @@ func (b *StatusNode) wakuService(wakuCfg *params.WakuConfig, clusterCfg *params.
func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, error) {
if b.wakuV2Srvc == nil {
cfg := &wakuv2.Config{
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient,
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,
StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds,
DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit,
DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes,
Nameserver: nodeConfig.WakuV2Config.Nameserver,
UDPPort: nodeConfig.WakuV2Config.UDPPort,
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic,
TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL,
ClusterID: nodeConfig.ClusterConfig.ClusterID,
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient,
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,
StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds,
DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit,
DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes,
Nameserver: nodeConfig.WakuV2Config.Nameserver,
UDPPort: nodeConfig.WakuV2Config.UDPPort,
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic,
TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL,
ClusterID: nodeConfig.ClusterConfig.ClusterID,
EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification,
}
// Configure peer exchange and discv5 settings based on node type

View File

@ -203,7 +203,7 @@ func insertTorrentConfig(tx *sql.Tx, c *params.NodeConfig) error {
return err
}
func insertWakuV2Config(tx *sql.Tx, c *params.NodeConfig) error {
func insertWakuV2ConfigPreMigration(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(`
INSERT OR REPLACE INTO wakuv2_config (
enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir,
@ -234,24 +234,19 @@ func setWakuV2CustomNodes(tx *sql.Tx, customNodes map[string]string) error {
return nil
}
func insertWakuV2StoreConfig(tx *sql.Tx, c *params.NodeConfig) error {
func insertWakuV2ConfigPostMigration(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(`
UPDATE wakuv2_config
SET enable_store = ?, store_capacity = ?, store_seconds = ?
SET enable_store = ?,
store_capacity = ?,
store_seconds = ?,
use_shard_default_topic = ?,
enable_missing_message_verification = ?
WHERE synthetic_id = 'id'`,
c.WakuV2Config.EnableStore, c.WakuV2Config.StoreCapacity, c.WakuV2Config.StoreSeconds,
c.WakuV2Config.UseShardAsDefaultTopic, c.WakuV2Config.EnableMissingMessageVerification,
)
return err
}
func insertWakuV2ShardConfig(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(`
UPDATE wakuv2_config
SET use_shard_default_topic = ?
WHERE synthetic_id = 'id'`,
c.WakuV2Config.UseShardAsDefaultTopic,
)
if err != nil {
return err
}
@ -266,7 +261,7 @@ func insertWakuV2ShardConfig(tx *sql.Tx, c *params.NodeConfig) error {
return err
}
func insertWakuConfig(tx *sql.Tx, c *params.NodeConfig) error {
func insertWakuV1Config(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(`
INSERT OR REPLACE INTO waku_config (
enabled, light_client, full_node, enable_mailserver, data_dir, minimum_pow, mailserver_password, mailserver_rate_limit, mailserver_data_retention,
@ -351,8 +346,8 @@ func nodeConfigUpgradeInserts() []insertFn {
insertRequireTopics,
insertPushNotificationsServerConfig,
insertShhExtConfig,
insertWakuConfig,
insertWakuV2Config,
insertWakuV1Config,
insertWakuV2ConfigPreMigration,
}
}
@ -375,11 +370,10 @@ func nodeConfigNormalInserts() []insertFn {
insertRequireTopics,
insertPushNotificationsServerConfig,
insertShhExtConfig,
insertWakuConfig,
insertWakuV2Config,
insertWakuV1Config,
insertWakuV2ConfigPreMigration,
insertTorrentConfig,
insertWakuV2StoreConfig,
insertWakuV2ShardConfig,
insertWakuV2ConfigPostMigration,
}
}
@ -684,13 +678,14 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
err = tx.QueryRow(`
SELECT enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir,
max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update,
enable_store, store_capacity, store_seconds, use_shard_default_topic
enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification
FROM wakuv2_config WHERE synthetic_id = 'id'
`).Scan(
&nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.KeepAliveInterval, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode,
&nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations,
&nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate,
&nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic,
&nodecfg.WakuV2Config.EnableMissingMessageVerification,
)
if err != nil && err != sql.ErrNoRows {
return nil, err

View File

@ -217,6 +217,9 @@ type WakuV2Config struct {
// UseShardAsDefaultTopic indicates whether the default shard should be used instead of the default relay topic
UseShardAsDefaultTopic bool
// EnableMissingMessageVerification indicates whether the storenodes must be queried periodically to retrieve any missing message
EnableMissingMessageVerification bool
}
// ----------

View File

@ -39,35 +39,36 @@ var (
// Config represents the configuration state of a waku node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery
EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not
DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
DefaultShardedPubsubTopics []string `toml:", omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery
EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not
DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
DefaultShardedPubsubTopics []string `toml:", omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
EnableMissingMessageVerification bool `toml:",omitempty"`
}
func (c *Config) Validate(logger *zap.Logger) error {

View File

@ -1373,7 +1373,7 @@ func (w *Waku) Start() error {
}
}
w.wg.Add(3)
w.wg.Add(2)
go func() {
defer w.wg.Done()
@ -1441,7 +1441,11 @@ func (w *Waku) Start() error {
//TODO: commenting for now so that only fleet nodes are used.
//Need to uncomment once filter peer scoring etc is implemented.
go w.runPeerExchangeLoop()
go w.checkForMissingMessages()
if w.cfg.EnableMissingMessageVerification {
w.wg.Add(1)
go w.checkForMissingMessages()
}
if w.cfg.LightClient {
// Create FilterManager that will main peer connectivity