diff --git a/appdatabase/migrations/bindata.go b/appdatabase/migrations/bindata.go index 8ad1ec09c..d7c516151 100644 --- a/appdatabase/migrations/bindata.go +++ b/appdatabase/migrations/bindata.go @@ -114,10 +114,8 @@ // 1716385243_no_discovery.up.sql (44B) // 1718785164_max_delivery_attempts_update.up.sql (60B) // 1718978062_nodeconfig_add_connector.up.sql (76B) +// 1720466921_missing_message_verification.up.sql (167B) // doc.go (94B) -// exit_code_1.txt (4B) -// test.log (79B) -// test_1.log (0) package migrations @@ -2465,6 +2463,26 @@ func _1718978062_nodeconfig_add_connectorUpSql() (*asset, error) { return a, nil } +var __1720466921_missing_message_verificationUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\xcc\xb1\x0e\x82\x30\x10\x00\xd0\x9d\xaf\xb8\x6f\x70\x35\x0e\x87\xad\x53\x05\xa3\xd7\xb9\x39\xc9\xb5\xb9\x08\x25\xe1\x00\x7f\xdf\xb8\x3a\xf9\x03\x0f\x03\xf9\x3b\x10\xb6\xc1\xc3\x9b\x5f\xdb\x7e\x48\xc3\x5c\xb3\x16\x40\xe7\xe0\xdc\x87\x78\xed\x40\x2a\x3f\x47\x49\x93\x9a\x69\x2d\x69\x12\x33\x2e\x92\x76\x59\x34\xeb\xc0\xab\xce\x15\xda\xbe\x0f\x1e\x3b\x70\xfe\x82\x31\x10\x64\x1e\x4d\x8e\x4d\x13\x6f\x0e\xe9\xd7\x7e\x78\xfa\x0b\x3d\xc1\xba\x6c\x5f\xe5\x13\x00\x00\xff\xff\x08\xc7\x32\xed\xa7\x00\x00\x00") + +func _1720466921_missing_message_verificationUpSqlBytes() ([]byte, error) { + return bindataRead( + __1720466921_missing_message_verificationUpSql, + "1720466921_missing_message_verification.up.sql", + ) +} + +func _1720466921_missing_message_verificationUpSql() (*asset, error) { + bytes, err := _1720466921_missing_message_verificationUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1720466921_missing_message_verification.up.sql", size: 167, mode: os.FileMode(0644), modTime: time.Unix(1700000000, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4c, 0x84, 0x55, 0x43, 0x82, 0x17, 0x7b, 0x71, 0x2a, 0xc0, 0xb6, 0x53, 0xbc, 0xc0, 0x9f, 0x70, 0xa3, 0x0, 0x32, 0x5b, 0xaa, 0xa, 0x9a, 0x20, 0xa, 0x40, 0x97, 0xc3, 0xfe, 0x35, 0xb3, 0xfa}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xcb\x41\x0e\x02\x31\x08\x05\xd0\x7d\x4f\xf1\x2f\x00\xe8\xca\xc4\xc4\xc3\xa0\x43\x08\x19\x5b\xc6\x96\xfb\xc7\x4d\xdf\xfe\x5d\xfa\x39\xd5\x0d\xeb\xf7\x6d\x4d\xc4\xf3\xe9\x36\x6c\x6a\x19\x3c\xe9\x1d\xe3\xd0\x52\x50\xcf\xa3\xa2\xdb\xeb\xfe\xb8\x6d\xa0\xeb\x74\xf4\xf0\xa9\x15\x39\x16\x28\xc1\x2c\x7b\xb0\x27\x58\xda\x3f\x00\x00\xff\xff\x57\xd4\xd5\x90\x5e\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -2485,66 +2503,6 @@ func docGo() (*asset, error) { return a, nil } -var _exit_code_1Txt = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x32\x34\x32\xe7\x02\x04\x00\x00\xff\xff\x0c\x38\xee\x3e\x04\x00\x00\x00") - -func exit_code_1TxtBytes() ([]byte, error) { - return bindataRead( - _exit_code_1Txt, - "exit_code_1.txt", - ) -} - -func exit_code_1Txt() (*asset, error) { - bytes, err := exit_code_1TxtBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "exit_code_1.txt", size: 4, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x74, 0x3c, 0x78, 0x50, 0xcc, 0xcf, 0xba, 0x5e, 0x53, 0xa9, 0x0, 0x26, 0x63, 0xec, 0x1d, 0xdd, 0x10, 0x79, 0x31, 0x5a, 0x98, 0xbd, 0xbf, 0xdd, 0xe1, 0xe, 0x60, 0x44, 0xf5, 0x6a, 0xbe, 0xfe}} - return a, nil -} - -var _testLog = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x3c\xc6\xb1\x11\x80\x20\x0c\x05\xd0\x1a\xa7\xc8\x02\x9a\x11\x1c\xc4\xb3\x08\x8a\x98\x3b\x20\x68\x3e\xfb\xdb\xd9\xbd\x95\x88\x42\x56\xdc\x23\x2e\x87\x55\x76\x08\x86\xcf\xfa\x2b\x1b\x4b\xef\xa7\x40\xa2\x78\xe2\xaa\xf9\x15\xa8\x35\x67\x7f\x4a\xd8\x9a\x11\x92\x83\x2e\x2d\xc9\xf7\xe9\x0b\x00\x00\xff\xff\xff\xeb\x51\xe5\x4f\x00\x00\x00") - -func testLogBytes() ([]byte, error) { - return bindataRead( - _testLog, - "test.log", - ) -} - -func testLog() (*asset, error) { - bytes, err := testLogBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "test.log", size: 79, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa4, 0x44, 0x38, 0xbc, 0xb4, 0xe5, 0x83, 0xfd, 0x7e, 0x19, 0xb0, 0x78, 0x70, 0x98, 0xc5, 0x9a, 0x6c, 0xeb, 0x5f, 0x68, 0x5, 0x7a, 0x77, 0x28, 0x8d, 0xb2, 0xff, 0x86, 0x47, 0x91, 0x10, 0x11}} - return a, nil -} - -var _test_1Log = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x01\x00\x00\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00") - -func test_1LogBytes() ([]byte, error) { - return bindataRead( - _test_1Log, - "test_1.log", - ) -} - -func test_1Log() (*asset, error) { - bytes, err := test_1LogBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "test_1.log", size: 0, mode: os.FileMode(0664), modTime: time.Unix(1700000000, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}} - return a, nil -} - // Asset loads and returns the asset for the given name. // It returns an error if the asset could not be found or // could not be loaded. @@ -2750,10 +2708,8 @@ var _bindata = map[string]func() (*asset, error){ "1716385243_no_discovery.up.sql": _1716385243_no_discoveryUpSql, "1718785164_max_delivery_attempts_update.up.sql": _1718785164_max_delivery_attempts_updateUpSql, "1718978062_nodeconfig_add_connector.up.sql": _1718978062_nodeconfig_add_connectorUpSql, - "doc.go": docGo, - "exit_code_1.txt": exit_code_1Txt, - "test.log": testLog, - "test_1.log": test_1Log, + "1720466921_missing_message_verification.up.sql": _1720466921_missing_message_verificationUpSql, + "doc.go": docGo, } // AssetDebug is true if the assets were built with the debug flag enabled. @@ -2916,10 +2872,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1716385243_no_discovery.up.sql": {_1716385243_no_discoveryUpSql, map[string]*bintree{}}, "1718785164_max_delivery_attempts_update.up.sql": {_1718785164_max_delivery_attempts_updateUpSql, map[string]*bintree{}}, "1718978062_nodeconfig_add_connector.up.sql": {_1718978062_nodeconfig_add_connectorUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, - "exit_code_1.txt": {exit_code_1Txt, map[string]*bintree{}}, - "test.log": {testLog, map[string]*bintree{}}, - "test_1.log": {test_1Log, map[string]*bintree{}}, + "1720466921_missing_message_verification.up.sql": {_1720466921_missing_message_verificationUpSql, map[string]*bintree{}}, + "doc.go": {docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. diff --git a/appdatabase/migrations/sql/1720466921_missing_message_verification.up.sql b/appdatabase/migrations/sql/1720466921_missing_message_verification.up.sql new file mode 100644 index 000000000..e5cc0eccc --- /dev/null +++ b/appdatabase/migrations/sql/1720466921_missing_message_verification.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE wakuv2_config ADD COLUMN enable_missing_message_verification BOOLEAN DEFAULT false; + +UPDATE wakuv2_config SET enable_missing_message_verification = true; + diff --git a/node/status_node_services.go b/node/status_node_services.go index 1a88cfeae..a61750b42 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -319,25 +319,26 @@ func (b *StatusNode) wakuService(wakuCfg *params.WakuConfig, clusterCfg *params. func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, error) { if b.wakuV2Srvc == nil { cfg := &wakuv2.Config{ - MaxMessageSize: wakucommon.DefaultMaxMessageSize, - Host: nodeConfig.WakuV2Config.Host, - Port: nodeConfig.WakuV2Config.Port, - LightClient: nodeConfig.WakuV2Config.LightClient, - KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval, - Rendezvous: nodeConfig.Rendezvous, - WakuNodes: nodeConfig.ClusterConfig.WakuNodes, - EnableStore: nodeConfig.WakuV2Config.EnableStore, - StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity, - StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds, - DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit, - DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes, - Nameserver: nodeConfig.WakuV2Config.Nameserver, - UDPPort: nodeConfig.WakuV2Config.UDPPort, - AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate, - DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(), - UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic, - TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL, - ClusterID: nodeConfig.ClusterConfig.ClusterID, + MaxMessageSize: wakucommon.DefaultMaxMessageSize, + Host: nodeConfig.WakuV2Config.Host, + Port: nodeConfig.WakuV2Config.Port, + LightClient: nodeConfig.WakuV2Config.LightClient, + KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval, + Rendezvous: nodeConfig.Rendezvous, + WakuNodes: nodeConfig.ClusterConfig.WakuNodes, + EnableStore: nodeConfig.WakuV2Config.EnableStore, + StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity, + StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds, + DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit, + DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes, + Nameserver: nodeConfig.WakuV2Config.Nameserver, + UDPPort: nodeConfig.WakuV2Config.UDPPort, + AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate, + DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(), + UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic, + TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL, + ClusterID: nodeConfig.ClusterConfig.ClusterID, + EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification, } // Configure peer exchange and discv5 settings based on node type diff --git a/nodecfg/node_config.go b/nodecfg/node_config.go index 840db24ef..7eb00d919 100644 --- a/nodecfg/node_config.go +++ b/nodecfg/node_config.go @@ -203,7 +203,7 @@ func insertTorrentConfig(tx *sql.Tx, c *params.NodeConfig) error { return err } -func insertWakuV2Config(tx *sql.Tx, c *params.NodeConfig) error { +func insertWakuV2ConfigPreMigration(tx *sql.Tx, c *params.NodeConfig) error { _, err := tx.Exec(` INSERT OR REPLACE INTO wakuv2_config ( enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir, @@ -234,24 +234,19 @@ func setWakuV2CustomNodes(tx *sql.Tx, customNodes map[string]string) error { return nil } -func insertWakuV2StoreConfig(tx *sql.Tx, c *params.NodeConfig) error { +func insertWakuV2ConfigPostMigration(tx *sql.Tx, c *params.NodeConfig) error { _, err := tx.Exec(` UPDATE wakuv2_config - SET enable_store = ?, store_capacity = ?, store_seconds = ? + SET enable_store = ?, + store_capacity = ?, + store_seconds = ?, + use_shard_default_topic = ?, + enable_missing_message_verification = ? WHERE synthetic_id = 'id'`, c.WakuV2Config.EnableStore, c.WakuV2Config.StoreCapacity, c.WakuV2Config.StoreSeconds, + c.WakuV2Config.UseShardAsDefaultTopic, c.WakuV2Config.EnableMissingMessageVerification, ) - return err -} - -func insertWakuV2ShardConfig(tx *sql.Tx, c *params.NodeConfig) error { - _, err := tx.Exec(` - UPDATE wakuv2_config - SET use_shard_default_topic = ? - WHERE synthetic_id = 'id'`, - c.WakuV2Config.UseShardAsDefaultTopic, - ) if err != nil { return err } @@ -266,7 +261,7 @@ func insertWakuV2ShardConfig(tx *sql.Tx, c *params.NodeConfig) error { return err } -func insertWakuConfig(tx *sql.Tx, c *params.NodeConfig) error { +func insertWakuV1Config(tx *sql.Tx, c *params.NodeConfig) error { _, err := tx.Exec(` INSERT OR REPLACE INTO waku_config ( enabled, light_client, full_node, enable_mailserver, data_dir, minimum_pow, mailserver_password, mailserver_rate_limit, mailserver_data_retention, @@ -351,8 +346,8 @@ func nodeConfigUpgradeInserts() []insertFn { insertRequireTopics, insertPushNotificationsServerConfig, insertShhExtConfig, - insertWakuConfig, - insertWakuV2Config, + insertWakuV1Config, + insertWakuV2ConfigPreMigration, } } @@ -375,11 +370,10 @@ func nodeConfigNormalInserts() []insertFn { insertRequireTopics, insertPushNotificationsServerConfig, insertShhExtConfig, - insertWakuConfig, - insertWakuV2Config, + insertWakuV1Config, + insertWakuV2ConfigPreMigration, insertTorrentConfig, - insertWakuV2StoreConfig, - insertWakuV2ShardConfig, + insertWakuV2ConfigPostMigration, } } @@ -684,13 +678,14 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) { err = tx.QueryRow(` SELECT enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir, max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update, - enable_store, store_capacity, store_seconds, use_shard_default_topic + enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification FROM wakuv2_config WHERE synthetic_id = 'id' `).Scan( &nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.KeepAliveInterval, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode, &nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations, &nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate, &nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic, + &nodecfg.WakuV2Config.EnableMissingMessageVerification, ) if err != nil && err != sql.ErrNoRows { return nil, err diff --git a/params/config.go b/params/config.go index fcb64b19b..c3bffea98 100644 --- a/params/config.go +++ b/params/config.go @@ -217,6 +217,9 @@ type WakuV2Config struct { // UseShardAsDefaultTopic indicates whether the default shard should be used instead of the default relay topic UseShardAsDefaultTopic bool + + // EnableMissingMessageVerification indicates whether the storenodes must be queried periodically to retrieve any missing message + EnableMissingMessageVerification bool } // ---------- diff --git a/wakuv2/config.go b/wakuv2/config.go index 9785bad66..02ef66ad1 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -39,35 +39,36 @@ var ( // Config represents the configuration state of a waku node. type Config struct { - MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node - Host string `toml:",omitempty"` - Port int `toml:",omitempty"` - EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient - EnablePeerExchangeClient bool `toml:",omitempty"` - KeepAliveInterval int `toml:",omitempty"` - MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol - MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol - LightClient bool `toml:",omitempty"` // Indicates if the node is a light client - WakuNodes []string `toml:",omitempty"` - Rendezvous bool `toml:",omitempty"` - DiscV5BootstrapNodes []string `toml:",omitempty"` - Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery - Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery - EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not - DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client - AutoUpdate bool `toml:",omitempty"` - UDPPort int `toml:",omitempty"` - EnableStore bool `toml:",omitempty"` - StoreCapacity int `toml:",omitempty"` - StoreSeconds int `toml:",omitempty"` - TelemetryServerURL string `toml:",omitempty"` - TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service - DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) - DefaultShardedPubsubTopics []string `toml:", omitempty"` - UseShardAsDefaultTopic bool `toml:",omitempty"` - ClusterID uint16 `toml:",omitempty"` - EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations - SkipPublishToTopic bool `toml:",omitempty"` // Used in testing + MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node + Host string `toml:",omitempty"` + Port int `toml:",omitempty"` + EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient + EnablePeerExchangeClient bool `toml:",omitempty"` + KeepAliveInterval int `toml:",omitempty"` + MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol + MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol + LightClient bool `toml:",omitempty"` // Indicates if the node is a light client + WakuNodes []string `toml:",omitempty"` + Rendezvous bool `toml:",omitempty"` + DiscV5BootstrapNodes []string `toml:",omitempty"` + Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery + Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery + EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not + DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client + AutoUpdate bool `toml:",omitempty"` + UDPPort int `toml:",omitempty"` + EnableStore bool `toml:",omitempty"` + StoreCapacity int `toml:",omitempty"` + StoreSeconds int `toml:",omitempty"` + TelemetryServerURL string `toml:",omitempty"` + TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service + DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) + DefaultShardedPubsubTopics []string `toml:", omitempty"` + UseShardAsDefaultTopic bool `toml:",omitempty"` + ClusterID uint16 `toml:",omitempty"` + EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations + SkipPublishToTopic bool `toml:",omitempty"` // Used in testing + EnableMissingMessageVerification bool `toml:",omitempty"` } func (c *Config) Validate(logger *zap.Logger) error { diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 573744731..babe316ff 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1373,7 +1373,7 @@ func (w *Waku) Start() error { } } - w.wg.Add(3) + w.wg.Add(2) go func() { defer w.wg.Done() @@ -1441,7 +1441,11 @@ func (w *Waku) Start() error { //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. go w.runPeerExchangeLoop() - go w.checkForMissingMessages() + + if w.cfg.EnableMissingMessageVerification { + w.wg.Add(1) + go w.checkForMissingMessages() + } if w.cfg.LightClient { // Create FilterManager that will main peer connectivity