fix: handle community shard unassignment and update (#4627)

This commit is contained in:
richΛrd 2024-01-30 13:56:59 -04:00 committed by GitHub
parent e2341248b3
commit 98c1ebec05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 166 additions and 24 deletions

View File

@ -69,6 +69,11 @@ func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecd
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error {
// not available in WakuV1
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
// not available in WakuV1
return nil, errors.New("not available in WakuV1")
@ -79,6 +84,11 @@ func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Priva
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) RemovePubsubTopicKey(topic string) error {
// not available in WakuV1
return errors.New("not available in WakuV1")
}
// AddRelayPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")

View File

@ -237,6 +237,10 @@ func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *e
return w.waku.SubscribeToPubsubTopic(topic, optPublicKey)
}
func (w *gethWakuV2Wrapper) UnsubscribeFromPubsubTopic(topic string) error {
return w.waku.UnsubscribeFromPubsubTopic(topic)
}
func (w *gethWakuV2Wrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return w.waku.RetrievePubsubTopicKey(topic)
}
@ -245,6 +249,10 @@ func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Pri
return w.waku.StorePubsubTopicKey(topic, privKey)
}
func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error {
return w.waku.RemovePubsubTopicKey(topic)
}
func (w *gethWakuV2Wrapper) AddStorePeer(address string) (peer.ID, error) {
return w.waku.AddStorePeer(address)
}

View File

@ -90,10 +90,14 @@ type Waku interface {
SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error
UnsubscribeFromPubsubTopic(topic string) error
StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error
RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error)
RemovePubsubTopicKey(topic string) error
AddStorePeer(address string) (peer.ID, error)
AddRelayPeer(address string) (peer.ID, error)

View File

@ -1077,6 +1077,10 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error {
func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error {
community.config.Shard = shard
if shard == nil {
return m.persistence.DeleteCommunityShard(community.ID())
}
return m.persistence.SaveCommunityShard(community.ID(), shard, clock)
}
@ -1086,9 +1090,6 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com
if err != nil {
return nil, err
}
if !community.IsControlNode() {
return nil, errors.New("not admin or owner")
}
community.increaseClock()
@ -1105,17 +1106,12 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com
return community, nil
}
func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecdsa.PrivateKey) error {
community.SetPubsubTopicPrivateKey(privKey)
func (m *Manager) UpdatePubsubTopicPrivateKey(topic string, privKey *ecdsa.PrivateKey) error {
if privKey != nil {
topic := community.PubsubTopic()
if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil {
return err
}
return m.transport.StorePubsubTopicKey(topic, privKey)
}
return nil
return m.transport.RemovePubsubTopicKey(topic)
}
// EditCommunity takes a description, updates the community with the description,

View File

@ -715,6 +715,16 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.S
return m.transport.SubscribeToPubsubTopic(pubsubTopic, pubK)
}
func (m *Messenger) unsubscribeFromShard(shard *shard.Shard) error {
if m.transport.WakuVersion() != 2 {
return nil
}
// TODO: this should probably be moved completely to transport once pubsub topic logic is implemented
return m.transport.UnsubscribeFromPubsubTopic(shard.PubsubTopic())
}
func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes, forceJoin bool) (*MessengerResponse, error) {
logger := m.logger.Named("joinCommunity")
@ -2053,26 +2063,57 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes
return nil, err
}
community, err := m.communitiesManager.SetShard(request.CommunityID, request.Shard)
community, err := m.communitiesManager.GetByID(request.CommunityID)
if err != nil {
return nil, err
}
var topicPrivKey *ecdsa.PrivateKey
if request.PrivateKey != nil {
topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey)
} else {
topicPrivKey, err = crypto.GenerateKey()
if !community.IsControlNode() {
return nil, errors.New("not admin or owner")
}
// Reset the community private key
community.SetPubsubTopicPrivateKey(nil)
// Removing the private key (if it exist)
err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic())
if err != nil {
return nil, err
}
err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, topicPrivKey)
// Unsubscribing from existing shard
if community.Shard() != nil {
err := m.unsubscribeFromShard(community.Shard())
if err != nil {
return nil, err
}
}
community, err = m.communitiesManager.SetShard(request.CommunityID, request.Shard)
if err != nil {
return nil, err
}
if request.Shard != nil {
var topicPrivKey *ecdsa.PrivateKey
if request.PrivateKey != nil {
topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey)
} else {
topicPrivKey, err = crypto.GenerateKey()
}
if err != nil {
return nil, err
}
community.SetPubsubTopicPrivateKey(topicPrivKey)
err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), topicPrivKey)
if err != nil {
return nil, err
}
}
// TODO: Check
err = m.UpdateCommunityFilters(community)
if err != nil {
return nil, err
@ -2094,6 +2135,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes
return response, nil
}
func (m *Messenger) RemovePubsubTopicPrivateKey(topic string) error {
return m.transport.RemovePubsubTopicKey(topic)
}
func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error {
defaultFilters := m.DefaultFilters(community)
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))
@ -2480,15 +2525,16 @@ func (m *Messenger) SendCommunityShardKey(community *communities.Community, pubk
return nil
}
keyBytes := make([]byte, 0)
key := community.PubsubTopicPrivateKey()
if key == nil {
return nil // No community shard key available
if key != nil {
keyBytes = crypto.FromECDSA(key)
}
communityShardKey := &protobuf.CommunityShardKey{
Clock: community.Clock(),
CommunityId: community.ID(),
PrivateKey: crypto.FromECDSA(key),
PrivateKey: keyBytes,
Shard: community.Shard().Protobuffer(),
}
@ -2831,14 +2877,32 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
}
var privKey *ecdsa.PrivateKey = nil
if message.PrivateKey != nil {
privKey, err = crypto.ToECDSA(message.PrivateKey)
if message.Shard != nil {
if message.PrivateKey != nil {
privKey, err = crypto.ToECDSA(message.PrivateKey)
if err != nil {
return err
}
}
}
// Removing the existing private key (if any)
err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic())
if err != nil {
return err
}
// Unsubscribing from existing shard
if community.Shard() != nil {
err := m.unsubscribeFromShard(community.Shard())
if err != nil {
return err
}
}
err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, privKey)
community.SetPubsubTopicPrivateKey(privKey)
err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), privKey)
if err != nil {
return err
}

View File

@ -695,6 +695,14 @@ func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.Pub
return nil
}
// Unsubscribe from a pubsub topic
func (t *Transport) UnsubscribeFromPubsubTopic(topic string) error {
if t.waku.Version() == 2 {
return t.waku.UnsubscribeFromPubsubTopic(topic)
}
return nil
}
func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return t.waku.StorePubsubTopicKey(topic, privKey)
}
@ -702,3 +710,10 @@ func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey)
func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return t.waku.RetrievePubsubTopicKey(topic)
}
func (t *Transport) RemovePubsubTopicKey(topic string) error {
if t.waku.Version() == 2 {
return t.waku.RemovePubsubTopicKey(topic)
}
return nil
}

View File

@ -17,6 +17,7 @@ type ProtectedTopicsStore struct {
insertStmt *sql.Stmt
fetchPrivKeyStmt *sql.Stmt
deleteStmt *sql.Stmt
}
// Creates a new DB store using the db specified via options.
@ -33,11 +34,17 @@ func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore
return nil, err
}
deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?")
if err != nil {
return nil, err
}
result := new(ProtectedTopicsStore)
result.log = log.Named("protected-topics-store")
result.db = db
result.insertStmt = insertStmt
result.fetchPrivKeyStmt = fetchPrivKeyStmt
result.deleteStmt = deleteStmt
return result, nil
}
@ -64,6 +71,11 @@ func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.Private
return err
}
func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error {
_, err := p.deleteStmt.Exec(pubsubTopic)
return err
}
func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) {
var privKeyBytes []byte
err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes)

View File

@ -611,6 +611,18 @@ func (w *Waku) getPubsubTopic(topic string) string {
return topic
}
func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
topic = w.getPubsubTopic(topic)
if !w.node.Relay().IsSubscribed(topic) {
return nil
}
contentFilter := protocol.NewContentFilter(topic)
return w.node.Relay().Unsubscribe(w.ctx, contentFilter)
}
func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
if w.settings.LightClient {
return errors.New("only available for full nodes")
@ -1546,6 +1558,18 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err
return nil
}
func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
topic = w.getPubsubTopic(topic)
if !w.settings.LightClient {
err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic)
if err != nil {
return err
}
}
return nil
}
func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil {
@ -1564,6 +1588,15 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro
return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey)
}
func (w *Waku) RemovePubsubTopicKey(topic string) error {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil {
return nil
}
return w.protectedTopicStore.Delete(topic)
}
func (w *Waku) StartDiscV5() error {
if w.node.DiscV5() == nil {
return errors.New("discv5 is not setup")