fix: TestFetchRealCommunity with shards fleet (#4553)

* fix: TestFetchRealCommunity with shards fleet
* fix: DefaultShardPubsubTopic
* chore: print waku query request id
This commit is contained in:
Igor Sirotin 2024-01-16 13:38:41 +03:00 committed by GitHub
parent 94bee02ef7
commit a5acffc001
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 298 additions and 61 deletions

View File

@ -5,6 +5,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
types "github.com/status-im/status-go/eth-node/types"
settings "github.com/status-im/status-go/multiaccounts/settings"
)

View File

@ -6,6 +6,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
settings "github.com/status-im/status-go/multiaccounts/settings"
)

View File

@ -9,6 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
types "github.com/status-im/status-go/eth-node/types"
)

View File

@ -443,19 +443,18 @@ func (m *Manager) runOwnerVerificationLoop() {
}
func (m *Manager) ValidateCommunityByID(communityID types.HexBytes) (*CommunityResponse, error) {
communityToValidate, err := m.persistence.getCommunityToValidateByID(communityID)
communitiesToValidate, err := m.persistence.getCommunityToValidateByID(communityID)
if err != nil {
m.logger.Error("failed to validate community by ID", zap.String("id", communityID.String()), zap.Error(err))
return nil, err
}
return m.validateCommunity(communityToValidate)
return m.validateCommunity(communitiesToValidate)
}
func (m *Manager) validateCommunity(communityToValidateData []communityToValidate) (*CommunityResponse, error) {
for _, communityToValidate := range communityToValidateData {
signer, description, err := UnwrapCommunityDescriptionMessage(communityToValidate.payload)
for _, community := range communityToValidateData {
signer, description, err := UnwrapCommunityDescriptionMessage(community.payload)
if err != nil {
m.logger.Error("failed to unwrap community", zap.Error(err))
continue
@ -468,12 +467,12 @@ func (m *Manager) validateCommunity(communityToValidateData []communityToValidat
continue
}
m.logger.Info("validating community", zap.String("id", types.EncodeHex(communityToValidate.id)), zap.String("signer", common.PubkeyToHex(signer)))
m.logger.Info("validating community", zap.String("id", types.EncodeHex(community.id)), zap.String("signer", common.PubkeyToHex(signer)))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
owner, err := m.ownerVerifier.SafeGetSignerPubKey(ctx, chainID, types.EncodeHex(communityToValidate.id))
owner, err := m.ownerVerifier.SafeGetSignerPubKey(ctx, chainID, types.EncodeHex(community.id))
if err != nil {
m.logger.Error("failed to get owner", zap.Error(err))
continue
@ -486,10 +485,10 @@ func (m *Manager) validateCommunity(communityToValidateData []communityToValidat
}
// TODO: handle shards
response, err := m.HandleCommunityDescriptionMessage(signer, description, communityToValidate.payload, ownerPK, nil)
response, err := m.HandleCommunityDescriptionMessage(signer, description, community.payload, ownerPK, nil)
if err != nil {
m.logger.Error("failed to handle community", zap.Error(err))
err = m.persistence.DeleteCommunityToValidate(communityToValidate.id, communityToValidate.clock)
err = m.persistence.DeleteCommunityToValidate(community.id, community.clock)
if err != nil {
m.logger.Error("failed to delete community to validate", zap.Error(err))
}
@ -498,9 +497,9 @@ func (m *Manager) validateCommunity(communityToValidateData []communityToValidat
if response != nil {
m.logger.Info("community validated", zap.String("id", types.EncodeHex(communityToValidate.id)), zap.String("signer", common.PubkeyToHex(signer)))
m.logger.Info("community validated", zap.String("id", types.EncodeHex(community.id)), zap.String("signer", common.PubkeyToHex(signer)))
m.publish(&Subscription{TokenCommunityValidated: response})
err := m.persistence.DeleteCommunitiesToValidateByCommunityID(communityToValidate.id)
err := m.persistence.DeleteCommunitiesToValidateByCommunityID(community.id)
if err != nil {
m.logger.Error("failed to delete communities to validate", zap.Error(err))
}

View File

@ -70,18 +70,34 @@ type MessengerStoreNodeRequestSuite struct {
type singleResult struct {
EnvelopesCount int
Envelopes []*wakuV2common.ReceivedMessage
ShardEnvelopes []*wakuV2common.ReceivedMessage
Error error
FetchedCommunity *communities.Community
}
func (r *singleResult) ShardEnvelopesHashes() []string {
out := make([]string, 0, len(r.ShardEnvelopes))
for _, e := range r.ShardEnvelopes {
out = append(out, e.Hash().String())
}
return out
}
func (r *singleResult) EnvelopesHashes() []string {
out := make([]string, 0, len(r.Envelopes))
for _, e := range r.Envelopes {
out = append(out, e.Hash().String())
}
return out
}
func (r *singleResult) toString() string {
resultString := ""
communityString := ""
if r.FetchedCommunity != nil {
communityString = fmt.Sprintf("clock: %d (%s), name: %s, members: %d",
communityString = fmt.Sprintf("clock: %d, name: '%s', members: %d",
r.FetchedCommunity.Clock(),
time.Unix(int64(r.FetchedCommunity.Clock()), 0).UTC(),
r.FetchedCommunity.Name(),
len(r.FetchedCommunity.Members()),
)
@ -90,12 +106,24 @@ func (r *singleResult) toString() string {
if r.Error != nil {
resultString = fmt.Sprintf("error: %s", r.Error.Error())
} else {
resultString = fmt.Sprintf("envelopes fetched: %d, community %s",
resultString = fmt.Sprintf("envelopes fetched: %d, community - %s",
r.EnvelopesCount, communityString)
}
for i, envelope := range r.ShardEnvelopes {
resultString += fmt.Sprintf("\n\tshard envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
i+1,
envelope.Hash().Hex(),
envelope.Envelope.Message().GetTimestamp(),
time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
len(envelope.Envelope.Message().Payload),
envelope.Envelope.Message().ContentTopic,
envelope.Envelope.PubsubTopic(),
)
}
for i, envelope := range r.Envelopes {
resultString += fmt.Sprintf("\n\tenvelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
resultString += fmt.Sprintf("\n\tdescription envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
i+1,
envelope.Hash().Hex(),
envelope.Envelope.Message().GetTimestamp(),
@ -119,7 +147,7 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() {
s.cancel = make(chan struct{}, 10)
storeNodeLogger := s.logger.Named("store-node-waku")
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true, false)
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true, false, 0)
storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
@ -139,7 +167,7 @@ func (s *MessengerStoreNodeRequestSuite) TearDown() {
func (s *MessengerStoreNodeRequestSuite) createOwner() {
wakuLogger := s.logger.Named("owner-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false)
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false, 0)
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("owner-messenger")
@ -152,7 +180,7 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() {
func (s *MessengerStoreNodeRequestSuite) createBob() {
wakuLogger := s.logger.Named("bob-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false)
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false, 0)
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("bob-messenger")
@ -699,26 +727,205 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
s.requireCommunitiesEqual(fetchedCommunity, community)
}
// TestFetchRealCommunity is intended to only run locally to check the community description in all of the store nodes.
// Shouldn't be executed in CI, because it relies on connection to the real network.
//
// To run this test, first set `runLocalTests` to true.
// Then carefully set all of communityID, communityShard, fleet and other const variables.
// NOTE: I only tested it with the default parameters, but in theory it should work for any configuration.
/*
TestFetchRealCommunity is not actually a test, but an utility to check the community description in all of the store nodes.
It's intended to only run locally and shouldn't be executed in CI, because it relies on connection to the real network.
TODO: It would be nice to move this code to a real utility in /cmd.
It should allow us to fairly verify the community owner and do other good things.
To run this test, first set `runLocalTests` to true.
Then carefully set all of communityID, communityShard, fleet and other const variables.
NOTE: I only tested it with the default parameters, but in theory it should work for any configuration.
*/
type testFetchRealCommunityExampleTokenInfo struct {
ChainID uint64
ContractAddress string
}
var testFetchRealCommunityExample = []struct {
CommunityID string
CommunityShard *shard.Shard // WARNING: I didn't test a sharded community
Fleet string
UseShardAsDefaultTopic bool
ClusterID uint16
UserPrivateKeyString string // When empty a new user will be created
// Setup OwnerPublicKey and CommunityTokens if the community has owner token
// This is needed to mock the owner verification
OwnerPublicKey string
CommunityTokens []testFetchRealCommunityExampleTokenInfo
// Fill these if you know what envelopes are expected.
// The test will fail if fetched array doesn't equal to the expected one.
CheckExpectedEnvelopes bool
ExpectedShardEnvelopes []string
ExpectedDescriptionEnvelopes []string
}{
{
//Example 1, status.prod fleet
CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
CommunityShard: nil,
Fleet: params.FleetStatusProd,
UseShardAsDefaultTopic: false,
ClusterID: shard.UndefinedShardValue,
},
{
// Example 3, shards.test fleet
// https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL
CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
Fleet: params.FleetShardsTest,
UseShardAsDefaultTopic: true,
ClusterID: shard.MainStatusShardCluster,
CheckExpectedEnvelopes: true,
ExpectedShardEnvelopes: []string{
"0x8173eecd7ff9ebcaae3dde0e704daf9bdeb6d33b0d8505a67e7dc56d0d8fc07c",
"0x596bbafbe0e0b625d165378cd4c7641a4d23aa1145c705aad666ddeaf60c88cd",
"0x8a1ee798f3657da5a463e5f878ab2455d05b8f552359b58330ccd7fa4f5624b0",
"0x97bcde2103a01984bb45a8590a6cb6972411445a1b2d40e181d5f2b5366fa5f1",
"0x26e3c0c880d1a2c4e81bf4fffbdb8b7e1ecc91fce7c6a05ee87d200d62ffc11e",
"0x1a8820bd61ebcc9de75c25f31c9b05eb6e880a5a4902679bb6ce2f43f61bf159",
"0xce450cfb5f79d761f34dea5b2ccec63751886e43ae63477e12f517c31f800aeb",
"0x9607bd1cf08355c44bcce055da197ba177201882736fa8874910194ccdaa8760",
"0x0c4b989ca69f529e571e6ea8b3230a85e057d8b2ae6147d1fedc2a01f2816ed6",
"0xe40ea64c9007a064b6324b614976510f2a433c9f84d87139df8f66b536e37ee4",
"0x7a028466a095e40650bb0ef16e903309b0c38c5a7cb7e2e9debd0acf2151448d",
"0x96419c6be375b2b348778d4694e3a491de84eecde601d5d405a0e72e9cece4a1",
"0xbcaeb5e86128638fab7203428daddd741df44ceeabe7d9d25936a10cd0a8b808",
"0x2e0b5872cb5a7c9a3273048eb2dfcb1d6a28faad3fe307a7db6c2dbaca9ce462",
"0xfa96bbe4125514ce73c52ef3ccb1c4ad9c4ad4afe8803de8ab9309fe9483b1d0",
"0xdeddfc82f70cce77c26959d91851fbc33afe648428c3e6ea349b2a2456b92111",
"0x5b12f17d7b712071f57bb48b7dcd0d6568ff5e7c3f8b3811013aea8dde9c6243",
"0x18928fd044482c75518162104d487e6fe504f086eb8c5e9f21aa4bce2811d0fa",
"0x543c156ced76138d69229a39425a0a1cabd617770e023333c10501b979f52d61",
"0xf46ea6bf5ab6a70662bcf227cc5d2c8c7a70ce42a88e5bb7ebe9e598668a8ae2",
"0xedb9628dc1ce5b0ec899c3813dd4159a2e06fb3dc88ffaae047e927c804ad0b3",
"0x16248eccc3544af3fc4a73467d0925ca2f3741eb623516ee369f710e4aa8a3ef",
"0x6a85f784a9004b56bbb47d87f5541173f05bd61ff5b26e41c714adbb5516e9ff",
"0x91e320be2cb5c6178027390cbce165fe088a1a35e1442382064ddbc9aabea8a2",
"0x676496dd36ae40e184863725bfb7425e46d916f73f4b0dd5d10324f4e9325da6",
},
ExpectedDescriptionEnvelopes: []string{
"0xe2c38667ee160861b3dc5a00e4422f47de1303c8b61f0a33c4853ce0b71d0ae4",
"0x7d8392baa9dd134e43287e58d69b8c9f50aa5c144adda6a3c7d32f00c5dea309",
"0xf74918d445709ccf9c29e776d27e9b7dc31f25a28473e8fcd89ed9de8a2e6df4",
"0xafd7e9b6245d88ea2b4fe70265aa3c5ce5618827c1092f8e3058315ac27c5b98",
"0x4d16bd4fcfc2d8736dc29d8c7287671f7e1df62af74943dc40a39ae18f388a07",
"0xd101f14bcf6bb9a5e72b934b3e74ebe3f77774037cb5b193803b264d69bfb9bf",
"0xaa857389e8886401678690bb4dbc664486bd7039427ca53e826197b303696cff",
"0x4443448e575824330a96d5114a9a7d3fe0ee7168ab6c7a646057ca4502fb91d2",
"0xfbb79b8d0ee1a61109c543cbe580412fe6d23de33d163006f74fef4addbdad37",
"0x51bc55c732e0e9db40fd8865a8ae20cbd99bfd4f95c62cf8b591e793d9b642d2",
"0xc28ca742c0e159a60941fb68ee5832016b510afe54e8ee8bb2200495ac29f1e3",
"0xf44f1714743a55f0170ba3627390d84cb9307326028abf7236222c93104db833",
"0x69d067da262b124d2eb6a8ba9f08e0a1ad66afb8d7ff3641a256992fff53a6de",
"0xa86a08457854fbea347cdd92fd390a330a9971967c551bebc53b18ccfea876fe",
"0x592850f8bc3c6079826f168971746bd2a1b50a5011fe3b233ead6c72c92a3373",
"0x1b3151d7b9b37350e86c937dc2e7964d472815bbf9275714bcb16a0c4327fe3d",
"0x1f2beca64e52f996127647cf3f3abd2e4fe501646fc39e98713ae064333388b3",
"0xeafc8f9d3114426c08748e6171710874074fb1eb732d9364830ce9b58955c83c",
"0x6e014a5ec75465efaf036353ec5811b8f710f608a04002925eba3a0a37a30423",
"0xf7e12a5829cf90e4272132e7b62c5bf0dd09100c8d498c66c9740a798db559b3",
"0x62da7e828862c3c8692cbd077c5a62266d811764184378e55f9f1066510b4652",
"0x74201394d05b914bc7e6ce4d2fcf0b119491c80644f48ac9fd37b842e4a0275a",
"0x9d4c0b1be53810c45c2fa744baa8d16ef4ae3a319b09043f4b4e053127461bf0",
"0x1937979514ea1dba8ab3b621fc3d0a3f6246b4bf1f9b4073888b8dbd9b4a765a",
"0xf767f3f36fdecb5ef6650542232334df836bfca1e7f72f1215df50d3f9f9c9bc",
"0x3a06002325502bc39a77962241fe274d4e88f61762194d321f9cc95272ed4a74",
"0x13caae58261c181d4974d7a68e0b7c8580c3cc569840179d53ae76407548d8b8",
"0x36f3b4afbcc4177a7aef26ad567839ffce51896d4c40d0a08d222cebd1255e3b",
"0x159e685bbab26a5d54ab817c93c9c610055bcf2af75290abcc9a84f1b85a2de9",
"0x1fd5ff73d7ea9a19f282bd0716f04a5e86b7c515839f0c721f66b3fe99161054",
"0x95b1e9ada4913ca809c9c28fc225a21753f18a90253660750900c78f79ad2a00",
"0x4334826934a7cbfb7446ec9d581fa6433c5d1f7f51b97f24717f55cffa320c65",
"0x0c01d07108c448797ffd14a2152fd38d1764c8a9c5e2f3da12f70551588add7a",
"0xe7071c6587fc277c4f4c0d7e4575de1a0843d3cf6c2a4aac79be79edc1608038",
"0x5da4e482f3e6eacf080db685e00c199c8cbbad9a8f43b1d94944426444a7a84a",
"0x638f551acdd7ccffd6a40ad12bbba1da8fd8a58157fdf9625b12d4a95b4eef71",
"0xa1a52c28e0481f6004d98bbce906676fff67f04246454bd33fba02c640355af0",
"0xe0300eb9e0f215ace491b1104665b48b9f6bff039af40e0cfc52a3ce766e747f",
"0xd092c04d51ee963d59953324d84188a0c1636e8600cb0f5f6f3f4f826d70c8f3",
"0x8d94bbfee687d534361fc3069079cf4e4f7db2a179d24e6419f67e38b5f0bd34",
"0x1fd7a4d2c04fca3875126b7a951f619b4da0000ca47496df0c2fb1048a145108",
"0xbbefbd116cbb23de193318b328412addc500af965d31ba481d70fa1d9e99461d",
"0xaa4e0e8bd820438e22b93371bda24a29922d33c15fb312b343d2e81a22cbdd95",
"0x76aef29ea4dde107c22c520efb2a4516b69ae83bc237281d9990f68397d801f5",
"0x804789119513a065d892cba5d240cb4d89d7329aeee93fcd8e85379a4d362fc9",
"0x9029b4a13903a3369e3466f1bfabae3f26b6721628db138eaba25c1f55f6fc1e",
"0xee38c209cb95035a289034c737e5775877145efea31b2a01f7c9241ff02f3e92",
"0x3e76da87895ca821db3b7ed7dc6949557c620d9cbcaf97af39ed4955d37b734e",
"0xf5e77eb8f9a5c52e09a56dd5e461bfee6cf9a73e1253f1d41bcde81fe3646997",
"0x083e06375c366283e541b249ea8646c3f31feb970078e95861ea399f0a57d09a",
"0xcd7db07ba557ec1ba0104909fdb958661c60c82213a75e8d15e7b262ef4f58b7",
"0x57b49dc83e1d3ac7b56bb7d758a9bf448339593311103bce4f0a53028587d577",
"0xb08cd92a5ec6f44a6129a60107132ca17d5fef29fb2bc5ebba14028d57a8038e",
"0x76959f98c8c734307a985294c26f008f3f705912aab02a5b3a0602a8598c02e2",
"0xd8ad7df58ffeec20b16a140bdd91484a34fca3ad7fc602043530ca63c307809a",
"0x6872ef39653bb208ef51f5b11c4bda3eb494a13f5da14e33687d30aef99ef383",
"0x0fabebc2e0c02f4add94886216b01ecdbd34929ad303d1a20a4505bf729038f6",
"0xc18270cd532bb3d34f62704feb722c40be48aedb5ad38d4d09fd67f5843b686d",
"0xbc16217ece82998783d4209ed3bc3f2f33d92630e43933b0129eb8b792500a3f",
"0xbda651e3b9c82f4bcf5b16252407fc888952820c842c49c06b4f01c8127e359a",
"0xb4b1799950c6aca3b011ffb775d0f973437d7d46e40cf7b379ff736d08f24eb2",
"0x38f12fb09c71dd720cacbb2102ac78ad6fbf830558adc7af9fb773f39e728bdc",
"0x489eb6fa2f5ee5b2a071c7083bf36a0a6cb4ec96049707d25843d9a97b4ac7be",
"0x64ea5655c8caf89a53c94edd5a47ba750d9fbcf099ec0dcd4026656b044486f1",
"0x501aee1c5da6aaeaae14abffefbc377b59ebe3fcaa9981bc83bfeffb25344749",
"0x9a3d360ea866102a6268ffd2001617c442b74b221d131fb3c08ae29bfac18203",
},
},
{
//Example 1, shards.test fleet
CommunityID: "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9",
Fleet: params.FleetShardsTest,
UseShardAsDefaultTopic: true,
ClusterID: shard.MainStatusShardCluster,
CheckExpectedEnvelopes: true,
ExpectedShardEnvelopes: []string{
"0xc3e68e838d09e0117b3f3fd27aabe5f5a509d13e9045263c78e6890953d43547",
"0x5ee13d052bedb855ce2b9ba6f43c78233fbd4e6539a3bdf156497053c6ddf76d",
"0xfb6638b7e050f9323a0fe7b84986b5c6f8827965e67e3b3bd0fea21cf24e43de",
},
ExpectedDescriptionEnvelopes: []string{
"0x5b4fa95d430c939c1cbbb26175eabfb4ee058d508c6b4c0e26624958ba02c3ce",
"0xbf44409ee40dea7816186b37a45dfebabcee59f76855ad5af663ccdf598861ab",
"0x98d98453f6017517d0114989da0938aad59a3ad9a10839c181f453283f64f5c9",
},
},
}
func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
if !runLocalTests {
return
}
const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a"
var communityShard *shard.Shard
exampleToRun := testFetchRealCommunityExample[2]
const fleet = params.FleetStatusProd
const useShardAsDefaultTopic = false
const clusterID = 0
const userPrivateKeyString = "" // When empty a new user will be created
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
// Test configuration
communityID := exampleToRun.CommunityID
communityShard := exampleToRun.CommunityShard
fleet := exampleToRun.Fleet
useShardAsDefaultTopic := exampleToRun.UseShardAsDefaultTopic
clusterID := exampleToRun.ClusterID
userPrivateKeyString := exampleToRun.UserPrivateKeyString
ownerPublicKey := exampleToRun.OwnerPublicKey
communityTokens := exampleToRun.CommunityTokens
// Prepare things depending on the configuration
nodesList := mailserversDB.DefaultMailserversByFleet(fleet)
descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID)))
communityIDBytes, err := types.DecodeHex(communityID)
s.Require().NoError(err)
// update mock - the signer for the community returned by the contracts should be owner
for _, communityToken := range communityTokens {
s.collectiblesServiceMock.SetSignerPubkeyForCommunity(communityIDBytes, ownerPublicKey)
s.collectiblesServiceMock.SetMockCollectibleContractData(communityToken.ChainID, communityToken.ContractAddress,
&communitytokens.CollectibleContractData{TotalSupply: &bigint.BigInt{}})
}
results := map[string]singleResult{}
wg := sync.WaitGroup{}
@ -745,7 +952,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-node-%d", i))
messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i))
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, useShardAsDefaultTopic)
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, useShardAsDefaultTopic, clusterID)
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
//
@ -782,6 +989,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
Fleet: localFleet,
ClusterID: clusterID,
}),
WithCommunityTokensService(s.collectiblesServiceMock),
}
// Create user without `createBob` func to force desired fleet
@ -796,7 +1004,11 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
// Setup envelopes watcher to gather fetched envelopes
s.setupEnvelopesWatcher(wakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
s.setupEnvelopesWatcher(wakuV2, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
result.ShardEnvelopes = append(result.ShardEnvelopes, envelope)
})
s.setupEnvelopesWatcher(wakuV2, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
result.Envelopes = append(result.Envelopes, envelope)
})
@ -823,10 +1035,20 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
wg.Wait()
// Print the results
for storeNodeName, result := range results {
fmt.Printf("%s --- %s\n", storeNodeName, result.toString())
}
// Check that results has no errors and contain correct envelopes
for storeNodeName, result := range results {
s.Require().NoError(result.Error)
if exampleToRun.CheckExpectedEnvelopes {
s.Require().Equal(exampleToRun.ExpectedShardEnvelopes, result.ShardEnvelopesHashes(),
fmt.Sprintf("wrong shard envelopes for store node %s", storeNodeName))
s.Require().Equal(exampleToRun.ExpectedDescriptionEnvelopes, result.EnvelopesHashes(),
fmt.Sprintf("wrong envelopes for store node %s", storeNodeName))
}
}
}
func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() {

View File

@ -251,10 +251,10 @@ func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Durati
s.Require().True(available)
}
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool, useShardAsDefaultTopic bool) *waku2.Waku {
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool, useShardAsDefaultTopic bool, clusterID uint16) *waku2.Waku {
wakuConfig := &waku2.Config{
DefaultShardPubsubTopic: "", // TODO: Empty string should work fine, for default value if not.
UseShardAsDefaultTopic: useShardAsDefaultTopic,
UseShardAsDefaultTopic: useShardAsDefaultTopic,
ClusterID: clusterID,
}
var onPeerStats func(connStatus types.ConnStatus)
@ -315,7 +315,7 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s
nodes := make([]*waku2.Waku, len(nodeNames))
for i, name := range nodeNames {
logger := parentLogger.With(zap.String("name", name+"-waku"))
wakuNode := NewWakuV2(s, logger, true, false, false)
wakuNode := NewWakuV2(s, logger, true, false, false, 0)
nodes[i] = wakuNode
}

View File

@ -21,6 +21,8 @@ package wakuv2
import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/protocol/common/shard"
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/status-im/status-go/wakuv2/common"
@ -56,15 +58,14 @@ type Config struct {
}
var DefaultConfig = Config{
MaxMessageSize: common.DefaultMaxMessageSize,
Host: "0.0.0.0",
Port: 0,
KeepAliveInterval: 10, // second
DiscoveryLimit: 20,
MinPeersForRelay: 1, // TODO: determine correct value with Vac team
MinPeersForFilter: 2, // TODO: determine correct value with Vac team and via testing
AutoUpdate: false,
DefaultShardPubsubTopic: relay.DefaultWakuTopic,
MaxMessageSize: common.DefaultMaxMessageSize,
Host: "0.0.0.0",
Port: 0,
KeepAliveInterval: 10, // second
DiscoveryLimit: 20,
MinPeersForRelay: 1, // TODO: determine correct value with Vac team
MinPeersForFilter: 2, // TODO: determine correct value with Vac team and via testing
AutoUpdate: false,
}
func setDefaults(cfg *Config) *Config {
@ -97,7 +98,11 @@ func setDefaults(cfg *Config) *Config {
}
if cfg.DefaultShardPubsubTopic == "" {
cfg.DefaultShardPubsubTopic = DefaultConfig.DefaultShardPubsubTopic
if cfg.UseShardAsDefaultTopic {
cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic()
} else {
cfg.DefaultShardPubsubTopic = relay.DefaultWakuTopic
}
}
return cfg

View File

@ -249,12 +249,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
EnableDiscV5: cfg.EnableDiscV5,
}
if waku.cfg.UseShardAsDefaultTopic {
waku.settings.DefaultPubsubTopic = cfg.DefaultShardPubsubTopic
} else {
waku.settings.DefaultPubsubTopic = relay.DefaultWakuTopic
}
waku.settings.DefaultPubsubTopic = cfg.DefaultShardPubsubTopic
waku.filters = common.NewFilters(waku.settings.DefaultPubsubTopic, waku.logger)
waku.bandwidthCounter = metrics.NewBandwidthCounter()
@ -1110,7 +1105,10 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
return envelope.Hash(), nil
}
func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (*store.Result, error) {
func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, requestID []byte, opts []store.HistoryRequestOption) (*store.Result, error) {
opts = append(opts, store.WithRequestID(requestID))
strTopics := make([]string, len(topics))
for i, t := range topics {
strTopics[i] = t.ContentTopic()
@ -1125,18 +1123,28 @@ func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
PubsubTopic: pubsubTopic,
}
w.logger.Debug("store.query", zap.Int64p("startTime", query.StartTime), zap.Int64p("endTime", query.EndTime), zap.Strings("contentTopics", query.ContentTopics), zap.String("pubsubTopic", query.PubsubTopic), zap.Stringer("peerID", peerID))
w.logger.Debug("store.query",
zap.String("requestID", hexutil.Encode(requestID)),
zap.Int64p("startTime", query.StartTime),
zap.Int64p("endTime", query.EndTime),
zap.Strings("contentTopics", query.ContentTopics),
zap.String("pubsubTopic", query.PubsubTopic),
zap.Stringer("peerID", peerID))
return w.node.Store().Query(ctx, query, opts...)
}
func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption, processEnvelopes bool) (cursor *storepb.Index, envelopesCount int, err error) {
requestID := protocol.GenerateRequestID()
opts = append(opts, store.WithRequestID(requestID))
pubsubTopic = w.getPubsubTopic(pubsubTopic)
result, err := w.query(ctx, peerID, pubsubTopic, topics, from, to, opts)
result, err := w.query(ctx, peerID, pubsubTopic, topics, from, to, requestID, opts)
if err != nil {
w.logger.Error("error querying storenode", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", peerID.String()), zap.Error(err))
w.logger.Error("error querying storenode",
zap.String("requestID", hexutil.Encode(requestID)),
zap.String("peerID", peerID.String()),
zap.Error(err))
if w.onHistoricMessagesRequestFailed != nil {
w.onHistoricMessagesRequestFailed(requestID, peerID, err)
}

View File

@ -182,7 +182,7 @@ func TestBasicWakuV2(t *testing.T) {
b.InitialInterval = 500 * time.Millisecond
}
err = tt.RetryWithBackOff(func() error {
storeResult, err := w.query(context.Background(), storeNode.PeerID, relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-int64(marginInSeconds)), uint64(timestampInSeconds+int64(marginInSeconds)), []store.HistoryRequestOption{})
storeResult, err := w.query(context.Background(), storeNode.PeerID, relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-int64(marginInSeconds)), uint64(timestampInSeconds+int64(marginInSeconds)), []byte{}, []store.HistoryRequestOption{})
if err != nil || len(storeResult.Messages) == 0 {
// in case of failure extend timestamp margin up to 40secs
if marginInSeconds < 40 {
@ -451,7 +451,7 @@ func TestWakuV2Store(t *testing.T) {
marginInSeconds := 5
// Query the second node's store for the message
storeResult, err := w1.query(context.Background(), w2.node.Host().ID(), relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-int64(marginInSeconds)), uint64(timestampInSeconds+int64(marginInSeconds)), []store.HistoryRequestOption{})
storeResult, err := w1.query(context.Background(), w2.node.Host().ID(), relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-int64(marginInSeconds)), uint64(timestampInSeconds+int64(marginInSeconds)), []byte{}, []store.HistoryRequestOption{})
require.NoError(t, err)
require.True(t, len(storeResult.Messages) > 0, "no messages received from store node")
}