fix: ensure community events eventual consistency

- Extracted `community_events_factory.go`
- Introduced `eventsProcessor`
  - Improved processing logic order
  - Improved events filtering
- Introduced concept of `EventTypeID` to prevent redundant events handling
- Added sanity check before events appliance when reading community from
  database
- Removed reject&re-apply scheme (no more ping-pong issue)
- Fixed and added more variants to eventual consistency test

fixes: status-im/status-desktop#13387
fixes: status-im/status-desktop#13388
This commit is contained in:
Patryk Osmaczko 2024-02-19 10:52:22 +01:00 committed by osmaczko
parent f7c40d4c40
commit e2cab1a8ae
15 changed files with 1828 additions and 1727 deletions

View File

@ -516,16 +516,6 @@ type CommunitySettings struct {
Clock uint64 `json:"clock"`
}
// `CommunityAdminEventChanges contain additional changes that don't live on
// a `Community` but still have to be propagated to other admin and control nodes
type CommunityEventChanges struct {
*CommunityChanges
// `RejectedRequestsToJoin` is a map of signer keys to requests to join
RejectedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin `json:"rejectedRequestsToJoin"`
// `AcceptedRequestsToJoin` is a map of signer keys to requests to join
AcceptedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin `json:"acceptedRequestsToJoin"`
}
func (o *Community) emptyCommunityChanges() *CommunityChanges {
changes := EmptyCommunityChanges()
changes.Community = o
@ -2343,20 +2333,14 @@ func (o *Community) DeclineRequestToJoin(dbRequest *RequestToJoin) (adminEventCr
}
if o.IsControlNode() {
// typically, community's clock is increased implicitly when making changes
// to it, however in this scenario there are no changes in the community, yet
// we need to increase the clock to ensure the owner event is processed by other
// nodes.
pk, err := common.HexToPubkey(dbRequest.PublicKey)
if err != nil {
return false, err
}
o.removeMemberFromOrg(pk)
o.increaseClock()
} else {
rejectedRequestsToJoin := make(map[string]*protobuf.CommunityRequestToJoin)
rejectedRequestsToJoin[dbRequest.PublicKey] = dbRequest.ToCommunityRequestToJoinProtobuf()
adminChanges := &CommunityEventChanges{
CommunityChanges: o.emptyCommunityChanges(),
RejectedRequestsToJoin: rejectedRequestsToJoin,
}
err = o.addNewCommunityEvent(o.ToCommunityRequestToJoinRejectCommunityEvent(adminChanges))
err = o.addNewCommunityEvent(o.ToCommunityRequestToJoinRejectCommunityEvent(dbRequest.PublicKey, dbRequest.ToCommunityRequestToJoinProtobuf()))
if err != nil {
return adminEventCreated, err
}
@ -2367,11 +2351,8 @@ func (o *Community) DeclineRequestToJoin(dbRequest *RequestToJoin) (adminEventCr
return adminEventCreated, err
}
func (o *Community) ValidateEvent(event *CommunityEvent, signer *ecdsa.PublicKey) error {
o.mutex.Lock()
defer o.mutex.Unlock()
err := validateCommunityEvent(event)
func (o *Community) validateEvent(event *CommunityEvent, signer *ecdsa.PublicKey) error {
err := event.Validate()
if err != nil {
return err
}
@ -2397,6 +2378,12 @@ func (o *Community) ValidateEvent(event *CommunityEvent, signer *ecdsa.PublicKey
return nil
}
func (o *Community) ValidateEvent(event *CommunityEvent, signer *ecdsa.PublicKey) error {
o.mutex.Lock()
defer o.mutex.Unlock()
return o.validateEvent(event, signer)
}
func (o *Community) MemberCanManageToken(member *ecdsa.PublicKey, token *community_token.CommunityToken) bool {
return o.IsMemberOwner(member) || o.IsControlNode() || (o.IsMemberTokenMaster(member) &&
token.PrivilegesLevel != community_token.OwnerLevel && token.PrivilegesLevel != community_token.MasterLevel)

View File

@ -2,435 +2,257 @@ package communities
import (
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/protobuf"
)
var ErrInvalidCommunityEventClock = errors.New("clock for admin event message is outdated")
type CommunityEvent struct {
CommunityEventClock uint64 `json:"communityEventClock"`
Type protobuf.CommunityEvent_EventType `json:"type"`
CommunityConfig *protobuf.CommunityConfig `json:"communityConfig,omitempty"`
TokenPermission *protobuf.CommunityTokenPermission `json:"tokenPermissions,omitempty"`
CategoryData *protobuf.CategoryData `json:"categoryData,omitempty"`
ChannelData *protobuf.ChannelData `json:"channelData,omitempty"`
MemberToAction string `json:"memberToAction,omitempty"`
RequestToJoin *protobuf.CommunityRequestToJoin `json:"requestToJoin,omitempty"`
TokenMetadata *protobuf.CommunityTokenMetadata `json:"tokenMetadata,omitempty"`
Payload []byte `json:"payload"`
Signature []byte `json:"signature"`
}
func (e *CommunityEvent) ToProtobuf() *protobuf.CommunityEvent {
var acceptedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin
var rejectedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin
switch e.Type {
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT:
acceptedRequestsToJoin = make(map[string]*protobuf.CommunityRequestToJoin)
acceptedRequestsToJoin[e.MemberToAction] = e.RequestToJoin
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT:
rejectedRequestsToJoin = make(map[string]*protobuf.CommunityRequestToJoin)
rejectedRequestsToJoin[e.MemberToAction] = e.RequestToJoin
}
return &protobuf.CommunityEvent{
CommunityEventClock: e.CommunityEventClock,
Type: e.Type,
CommunityConfig: e.CommunityConfig,
TokenPermission: e.TokenPermission,
CategoryData: e.CategoryData,
ChannelData: e.ChannelData,
MemberToAction: e.MemberToAction,
RejectedRequestsToJoin: rejectedRequestsToJoin,
AcceptedRequestsToJoin: acceptedRequestsToJoin,
TokenMetadata: e.TokenMetadata,
}
}
func communityEventFromProtobuf(msg *protobuf.SignedCommunityEvent) (*CommunityEvent, error) {
decodedEvent := protobuf.CommunityEvent{}
err := proto.Unmarshal(msg.Payload, &decodedEvent)
if err != nil {
return nil, err
}
memberToAction := decodedEvent.MemberToAction
var requestToJoin *protobuf.CommunityRequestToJoin
switch decodedEvent.Type {
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT:
for member, request := range decodedEvent.AcceptedRequestsToJoin {
memberToAction = member
requestToJoin = request
break
}
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT:
for member, request := range decodedEvent.RejectedRequestsToJoin {
memberToAction = member
requestToJoin = request
break
}
}
func (o *Community) ToCreateChannelCommunityEvent(channelID string, channel *protobuf.CommunityChat) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
Channel: channel,
},
}
CommunityEventClock: decodedEvent.CommunityEventClock,
Type: decodedEvent.Type,
CommunityConfig: decodedEvent.CommunityConfig,
TokenPermission: decodedEvent.TokenPermission,
CategoryData: decodedEvent.CategoryData,
ChannelData: decodedEvent.ChannelData,
MemberToAction: memberToAction,
RequestToJoin: requestToJoin,
TokenMetadata: decodedEvent.TokenMetadata,
Payload: msg.Payload,
Signature: msg.Signature,
}, nil
}
func (o *Community) ToEditChannelCommunityEvent(channelID string, channel *protobuf.CommunityChat) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
Channel: channel,
},
}
func (e *CommunityEvent) RecoverSigner() (*ecdsa.PublicKey, error) {
if e.Signature == nil || len(e.Signature) == 0 {
return nil, errors.New("missing signature")
}
func (o *Community) ToDeleteChannelCommunityEvent(channelID string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
},
}
signer, err := crypto.SigToPub(
crypto.Keccak256(e.Payload),
e.Signature,
)
if err != nil {
return nil, errors.New("failed to recover signer")
}
func (o *Community) ToReorderChannelCommunityEvent(categoryID string, channelID string, position int) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER,
ChannelData: &protobuf.ChannelData{
CategoryId: categoryID,
ChannelId: channelID,
Position: int32(position),
},
}
return signer, nil
}
func (o *Community) ToCreateCategoryCommunityEvent(categoryID string, categoryName string, channelsIds []string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE,
CategoryData: &protobuf.CategoryData{
Name: categoryName,
CategoryId: categoryID,
ChannelsIds: channelsIds,
},
}
}
func (o *Community) ToEditCategoryCommunityEvent(categoryID string, categoryName string, channelsIds []string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT,
CategoryData: &protobuf.CategoryData{
Name: categoryName,
CategoryId: categoryID,
ChannelsIds: channelsIds,
},
}
}
func (o *Community) ToDeleteCategoryCommunityEvent(categoryID string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE,
CategoryData: &protobuf.CategoryData{
CategoryId: categoryID,
},
}
}
func (o *Community) ToReorderCategoryCommunityEvent(categoryID string, position int) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER,
CategoryData: &protobuf.CategoryData{
CategoryId: categoryID,
Position: int32(position),
},
}
}
func (o *Community) ToBanCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN,
MemberToAction: pubkey,
}
}
func (o *Community) ToUnbanCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN,
MemberToAction: pubkey,
}
}
func (o *Community) ToKickCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK,
MemberToAction: pubkey,
}
}
func (o *Community) ToCommunityEditCommunityEvent(description *protobuf.CommunityDescription) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_EDIT),
Type: protobuf.CommunityEvent_COMMUNITY_EDIT,
CommunityConfig: &protobuf.CommunityConfig{
Identity: description.Identity,
Permissions: description.Permissions,
AdminSettings: description.AdminSettings,
IntroMessage: description.IntroMessage,
OutroMessage: description.OutroMessage,
Tags: description.Tags,
},
}
}
func (o *Community) ToCommunityTokenPermissionChangeCommunityEvent(permission *protobuf.CommunityTokenPermission) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE,
TokenPermission: permission,
}
}
func (o *Community) ToCommunityTokenPermissionDeleteCommunityEvent(permission *protobuf.CommunityTokenPermission) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE,
TokenPermission: permission,
}
}
func (o *Community) ToCommunityRequestToJoinAcceptCommunityEvent(changes *CommunityEventChanges) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT),
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
AcceptedRequestsToJoin: changes.AcceptedRequestsToJoin,
}
}
func (o *Community) ToCommunityRequestToJoinRejectCommunityEvent(changes *CommunityEventChanges) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT),
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT,
RejectedRequestsToJoin: changes.RejectedRequestsToJoin,
}
}
func (o *Community) ToAddTokenMetadataCommunityEvent(tokenMetadata *protobuf.CommunityTokenMetadata) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD),
Type: protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD,
TokenMetadata: tokenMetadata,
}
}
func (o *Community) UpdateCommunityByEvents(communityEventMessage *CommunityEventsMessage) error {
o.mutex.Lock()
defer o.mutex.Unlock()
// Validate that EventsBaseCommunityDescription was signed by the control node
description, err := validateAndGetEventsMessageCommunityDescription(communityEventMessage.EventsBaseCommunityDescription, o.ControlNode())
if err != nil {
return err
}
if description.Clock != o.config.CommunityDescription.Clock {
return ErrInvalidCommunityEventClock
}
// Merge community events to existing community. Community events must be stored to the db
// during saving the community
o.mergeCommunityEvents(communityEventMessage)
if o.encryptor != nil {
_, err = decryptDescription(o.ID(), o.encryptor, description, o.config.Logger)
if err != nil {
return err
}
}
o.config.CommunityDescription = description
o.config.CommunityDescriptionProtocolMessage = communityEventMessage.EventsBaseCommunityDescription
// Update the copy of the CommunityDescription by community events
err = o.updateCommunityDescriptionByEvents()
func (e *CommunityEvent) Sign(pk *ecdsa.PrivateKey) error {
sig, err := crypto.Sign(crypto.Keccak256(e.Payload), pk)
if err != nil {
return err
}
e.Signature = sig
return nil
}
func (o *Community) updateCommunityDescriptionByEvents() error {
if o.config.EventsData == nil {
return nil
}
for _, event := range o.config.EventsData.Events {
err := o.updateCommunityDescriptionByCommunityEvent(event)
if err != nil {
return err
}
}
return nil
}
func (o *Community) updateCommunityDescriptionByCommunityEvent(communityEvent CommunityEvent) error {
switch communityEvent.Type {
func (e *CommunityEvent) Validate() error {
switch e.Type {
case protobuf.CommunityEvent_COMMUNITY_EDIT:
o.config.CommunityDescription.Identity = communityEvent.CommunityConfig.Identity
o.config.CommunityDescription.Permissions = communityEvent.CommunityConfig.Permissions
o.config.CommunityDescription.AdminSettings = communityEvent.CommunityConfig.AdminSettings
o.config.CommunityDescription.IntroMessage = communityEvent.CommunityConfig.IntroMessage
o.config.CommunityDescription.OutroMessage = communityEvent.CommunityConfig.OutroMessage
o.config.CommunityDescription.Tags = communityEvent.CommunityConfig.Tags
if e.CommunityConfig == nil || e.CommunityConfig.Identity == nil ||
e.CommunityConfig.Permissions == nil || e.CommunityConfig.AdminSettings == nil {
return errors.New("invalid config change admin event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE:
if o.IsControlNode() {
_, err := o.upsertTokenPermission(communityEvent.TokenPermission)
if err != nil {
return err
}
if e.TokenPermission == nil || len(e.TokenPermission.Id) == 0 {
return errors.New("invalid token permission change event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE:
if o.IsControlNode() {
_, err := o.deleteTokenPermission(communityEvent.TokenPermission.Id)
if err != nil {
return err
}
if e.TokenPermission == nil || len(e.TokenPermission.Id) == 0 {
return errors.New("invalid token permission delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE:
_, err := o.createCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds)
if err != nil {
return err
if e.CategoryData == nil || len(e.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category create event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE:
_, err := o.deleteCategory(communityEvent.CategoryData.CategoryId)
if err != nil {
return err
if e.CategoryData == nil || len(e.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT:
_, err := o.editCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds)
if err != nil {
return err
if e.CategoryData == nil || len(e.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category edit event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE:
err := o.createChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel)
if err != nil {
return err
if e.ChannelData == nil || len(e.ChannelData.ChannelId) == 0 ||
e.ChannelData.Channel == nil {
return errors.New("invalid community channel create event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE:
o.deleteChat(communityEvent.ChannelData.ChannelId)
if e.ChannelData == nil || len(e.ChannelData.ChannelId) == 0 {
return errors.New("invalid community channel delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT:
err := o.editChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel)
if err != nil {
return err
if e.ChannelData == nil || len(e.ChannelData.ChannelId) == 0 ||
e.ChannelData.Channel == nil {
return errors.New("invalid community channel edit event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER:
_, err := o.reorderChat(communityEvent.ChannelData.CategoryId, communityEvent.ChannelData.ChannelId, int(communityEvent.ChannelData.Position))
if err != nil {
return err
if e.ChannelData == nil || len(e.ChannelData.ChannelId) == 0 {
return errors.New("invalid community channel reorder event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER:
_, err := o.reorderCategories(communityEvent.CategoryData.CategoryId, int(communityEvent.CategoryData.Position))
if err != nil {
return err
if e.CategoryData == nil || len(e.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category reorder event")
}
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT, protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT:
if len(e.MemberToAction) == 0 || e.RequestToJoin == nil {
return errors.New("invalid community request to join event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.removeMemberFromOrg(pk)
if len(e.MemberToAction) == 0 {
return errors.New("invalid community member kick event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.banUserFromCommunity(pk)
if len(e.MemberToAction) == 0 {
return errors.New("invalid community member ban event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.unbanUserFromCommunity(pk)
if len(e.MemberToAction) == 0 {
return errors.New("invalid community member unban event")
}
case protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD:
o.config.CommunityDescription.CommunityTokensMetadata = append(o.config.CommunityDescription.CommunityTokensMetadata, communityEvent.TokenMetadata)
if e.TokenMetadata == nil || len(e.TokenMetadata.ContractAddresses) == 0 {
return errors.New("invalid add community token event")
}
}
return nil
}
func (o *Community) nextEventClock(eventType protobuf.CommunityEvent_EventType) uint64 {
// assumes events are already sorted by clock
latestEventClock := uint64(0)
if o.config.EventsData != nil {
for _, event := range o.config.EventsData.Events {
if event.Type == eventType {
latestEventClock = event.CommunityEventClock
}
}
// EventTypeID constructs a unique identifier for an event and its associated target.
func (e *CommunityEvent) EventTypeID() string {
switch e.Type {
case protobuf.CommunityEvent_COMMUNITY_EDIT:
return fmt.Sprintf("%d", e.Type)
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE,
protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE:
return fmt.Sprintf("%d-%s", e.Type, e.TokenPermission.Id)
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE,
protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE,
protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT,
protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER:
return fmt.Sprintf("%d-%s", e.Type, e.CategoryData.CategoryId)
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE,
protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE,
protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT,
protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER:
return fmt.Sprintf("%d-%s", e.Type, e.ChannelData.ChannelId)
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT,
protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK,
protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN,
protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN:
return fmt.Sprintf("%d-%s", e.Type, e.MemberToAction)
case protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD:
return fmt.Sprintf("%d-%s", e.Type, e.TokenMetadata.Name)
}
clock := o.config.CommunityDescription.Clock
if latestEventClock > clock {
clock = latestEventClock
return ""
}
// lamport timestamp
timestamp := o.timesource.GetCurrentTime()
if clock == 0 || clock < timestamp {
clock = timestamp
} else {
clock = clock + 1
func communityEventsToJSONEncodedBytes(communityEvents []CommunityEvent) ([]byte, error) {
return json.Marshal(communityEvents)
}
return clock
}
func (o *Community) addNewCommunityEvent(event *CommunityEvent) error {
err := validateCommunityEvent(event)
if err != nil {
return err
}
// All events must be built on top of the control node CommunityDescription
// If there were no events before, extract CommunityDescription from CommunityDescriptionProtocolMessage
// and check the signature
if o.config.EventsData == nil || len(o.config.EventsData.EventsBaseCommunityDescription) == 0 {
_, err := validateAndGetEventsMessageCommunityDescription(o.config.CommunityDescriptionProtocolMessage, o.ControlNode())
if err != nil {
return err
}
o.config.EventsData = &EventsData{
EventsBaseCommunityDescription: o.config.CommunityDescriptionProtocolMessage,
Events: []CommunityEvent{},
}
}
event.Payload, err = proto.Marshal(event.ToProtobuf())
if err != nil {
return err
}
o.config.EventsData.Events = append(o.config.EventsData.Events, *event)
return nil
}
func (o *Community) ToCommunityEventsMessage() *CommunityEventsMessage {
return &CommunityEventsMessage{
CommunityID: o.ID(),
EventsBaseCommunityDescription: o.config.EventsData.EventsBaseCommunityDescription,
Events: o.config.EventsData.Events,
}
}
func validateAndGetEventsMessageCommunityDescription(signedDescription []byte, signerPubkey *ecdsa.PublicKey) (*protobuf.CommunityDescription, error) {
metadata := &protobuf.ApplicationMetadataMessage{}
err := proto.Unmarshal(signedDescription, metadata)
func communityEventsFromJSONEncodedBytes(jsonEncodedRawEvents []byte) ([]CommunityEvent, error) {
var events []CommunityEvent
err := json.Unmarshal(jsonEncodedRawEvents, &events)
if err != nil {
return nil, err
}
if metadata.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
return nil, ErrInvalidMessage
}
signer, err := utils.RecoverKey(metadata)
if err != nil {
return nil, err
}
if signer == nil {
return nil, errors.New("CommunityDescription does not contain the control node signature")
}
if !signer.Equal(signerPubkey) {
return nil, errors.New("CommunityDescription was not signed by an owner")
}
description := &protobuf.CommunityDescription{}
err = proto.Unmarshal(metadata.Payload, description)
if err != nil {
return nil, err
}
return description, nil
return events, nil
}

View File

@ -1,100 +1,11 @@
package communities
import (
"bytes"
"crypto/ecdsa"
"encoding/json"
"errors"
"sort"
"github.com/golang/protobuf/proto"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/protobuf"
)
type CommunityEvent struct {
CommunityEventClock uint64 `json:"communityEventClock"`
Type protobuf.CommunityEvent_EventType `json:"type"`
CommunityConfig *protobuf.CommunityConfig `json:"communityConfig,omitempty"`
TokenPermission *protobuf.CommunityTokenPermission `json:"tokenPermissions,omitempty"`
CategoryData *protobuf.CategoryData `json:"categoryData,omitempty"`
ChannelData *protobuf.ChannelData `json:"channelData,omitempty"`
MemberToAction string `json:"memberToAction,omitempty"`
MembersAdded map[string]*protobuf.CommunityMember `json:"membersAdded,omitempty"`
RejectedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin `json:"rejectedRequestsToJoin,omitempty"`
AcceptedRequestsToJoin map[string]*protobuf.CommunityRequestToJoin `json:"acceptedRequestsToJoin,omitempty"`
TokenMetadata *protobuf.CommunityTokenMetadata `json:"tokenMetadata,omitempty"`
Payload []byte `json:"payload"`
Signature []byte `json:"signature"`
}
func (e *CommunityEvent) ToProtobuf() *protobuf.CommunityEvent {
return &protobuf.CommunityEvent{
CommunityEventClock: e.CommunityEventClock,
Type: e.Type,
CommunityConfig: e.CommunityConfig,
TokenPermission: e.TokenPermission,
CategoryData: e.CategoryData,
ChannelData: e.ChannelData,
MemberToAction: e.MemberToAction,
MembersAdded: e.MembersAdded,
RejectedRequestsToJoin: e.RejectedRequestsToJoin,
AcceptedRequestsToJoin: e.AcceptedRequestsToJoin,
TokenMetadata: e.TokenMetadata,
}
}
func communityEventFromProtobuf(msg *protobuf.SignedCommunityEvent) (*CommunityEvent, error) {
decodedEvent := protobuf.CommunityEvent{}
err := proto.Unmarshal(msg.Payload, &decodedEvent)
if err != nil {
return nil, err
}
return &CommunityEvent{
CommunityEventClock: decodedEvent.CommunityEventClock,
Type: decodedEvent.Type,
CommunityConfig: decodedEvent.CommunityConfig,
TokenPermission: decodedEvent.TokenPermission,
CategoryData: decodedEvent.CategoryData,
ChannelData: decodedEvent.ChannelData,
MemberToAction: decodedEvent.MemberToAction,
MembersAdded: decodedEvent.MembersAdded,
RejectedRequestsToJoin: decodedEvent.RejectedRequestsToJoin,
AcceptedRequestsToJoin: decodedEvent.AcceptedRequestsToJoin,
TokenMetadata: decodedEvent.TokenMetadata,
Payload: msg.Payload,
Signature: msg.Signature,
}, nil
}
func (e *CommunityEvent) RecoverSigner() (*ecdsa.PublicKey, error) {
if e.Signature == nil || len(e.Signature) == 0 {
return nil, errors.New("missing signature")
}
signer, err := crypto.SigToPub(
crypto.Keccak256(e.Payload),
e.Signature,
)
if err != nil {
return nil, errors.New("failed to recover signer")
}
return signer, nil
}
func (e *CommunityEvent) Sign(pk *ecdsa.PrivateKey) error {
sig, err := crypto.Sign(crypto.Keccak256(e.Payload), pk)
if err != nil {
return err
}
e.Signature = sig
return nil
}
type CommunityEventsMessage struct {
CommunityID []byte `json:"communityId"`
EventsBaseCommunityDescription []byte `json:"eventsBaseCommunityDescription"`
@ -141,145 +52,3 @@ func (m *CommunityEventsMessage) Marshal() ([]byte, error) {
pb := m.ToProtobuf()
return proto.Marshal(pb)
}
func (c *Community) mergeCommunityEvents(communityEventMessage *CommunityEventsMessage) {
if c.config.EventsData == nil {
c.config.EventsData = &EventsData{
EventsBaseCommunityDescription: communityEventMessage.EventsBaseCommunityDescription,
Events: communityEventMessage.Events,
}
return
}
for _, update := range communityEventMessage.Events {
var exists bool
for _, existing := range c.config.EventsData.Events {
if isCommunityEventsEqual(update, existing) {
exists = true
break
}
}
if !exists {
c.config.EventsData.Events = append(c.config.EventsData.Events, update)
}
}
c.sortCommunityEvents()
}
func (c *Community) sortCommunityEvents() {
sort.Slice(c.config.EventsData.Events, func(i, j int) bool {
return c.config.EventsData.Events[i].CommunityEventClock < c.config.EventsData.Events[j].CommunityEventClock
})
}
func validateCommunityEvent(communityEvent *CommunityEvent) error {
switch communityEvent.Type {
case protobuf.CommunityEvent_COMMUNITY_EDIT:
if communityEvent.CommunityConfig == nil || communityEvent.CommunityConfig.Identity == nil ||
communityEvent.CommunityConfig.Permissions == nil || communityEvent.CommunityConfig.AdminSettings == nil {
return errors.New("invalid config change admin event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE:
if communityEvent.TokenPermission == nil || len(communityEvent.TokenPermission.Id) == 0 {
return errors.New("invalid token permission change event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE:
if communityEvent.TokenPermission == nil || len(communityEvent.TokenPermission.Id) == 0 {
return errors.New("invalid token permission delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE:
if communityEvent.CategoryData == nil || len(communityEvent.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category create event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE:
if communityEvent.CategoryData == nil || len(communityEvent.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT:
if communityEvent.CategoryData == nil || len(communityEvent.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category edit event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE:
if communityEvent.ChannelData == nil || len(communityEvent.ChannelData.ChannelId) == 0 ||
communityEvent.ChannelData.Channel == nil {
return errors.New("invalid community channel create event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE:
if communityEvent.ChannelData == nil || len(communityEvent.ChannelData.ChannelId) == 0 {
return errors.New("invalid community channel delete event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT:
if communityEvent.ChannelData == nil || len(communityEvent.ChannelData.ChannelId) == 0 ||
communityEvent.ChannelData.Channel == nil {
return errors.New("invalid community channel edit event")
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER:
if communityEvent.ChannelData == nil || len(communityEvent.ChannelData.ChannelId) == 0 {
return errors.New("invalid community channel reorder event")
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER:
if communityEvent.CategoryData == nil || len(communityEvent.CategoryData.CategoryId) == 0 {
return errors.New("invalid community category reorder event")
}
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT:
if communityEvent.AcceptedRequestsToJoin == nil {
return errors.New("invalid community request to join accepted event")
}
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT:
if communityEvent.RejectedRequestsToJoin == nil {
return errors.New("invalid community request to join reject event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK:
if len(communityEvent.MemberToAction) == 0 {
return errors.New("invalid community member kick event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN:
if len(communityEvent.MemberToAction) == 0 {
return errors.New("invalid community member ban event")
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN:
if len(communityEvent.MemberToAction) == 0 {
return errors.New("invalid community member unban event")
}
case protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD:
if communityEvent.TokenMetadata == nil || len(communityEvent.TokenMetadata.ContractAddresses) == 0 {
return errors.New("invalid add community token event")
}
}
return nil
}
func isCommunityEventsEqual(left CommunityEvent, right CommunityEvent) bool {
return bytes.Equal(left.Payload, right.Payload)
}
func communityEventsToJSONEncodedBytes(communityEvents []CommunityEvent) ([]byte, error) {
return json.Marshal(communityEvents)
}
func communityEventsFromJSONEncodedBytes(jsonEncodedRawEvents []byte) ([]CommunityEvent, error) {
var events []CommunityEvent
err := json.Unmarshal(jsonEncodedRawEvents, &events)
if err != nil {
return nil, err
}
return events, nil
}

View File

@ -0,0 +1,199 @@
package communities
import "github.com/status-im/status-go/protocol/protobuf"
func (o *Community) ToCreateChannelCommunityEvent(channelID string, channel *protobuf.CommunityChat) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
Channel: channel,
},
}
}
func (o *Community) ToEditChannelCommunityEvent(channelID string, channel *protobuf.CommunityChat) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
Channel: channel,
},
}
}
func (o *Community) ToDeleteChannelCommunityEvent(channelID string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE,
ChannelData: &protobuf.ChannelData{
ChannelId: channelID,
},
}
}
func (o *Community) ToReorderChannelCommunityEvent(categoryID string, channelID string, position int) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER,
ChannelData: &protobuf.ChannelData{
CategoryId: categoryID,
ChannelId: channelID,
Position: int32(position),
},
}
}
func (o *Community) ToCreateCategoryCommunityEvent(categoryID string, categoryName string, channelsIds []string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE,
CategoryData: &protobuf.CategoryData{
Name: categoryName,
CategoryId: categoryID,
ChannelsIds: channelsIds,
},
}
}
func (o *Community) ToEditCategoryCommunityEvent(categoryID string, categoryName string, channelsIds []string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT,
CategoryData: &protobuf.CategoryData{
Name: categoryName,
CategoryId: categoryID,
ChannelsIds: channelsIds,
},
}
}
func (o *Community) ToDeleteCategoryCommunityEvent(categoryID string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE,
CategoryData: &protobuf.CategoryData{
CategoryId: categoryID,
},
}
}
func (o *Community) ToReorderCategoryCommunityEvent(categoryID string, position int) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER,
CategoryData: &protobuf.CategoryData{
CategoryId: categoryID,
Position: int32(position),
},
}
}
func (o *Community) ToBanCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN,
MemberToAction: pubkey,
}
}
func (o *Community) ToUnbanCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN,
MemberToAction: pubkey,
}
}
func (o *Community) ToKickCommunityMemberCommunityEvent(pubkey string) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK,
MemberToAction: pubkey,
}
}
func (o *Community) ToCommunityEditCommunityEvent(description *protobuf.CommunityDescription) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_EDIT,
CommunityConfig: &protobuf.CommunityConfig{
Identity: description.Identity,
Permissions: description.Permissions,
AdminSettings: description.AdminSettings,
IntroMessage: description.IntroMessage,
OutroMessage: description.OutroMessage,
Tags: description.Tags,
},
}
}
func (o *Community) ToCommunityTokenPermissionChangeCommunityEvent(permission *protobuf.CommunityTokenPermission) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE,
TokenPermission: permission,
}
}
func (o *Community) ToCommunityTokenPermissionDeleteCommunityEvent(permission *protobuf.CommunityTokenPermission) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE,
TokenPermission: permission,
}
}
func (o *Community) ToCommunityRequestToJoinAcceptCommunityEvent(member string, request *protobuf.CommunityRequestToJoin) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
MemberToAction: member,
RequestToJoin: request,
}
}
func (o *Community) ToCommunityRequestToJoinRejectCommunityEvent(member string, request *protobuf.CommunityRequestToJoin) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT,
MemberToAction: member,
RequestToJoin: request,
}
}
func (o *Community) ToAddTokenMetadataCommunityEvent(tokenMetadata *protobuf.CommunityTokenMetadata) *CommunityEvent {
return &CommunityEvent{
CommunityEventClock: o.nextEventClock(),
Type: protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD,
TokenMetadata: tokenMetadata,
}
}
func (o *Community) nextEventClock() uint64 {
latestEventClock := uint64(0)
if o.config.EventsData != nil {
for _, event := range o.config.EventsData.Events {
if event.CommunityEventClock > latestEventClock {
latestEventClock = event.CommunityEventClock
}
}
}
clock := o.config.CommunityDescription.Clock
if latestEventClock > clock {
clock = latestEventClock
}
// lamport timestamp
timestamp := o.timesource.GetCurrentTime()
if clock == 0 || clock < timestamp {
clock = timestamp
} else {
clock = clock + 1
}
return clock
}

View File

@ -0,0 +1,337 @@
package communities
import (
"crypto/ecdsa"
"errors"
"sort"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
)
var ErrInvalidCommunityEventClock = errors.New("clock for admin event message is outdated")
func (o *Community) processEvents(message *CommunityEventsMessage, lastlyAppliedEvents map[string]uint64) error {
processor := &eventsProcessor{
community: o,
message: message,
logger: o.config.Logger.Named("eventsProcessor"),
lastlyAppliedEvents: lastlyAppliedEvents,
}
return processor.exec()
}
type eventsProcessor struct {
community *Community
message *CommunityEventsMessage
logger *zap.Logger
lastlyAppliedEvents map[string]uint64
eventsToApply []CommunityEvent
}
func (e *eventsProcessor) exec() error {
e.community.mutex.Lock()
defer e.community.mutex.Unlock()
err := e.validateDescription()
if err != nil {
return err
}
e.filterEvents()
e.mergeEvents()
e.retainNewestEventsPerEventTypeID()
e.sortEvents()
e.applyEvents()
return nil
}
func (e *eventsProcessor) validateDescription() error {
description, err := validateAndGetEventsMessageCommunityDescription(e.message.EventsBaseCommunityDescription, e.community.ControlNode())
if err != nil {
return err
}
// Control node is the only entity that can apply events from past description.
// In this case, events are compared against the clocks of the most recently applied events.
if e.community.IsControlNode() && description.Clock < e.community.config.CommunityDescription.Clock {
return nil
}
if description.Clock != e.community.config.CommunityDescription.Clock {
return ErrInvalidCommunityEventClock
}
return nil
}
// Filter invalid and outdated events.
func (e *eventsProcessor) filterEvents() {
validateEvent := func(event *CommunityEvent) error {
if e.lastlyAppliedEvents != nil {
if clock, found := e.lastlyAppliedEvents[event.EventTypeID()]; found && clock >= event.CommunityEventClock {
return errors.New("event outdated")
}
}
signer, err := event.RecoverSigner()
if err != nil {
return err
}
err = e.community.validateEvent(event, signer)
if err != nil {
return err
}
return nil
}
for i := range e.message.Events {
event := e.message.Events[i]
if err := validateEvent(&event); err == nil {
e.eventsToApply = append(e.eventsToApply, event)
} else {
e.logger.Warn("invalid community event", zap.String("EventTypeID", event.EventTypeID()), zap.Uint64("clock", event.CommunityEventClock), zap.Error(err))
}
}
}
// Merge message's events with community's events.
func (e *eventsProcessor) mergeEvents() {
if e.community.config.EventsData != nil {
e.eventsToApply = append(e.eventsToApply, e.community.config.EventsData.Events...)
}
}
// Keep only the newest event per PropertyTypeID.
func (e *eventsProcessor) retainNewestEventsPerEventTypeID() {
eventsMap := make(map[string]CommunityEvent)
for _, event := range e.eventsToApply {
if existingEvent, found := eventsMap[event.EventTypeID()]; !found || event.CommunityEventClock > existingEvent.CommunityEventClock {
eventsMap[event.EventTypeID()] = event
}
}
e.eventsToApply = []CommunityEvent{}
for _, event := range eventsMap {
e.eventsToApply = append(e.eventsToApply, event)
}
}
// Sorts events by clock.
func (e *eventsProcessor) sortEvents() {
sort.Slice(e.eventsToApply, func(i, j int) bool {
if e.eventsToApply[i].CommunityEventClock == e.eventsToApply[j].CommunityEventClock {
return e.eventsToApply[i].Type < e.eventsToApply[j].Type
}
return e.eventsToApply[i].CommunityEventClock < e.eventsToApply[j].CommunityEventClock
})
}
func (e *eventsProcessor) applyEvents() {
if e.community.config.EventsData == nil {
e.community.config.EventsData = &EventsData{
EventsBaseCommunityDescription: e.message.EventsBaseCommunityDescription,
}
}
e.community.config.EventsData.Events = e.eventsToApply
e.community.applyEvents()
}
func (o *Community) applyEvents() {
if o.config.EventsData == nil {
return
}
for _, event := range o.config.EventsData.Events {
err := o.applyEvent(event)
if err != nil {
o.config.Logger.Warn("failed to apply event", zap.String("EventTypeID", event.EventTypeID()), zap.Uint64("clock", event.CommunityEventClock), zap.Error(err))
}
}
}
func (o *Community) applyEvent(communityEvent CommunityEvent) error {
switch communityEvent.Type {
case protobuf.CommunityEvent_COMMUNITY_EDIT:
o.config.CommunityDescription.Identity = communityEvent.CommunityConfig.Identity
o.config.CommunityDescription.Permissions = communityEvent.CommunityConfig.Permissions
o.config.CommunityDescription.AdminSettings = communityEvent.CommunityConfig.AdminSettings
o.config.CommunityDescription.IntroMessage = communityEvent.CommunityConfig.IntroMessage
o.config.CommunityDescription.OutroMessage = communityEvent.CommunityConfig.OutroMessage
o.config.CommunityDescription.Tags = communityEvent.CommunityConfig.Tags
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE:
if o.IsControlNode() {
_, err := o.upsertTokenPermission(communityEvent.TokenPermission)
if err != nil {
return err
}
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE:
if o.IsControlNode() {
_, err := o.deleteTokenPermission(communityEvent.TokenPermission.Id)
if err != nil {
return err
}
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE:
_, err := o.createCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds)
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE:
_, err := o.deleteCategory(communityEvent.CategoryData.CategoryId)
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT:
_, err := o.editCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds)
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE:
err := o.createChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel)
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE:
o.deleteChat(communityEvent.ChannelData.ChannelId)
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT:
err := o.editChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel)
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER:
_, err := o.reorderChat(communityEvent.ChannelData.CategoryId, communityEvent.ChannelData.ChannelId, int(communityEvent.ChannelData.Position))
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER:
_, err := o.reorderCategories(communityEvent.CategoryData.CategoryId, int(communityEvent.CategoryData.Position))
if err != nil {
return err
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.removeMemberFromOrg(pk)
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.banUserFromCommunity(pk)
}
case protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN:
if o.IsControlNode() {
pk, err := common.HexToPubkey(communityEvent.MemberToAction)
if err != nil {
return err
}
o.unbanUserFromCommunity(pk)
}
case protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD:
o.config.CommunityDescription.CommunityTokensMetadata = append(o.config.CommunityDescription.CommunityTokensMetadata, communityEvent.TokenMetadata)
}
return nil
}
func (o *Community) addNewCommunityEvent(event *CommunityEvent) error {
err := event.Validate()
if err != nil {
return err
}
// All events must be built on top of the control node CommunityDescription
// If there were no events before, extract CommunityDescription from CommunityDescriptionProtocolMessage
// and check the signature
if o.config.EventsData == nil || len(o.config.EventsData.EventsBaseCommunityDescription) == 0 {
_, err := validateAndGetEventsMessageCommunityDescription(o.config.CommunityDescriptionProtocolMessage, o.ControlNode())
if err != nil {
return err
}
o.config.EventsData = &EventsData{
EventsBaseCommunityDescription: o.config.CommunityDescriptionProtocolMessage,
Events: []CommunityEvent{},
}
}
event.Payload, err = proto.Marshal(event.ToProtobuf())
if err != nil {
return err
}
o.config.EventsData.Events = append(o.config.EventsData.Events, *event)
return nil
}
func (o *Community) toCommunityEventsMessage() *CommunityEventsMessage {
return &CommunityEventsMessage{
CommunityID: o.ID(),
EventsBaseCommunityDescription: o.config.EventsData.EventsBaseCommunityDescription,
Events: o.config.EventsData.Events,
}
}
func validateAndGetEventsMessageCommunityDescription(signedDescription []byte, signerPubkey *ecdsa.PublicKey) (*protobuf.CommunityDescription, error) {
metadata := &protobuf.ApplicationMetadataMessage{}
err := proto.Unmarshal(signedDescription, metadata)
if err != nil {
return nil, err
}
if metadata.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
return nil, ErrInvalidMessage
}
signer, err := utils.RecoverKey(metadata)
if err != nil {
return nil, err
}
if signer == nil {
return nil, errors.New("CommunityDescription does not contain the control node signature")
}
if !signer.Equal(signerPubkey) {
return nil, errors.New("CommunityDescription was not signed by an owner")
}
description := &protobuf.CommunityDescription{}
err = proto.Unmarshal(metadata.Payload, description)
if err != nil {
return nil, err
}
return description, nil
}

View File

@ -0,0 +1,69 @@
package communities
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/protocol/protobuf"
)
func TestEventsProcessorSuite(t *testing.T) {
suite.Run(t, new(EventsProcessorSuite))
}
type EventsProcessorSuite struct {
suite.Suite
}
func (s *EventsProcessorSuite) TestRetainNewestEventsPerPropertyTypeID() {
processor := &eventsProcessor{
eventsToApply: []CommunityEvent{
CommunityEvent{
CommunityEventClock: 1,
Type: protobuf.CommunityEvent_COMMUNITY_EDIT,
},
CommunityEvent{
CommunityEventClock: 2,
Type: protobuf.CommunityEvent_COMMUNITY_EDIT,
},
CommunityEvent{
CommunityEventClock: 3,
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
MemberToAction: "A",
},
CommunityEvent{
CommunityEventClock: 4,
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT,
MemberToAction: "A",
},
CommunityEvent{
CommunityEventClock: 5,
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
MemberToAction: "A",
},
CommunityEvent{
CommunityEventClock: 1,
Type: protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT,
MemberToAction: "B",
},
},
}
processor.retainNewestEventsPerEventTypeID()
s.Require().Len(processor.eventsToApply, 4)
processor.sortEvents()
s.Require().Equal(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT, processor.eventsToApply[0].Type)
s.Require().EqualValues(1, processor.eventsToApply[0].CommunityEventClock)
s.Require().Equal(protobuf.CommunityEvent_COMMUNITY_EDIT, processor.eventsToApply[1].Type)
s.Require().EqualValues(2, processor.eventsToApply[1].CommunityEventClock)
s.Require().Equal(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT, processor.eventsToApply[2].Type)
s.Require().EqualValues(4, processor.eventsToApply[2].CommunityEventClock)
s.Require().Equal(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT, processor.eventsToApply[3].Type)
s.Require().EqualValues(5, processor.eventsToApply[3].CommunityEventClock)
}

View File

@ -346,7 +346,6 @@ type Subscription struct {
DownloadingHistoryArchivesFinishedSignal *signal.DownloadingHistoryArchivesFinishedSignal
ImportingHistoryArchiveMessagesSignal *signal.ImportingHistoryArchiveMessagesSignal
CommunityEventsMessage *CommunityEventsMessage
CommunityEventsMessageInvalidClock *CommunityEventsMessageInvalidClockSignal
AcceptedRequestsToJoin []types.HexBytes
RejectedRequestsToJoin []types.HexBytes
CommunityPrivilegedMemberSyncMessage *CommunityPrivilegedMemberSyncMessage
@ -360,11 +359,6 @@ type CommunityResponse struct {
FailedToDecrypt []*CommunityPrivateDataFailedToDecrypt `json:"-"`
}
type CommunityEventsMessageInvalidClockSignal struct {
Community *Community
CommunityEventsMessage *CommunityEventsMessage
}
func (m *Manager) Subscribe() chan *Subscription {
subscription := make(chan *Subscription, 100)
m.subscriptions = append(m.subscriptions, subscription)
@ -1839,19 +1833,15 @@ func (m *Manager) HandleCommunityEventsMessage(signer *ecdsa.PublicKey, message
originCommunity := community.CreateDeepCopy()
eventsMessage.Events = m.validateAndFilterEvents(community, eventsMessage.Events)
err = community.UpdateCommunityByEvents(eventsMessage)
var lastlyAppliedEvents map[string]uint64
if community.IsControlNode() {
lastlyAppliedEvents, err = m.persistence.GetAppliedCommunityEvents(community.ID())
if err != nil {
if err == ErrInvalidCommunityEventClock && community.IsControlNode() {
// send updated CommunityDescription to the event sender on top of which he must apply his changes
eventsMessage.EventsBaseCommunityDescription = community.config.CommunityDescriptionProtocolMessage
m.publish(&Subscription{
CommunityEventsMessageInvalidClock: &CommunityEventsMessageInvalidClockSignal{
Community: community,
CommunityEventsMessage: eventsMessage,
}})
return nil, err
}
}
err = community.processEvents(eventsMessage, lastlyAppliedEvents)
if err != nil {
return nil, err
}
@ -1866,6 +1856,12 @@ func (m *Manager) HandleCommunityEventsMessage(signer *ecdsa.PublicKey, message
// Control node applies events and publish updated CommunityDescription
if community.IsControlNode() {
appliedEvents := map[string]uint64{}
if community.config.EventsData != nil {
for _, event := range community.config.EventsData.Events {
appliedEvents[event.EventTypeID()] = event.CommunityEventClock
}
}
community.config.EventsData = nil // clear events, they are already applied
community.increaseClock()
@ -1882,6 +1878,11 @@ func (m *Manager) HandleCommunityEventsMessage(signer *ecdsa.PublicKey, message
return nil, err
}
err = m.persistence.UpsertAppliedCommunityEvents(community.ID(), appliedEvents)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
} else {
err = m.persistence.SaveCommunity(community)
@ -1952,7 +1953,7 @@ func (m *Manager) HandleCommunityEventsMessageRejected(signer *ecdsa.PublicKey,
EventsBaseCommunityDescription: community.config.CommunityDescriptionProtocolMessage,
Events: myRejectedEvents,
}
reapplyEventsMessage := community.ToCommunityEventsMessage()
reapplyEventsMessage := community.toCommunityEventsMessage()
return reapplyEventsMessage, nil
}
@ -1967,10 +1968,20 @@ func (m *Manager) handleAdditionalAdminChanges(community *Community) (*Community
return &communityResponse, nil
}
for i := range community.config.EventsData.Events {
if community.config.EventsData == nil {
return &communityResponse, nil
}
handledMembers := map[string]struct{}{}
for i := len(community.config.EventsData.Events) - 1; i >= 0; i-- {
communityEvent := &community.config.EventsData.Events[i]
if _, handled := handledMembers[communityEvent.MemberToAction]; handled {
continue
}
switch communityEvent.Type {
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT:
handledMembers[communityEvent.MemberToAction] = struct{}{}
requestsToJoin, err := m.handleCommunityEventRequestAccepted(community, communityEvent)
if err != nil {
return nil, err
@ -1980,6 +1991,7 @@ func (m *Manager) handleAdditionalAdminChanges(community *Community) (*Community
}
case protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_REJECT:
handledMembers[communityEvent.MemberToAction] = struct{}{}
requestsToJoin, err := m.handleCommunityEventRequestRejected(community, communityEvent)
if err != nil {
return nil, err
@ -2031,7 +2043,9 @@ func (m *Manager) handleCommunityEventRequestAccepted(community *Community, comm
requestsToJoin := make([]*RequestToJoin, 0)
for signer, request := range communityEvent.AcceptedRequestsToJoin {
signer := communityEvent.MemberToAction
request := communityEvent.RequestToJoin
requestToJoin := &RequestToJoin{
PublicKey: signer,
Clock: request.Clock,
@ -2047,9 +2061,9 @@ func (m *Manager) handleCommunityEventRequestAccepted(community *Community, comm
}
if existingRequestToJoin != nil {
alreadyProcessedByControlNode := existingRequestToJoin.State == RequestToJoinStateAccepted || existingRequestToJoin.State == RequestToJoinStateDeclined
alreadyProcessedByControlNode := existingRequestToJoin.State == RequestToJoinStateAccepted
if alreadyProcessedByControlNode || existingRequestToJoin.State == RequestToJoinStateCanceled {
continue
return requestsToJoin, nil
}
}
@ -2067,7 +2081,7 @@ func (m *Manager) handleCommunityEventRequestAccepted(community *Community, comm
}
requestsToJoin = append(requestsToJoin, requestToJoin)
}
if community.IsControlNode() {
m.publish(&Subscription{AcceptedRequestsToJoin: acceptedRequestsToJoin})
}
@ -2079,7 +2093,9 @@ func (m *Manager) handleCommunityEventRequestRejected(community *Community, comm
requestsToJoin := make([]*RequestToJoin, 0)
for signer, request := range communityEvent.RejectedRequestsToJoin {
signer := communityEvent.MemberToAction
request := communityEvent.RequestToJoin
requestToJoin := &RequestToJoin{
PublicKey: signer,
Clock: request.Clock,
@ -2095,9 +2111,9 @@ func (m *Manager) handleCommunityEventRequestRejected(community *Community, comm
}
if existingRequestToJoin != nil {
alreadyProcessedByControlNode := existingRequestToJoin.State == RequestToJoinStateAccepted || existingRequestToJoin.State == RequestToJoinStateDeclined
alreadyProcessedByControlNode := existingRequestToJoin.State == RequestToJoinStateDeclined
if alreadyProcessedByControlNode || existingRequestToJoin.State == RequestToJoinStateCanceled {
continue
return requestsToJoin, nil
}
}
@ -2114,7 +2130,6 @@ func (m *Manager) handleCommunityEventRequestRejected(community *Community, comm
}
requestsToJoin = append(requestsToJoin, requestToJoin)
}
if community.IsControlNode() {
m.publish(&Subscription{RejectedRequestsToJoin: rejectedRequestsToJoin})
@ -2325,16 +2340,7 @@ func (m *Manager) AcceptRequestToJoin(dbRequest *RequestToJoin) (*Community, err
}
}
} else if community.hasPermissionToSendCommunityEvent(protobuf.CommunityEvent_COMMUNITY_REQUEST_TO_JOIN_ACCEPT) {
// admins do not perform permission checks, they merely mark the
// request as accepted (pending) and forward their decision to the control node
acceptedRequestsToJoin := make(map[string]*protobuf.CommunityRequestToJoin)
acceptedRequestsToJoin[dbRequest.PublicKey] = dbRequest.ToCommunityRequestToJoinProtobuf()
adminChanges := &CommunityEventChanges{
AcceptedRequestsToJoin: acceptedRequestsToJoin,
}
err := community.addNewCommunityEvent(community.ToCommunityRequestToJoinAcceptCommunityEvent(adminChanges))
err := community.addNewCommunityEvent(community.ToCommunityRequestToJoinAcceptCommunityEvent(dbRequest.PublicKey, dbRequest.ToCommunityRequestToJoinProtobuf()))
if err != nil {
return nil, err
}
@ -3213,9 +3219,14 @@ func (m *Manager) dbRecordBundleToCommunity(r *CommunityRecordBundle) (*Communit
return err
}
err = community.updateCommunityDescriptionByEvents()
if community.config.EventsData != nil {
eventsDescription, err := validateAndGetEventsMessageCommunityDescription(community.config.EventsData.EventsBaseCommunityDescription, community.ControlNode())
if err != nil {
return err
m.logger.Error("invalid EventsBaseCommunityDescription", zap.Error(err))
}
if eventsDescription.Clock == community.Clock() {
community.applyEvents()
}
}
if m.transport != nil && m.transport.WakuVersion() == 2 {
@ -4658,7 +4669,7 @@ func (m *Manager) saveAndPublish(community *Community) error {
return err
}
m.publish(&Subscription{CommunityEventsMessage: community.ToCommunityEventsMessage()})
m.publish(&Subscription{CommunityEventsMessage: community.toCommunityEventsMessage()})
return nil
}

View File

@ -1806,3 +1806,66 @@ func (p *Persistence) DeleteCommunityShard(communityID types.HexBytes) error {
_, err := p.db.Exec(`DELETE FROM communities_shards WHERE community_id = ?`, communityID)
return err
}
func (p *Persistence) GetAppliedCommunityEvents(communityID types.HexBytes) (map[string]uint64, error) {
rows, err := p.db.Query(`SELECT event_type_id, clock FROM applied_community_events WHERE community_id = ?`, communityID.String())
if err != nil {
return nil, err
}
defer rows.Close()
result := map[string]uint64{}
eventTypeID := ""
clock := uint64(0)
for rows.Next() {
err := rows.Scan(&eventTypeID, &clock)
if err != nil {
return nil, err
}
result[eventTypeID] = clock
}
return result, nil
}
func (p *Persistence) UpsertAppliedCommunityEvents(communityID types.HexBytes, processedEvents map[string]uint64) error {
tx, err := p.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
for eventTypeID, newClock := range processedEvents {
var currentClock uint64
err = tx.QueryRow(`
SELECT clock
FROM applied_community_events
WHERE community_id = ? AND event_type_id = ?`,
communityID.String(), eventTypeID).Scan(&currentClock)
if err != nil && err != sql.ErrNoRows {
return err
}
if newClock > currentClock {
_, err = tx.Exec(`
INSERT OR REPLACE INTO applied_community_events(community_id, event_type_id, clock)
VALUES (?, ?, ?)`,
communityID.String(), eventTypeID, newClock)
if err != nil {
return err
}
}
}
return err
}

View File

@ -921,3 +921,26 @@ func (s *PersistenceSuite) TestGetCommunityToValidateByID() {
s.Require().NoError(err)
s.Require().Len(result, 0)
}
func (s *PersistenceSuite) TestProcessedCommunityEvents() {
community := types.HexBytes{1}
events, err := s.db.GetAppliedCommunityEvents(community)
s.Require().NoError(err)
s.Require().Empty(events)
err = s.db.UpsertAppliedCommunityEvents(community, map[string]uint64{"a": 1, "b": 10})
s.Require().NoError(err)
events, err = s.db.GetAppliedCommunityEvents(community)
s.Require().NoError(err)
s.Require().Len(events, 2)
s.Require().True(reflect.DeepEqual(events, map[string]uint64{"a": 1, "b": 10}))
err = s.db.UpsertAppliedCommunityEvents(community, map[string]uint64{"a": 2, "b": 8, "c": 1})
s.Require().NoError(err)
events, err = s.db.GetAppliedCommunityEvents(community)
s.Require().NoError(err)
s.Require().Len(events, 3)
s.Require().True(reflect.DeepEqual(events, map[string]uint64{"a": 2, "b": 10, "c": 1}))
}

View File

@ -18,7 +18,7 @@ func TestCommunityEventsEventualConsistencySuite(t *testing.T) {
}
type CommunityEventsEventualConsistencySuite struct {
AdminCommunityEventsSuite
AdminCommunityEventsSuiteBase
messagesOrderController *MessagesOrderController
}
@ -52,7 +52,7 @@ func (s *CommunityEventsEventualConsistencySuite) SetupTest() {
}
func (s *CommunityEventsEventualConsistencySuite) TearDownTest() {
s.AdminCommunityEventsSuite.TearDownTest()
s.AdminCommunityEventsSuiteBase.TearDownTest()
s.messagesOrderController.Stop()
}
@ -69,11 +69,16 @@ func (s *CommunityEventsEventualConsistencySuite) newMessenger(password string,
})
}
// TODO: remove once eventual consistency is implemented
var communityRequestsEventualConsistencyFixed = false
type requestToJoinActionType int
func (s *CommunityEventsEventualConsistencySuite) TestAdminAcceptRejectRequestToJoin() {
const (
requestToJoinAccept requestToJoinActionType = iota
requestToJoinReject
)
func (s *CommunityEventsEventualConsistencySuite) testRequestsToJoin(actions []requestToJoinActionType, messagesOrder messagesOrderType) {
community := setUpOnRequestCommunityAndRoles(s, protobuf.CommunityMember_ROLE_ADMIN, []*Messenger{})
s.Require().True(community.IsControlNode())
// set up additional user that will send request to join
user := s.newMessenger("", []string{})
@ -108,17 +113,21 @@ func (s *CommunityEventsEventualConsistencySuite) TestAdminAcceptRejectRequestTo
s.Require().NoError(err)
s.Require().Len(response.RequestsToJoinCommunity(), 1)
// accept request to join
for _, action := range actions {
switch action {
case requestToJoinAccept:
acceptRequestToJoin := &requests.AcceptRequestToJoinCommunity{ID: sentRequest.ID}
_, err = s.admin.AcceptRequestToJoinCommunity(acceptRequestToJoin)
s.Require().NoError(err)
// then reject request to join
case requestToJoinReject:
rejectRequestToJoin := &requests.DeclineRequestToJoinCommunity{ID: sentRequest.ID}
_, err = s.admin.DeclineRequestToJoinCommunity(rejectRequestToJoin)
s.Require().NoError(err)
}
}
// ensure both messages are pushed to waku
// ensure all messages are pushed to waku
/*
FIXME: we should do it smarter, as follows:
```
@ -131,48 +140,48 @@ func (s *CommunityEventsEventualConsistencySuite) TestAdminAcceptRejectRequestTo
time.Sleep(1 * time.Second)
// ensure events are received in order
s.messagesOrderController.order = messagesOrderAsPosted
s.messagesOrderController.order = messagesOrder
waitForAcceptedRequestToJoin := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
return len(sub.AcceptedRequestsToJoin) == 1
})
waitOnAdminEventsRejection := waitOnCommunitiesEvent(s.owner, func(s *communities.Subscription) bool {
return s.CommunityEventsMessageInvalidClock != nil
})
_, err = s.owner.RetrieveAll()
response, err = s.owner.RetrieveAll()
s.Require().NoError(err)
// first owner handles AcceptRequestToJoinCommunity event
err = <-waitForAcceptedRequestToJoin
s.Require().NoError(err)
// then owner rejects DeclineRequestToJoinCommunity event due to invalid clock
err = <-waitOnAdminEventsRejection
s.Require().NoError(err)
if communityRequestsEventualConsistencyFixed {
// admin receives rejected DeclineRequestToJoinCommunity event and re-applies it,
// there is no signal whatsoever, we just wait for admin to process all incoming messages
_, _ = WaitOnMessengerResponse(s.admin, func(response *MessengerResponse) bool {
lastAction := actions[len(actions)-1]
responseChecker := func(mr *MessengerResponse) bool {
if len(mr.RequestsToJoinCommunity()) == 0 || len(mr.Communities()) == 0 {
return false
}, "")
}
switch lastAction {
case requestToJoinAccept:
return mr.RequestsToJoinCommunity()[0].State == communities.RequestToJoinStateAccepted &&
mr.Communities()[0].HasMember(&user.identity.PublicKey)
case requestToJoinReject:
return mr.RequestsToJoinCommunity()[0].State == communities.RequestToJoinStateDeclined &&
!mr.Communities()[0].HasMember(&user.identity.PublicKey)
}
return false
}
waitForRejectedRequestToJoin := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
return len(sub.RejectedRequestsToJoin) == 1
})
_, err = s.owner.RetrieveAll()
switch messagesOrder {
case messagesOrderAsPosted:
_, err = WaitOnSignaledMessengerResponse(s.owner, responseChecker, "lack of eventual consistency")
s.Require().NoError(err)
// owner handles DeclineRequestToJoinCommunity event eventually
err = <-waitForRejectedRequestToJoin
s.Require().NoError(err)
// user should be removed from community
community, err = s.owner.GetCommunityByID(community.ID())
s.Require().NoError(err)
s.Require().False(community.HasMember(&user.identity.PublicKey))
case messagesOrderReversed:
s.Require().True(responseChecker(response))
}
}
func (s *CommunityEventsEventualConsistencySuite) TestAdminAcceptRejectRequestToJoin_InOrder() {
s.testRequestsToJoin([]requestToJoinActionType{requestToJoinAccept, requestToJoinReject}, messagesOrderAsPosted)
}
func (s *CommunityEventsEventualConsistencySuite) TestAdminAcceptRejectRequestToJoin_OutOfOrder() {
s.testRequestsToJoin([]requestToJoinActionType{requestToJoinAccept, requestToJoinReject}, messagesOrderReversed)
}
func (s *CommunityEventsEventualConsistencySuite) TestAdminRejectAcceptRequestToJoin_InOrder() {
s.testRequestsToJoin([]requestToJoinActionType{requestToJoinReject, requestToJoinAccept}, messagesOrderAsPosted)
}
func (s *CommunityEventsEventualConsistencySuite) TestAdminRejectAcceptRequestToJoin_OutOfOrder() {
s.testRequestsToJoin([]requestToJoinActionType{requestToJoinReject, requestToJoinAccept}, messagesOrderReversed)
}

View File

@ -13,10 +13,8 @@ import (
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/waku"
@ -26,7 +24,7 @@ func TestAdminCommunityEventsSuite(t *testing.T) {
suite.Run(t, new(AdminCommunityEventsSuite))
}
type AdminCommunityEventsSuite struct {
type AdminCommunityEventsSuiteBase struct {
suite.Suite
owner *Messenger
admin *Messenger
@ -41,27 +39,31 @@ type AdminCommunityEventsSuite struct {
additionalEventSenders []*Messenger
}
func (s *AdminCommunityEventsSuite) GetControlNode() *Messenger {
type AdminCommunityEventsSuite struct {
AdminCommunityEventsSuiteBase
}
func (s *AdminCommunityEventsSuiteBase) GetControlNode() *Messenger {
return s.owner
}
func (s *AdminCommunityEventsSuite) GetEventSender() *Messenger {
func (s *AdminCommunityEventsSuiteBase) GetEventSender() *Messenger {
return s.admin
}
func (s *AdminCommunityEventsSuite) GetMember() *Messenger {
func (s *AdminCommunityEventsSuiteBase) GetMember() *Messenger {
return s.alice
}
func (s *AdminCommunityEventsSuite) GetSuite() *suite.Suite {
func (s *AdminCommunityEventsSuiteBase) GetSuite() *suite.Suite {
return &s.Suite
}
func (s *AdminCommunityEventsSuite) GetCollectiblesServiceMock() *CollectiblesServiceMock {
func (s *AdminCommunityEventsSuiteBase) GetCollectiblesServiceMock() *CollectiblesServiceMock {
return s.collectiblesServiceMock
}
func (s *AdminCommunityEventsSuite) SetupTest() {
func (s *AdminCommunityEventsSuiteBase) SetupTest() {
s.logger = tt.MustCreateTestLogger()
s.collectiblesServiceMock = &CollectiblesServiceMock{}
@ -84,7 +86,7 @@ func (s *AdminCommunityEventsSuite) SetupTest() {
s.mockedBalances = createMockedWalletBalance(&s.Suite)
}
func (s *AdminCommunityEventsSuite) TearDownTest() {
func (s *AdminCommunityEventsSuiteBase) TearDownTest() {
TearDownMessenger(&s.Suite, s.owner)
TearDownMessenger(&s.Suite, s.admin)
TearDownMessenger(&s.Suite, s.alice)
@ -97,7 +99,7 @@ func (s *AdminCommunityEventsSuite) TearDownTest() {
_ = s.logger.Sync()
}
func (s *AdminCommunityEventsSuite) SetupAdditionalMessengers(messengers []*Messenger) {
func (s *AdminCommunityEventsSuiteBase) SetupAdditionalMessengers(messengers []*Messenger) {
for _, m := range messengers {
s.additionalEventSenders = append(s.additionalEventSenders, m)
_, err := m.Start()
@ -105,7 +107,7 @@ func (s *AdminCommunityEventsSuite) SetupAdditionalMessengers(messengers []*Mess
}
}
func (s *AdminCommunityEventsSuite) newMessenger(password string, walletAddresses []string) *Messenger {
func (s *AdminCommunityEventsSuiteBase) newMessenger(password string, walletAddresses []string) *Messenger {
return newTestCommunitiesMessenger(&s.Suite, s.shh, testCommunitiesMessengerConfig{
testMessengerConfig: testMessengerConfig{
logger: s.logger,
@ -416,65 +418,6 @@ func (s *AdminCommunityEventsSuite) TestReceiveRequestsToJoinWithRevealedAccount
testMemberReceiveRequestsToJoinAfterGettingNewRole(s, bob, protobuf.CommunityTokenPermission_BECOME_ADMIN)
}
func (s *AdminCommunityEventsSuite) TestAdminDoesNotHaveRejectedEventsLoop() {
community := setUpCommunityAndRoles(s, protobuf.CommunityMember_ROLE_ADMIN)
// admin modifies community description
adminEditRequest := &requests.EditCommunity{
CommunityID: community.ID(),
CreateCommunity: requests.CreateCommunity{
Name: "admin name",
Description: "admin description",
Color: "#FFFFFF",
Membership: protobuf.CommunityPermissions_MANUAL_ACCEPT,
},
}
_, err := s.admin.EditCommunity(adminEditRequest)
s.Require().NoError(err)
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
// Update community clock without publishing new CommunityDescription
_, err = community.DeclineRequestToJoin(nil)
s.Require().NoError(err)
err = s.owner.communitiesManager.SaveCommunity(community)
s.Require().NoError(err)
waitOnAdminEventsRejection := waitOnCommunitiesEvent(s.owner, func(s *communities.Subscription) bool {
return s.CommunityEventsMessageInvalidClock != nil
})
// control node receives admin event and rejects it
_, err = WaitOnMessengerResponse(s.owner, func(response *MessengerResponse) bool {
select {
case err := <-waitOnAdminEventsRejection:
s.Require().NoError(err)
return true
default:
return false
}
}, "")
s.Require().NoError(err)
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
s.Require().NotEqual(adminEditRequest.Description, community.DescriptionText())
// admin receives rejected events and re-applies them
// there is no signal whatsoever, we just wait for admin to process all incoming messages
_, _ = WaitOnMessengerResponse(s.admin, func(response *MessengerResponse) bool {
return false
}, "")
// control node does not receives admin event
_, err = WaitOnMessengerResponse(s.owner, func(response *MessengerResponse) bool {
return len(response.Communities()) > 0
}, "no communities in response")
s.Require().Error(err)
}
func (s *AdminCommunityEventsSuite) TestAdminAcceptsRequestToJoinAfterMemberLeave() {
community := setUpOnRequestCommunityAndRoles(s, protobuf.CommunityMember_ROLE_ADMIN, []*Messenger{})

View File

@ -156,36 +156,6 @@ func (m *Messenger) publishCommunityEvents(community *communities.Community, msg
return err
}
func (m *Messenger) publishCommunityEventsRejected(community *communities.Community, msg *communities.CommunityEventsMessage) error {
if !community.IsControlNode() {
return communities.ErrNotControlNode
}
m.logger.Debug("publishing community events rejected", zap.Any("event", msg))
communityEventsMessage := msg.ToProtobuf()
communityEventsMessageRejected := &protobuf.CommunityEventsMessageRejected{
Msg: communityEventsMessage,
}
payload, err := proto.Marshal(communityEventsMessageRejected)
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Sender: community.PrivateKey(),
// we don't want to wrap in an encryption layer message
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE_REJECTED,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
// TODO: resend in case of failure?
_, err = m.sender.SendPublic(context.Background(), types.EncodeHex(msg.CommunityID), rawMessage)
return err
}
func (m *Messenger) publishCommunityPrivilegedMemberSyncMessage(msg *communities.CommunityPrivilegedMemberSyncMessage) error {
m.logger.Debug("publishing privileged user sync message", zap.Any("event", msg))
@ -409,14 +379,6 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti
}
}
if sub.CommunityEventsMessageInvalidClock != nil {
err := m.publishCommunityEventsRejected(sub.CommunityEventsMessageInvalidClock.Community,
sub.CommunityEventsMessageInvalidClock.CommunityEventsMessage)
if err != nil {
m.logger.Warn("failed to publish community events rejected", zap.Error(err))
}
}
if sub.AcceptedRequestsToJoin != nil {
for _, requestID := range sub.AcceptedRequestsToJoin {
accept := &requests.AcceptRequestToJoinCommunity{

View File

@ -81,7 +81,7 @@ func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResp
return nil, errors.New("messengerSignalsHandler already provided/mocked")
}
responseChan := make(chan *MessengerResponse, 1)
responseChan := make(chan *MessengerResponse, 64)
m.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{
responseChan: responseChan,
}
@ -101,10 +101,9 @@ func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResp
if condition(r) {
return r, nil
}
return nil, errors.New(errorMessage)
case <-timeoutChan:
return nil, errors.New("timed out: " + errorMessage)
return nil, errors.New(errorMessage)
default: // No immediate response, rest & loop back to retrieve again
time.Sleep(interval)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
CREATE TABLE applied_community_events (
community_id TEXT NOT NULL,
event_type_id TEXT DEFAULT NULL,
clock INT NOT NULL,
PRIMARY KEY (community_id, event_type_id) ON CONFLICT REPLACE
);