package communities import ( "errors" "sort" "github.com/golang/protobuf/proto" "go.uber.org/zap" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" ) var ErrInvalidCommunityEventClock = errors.New("clock for admin event message is outdated") func (o *Community) processEvents(message *CommunityEventsMessage, lastlyAppliedEvents map[string]uint64) error { processor := &eventsProcessor{ community: o, message: message, logger: o.config.Logger.Named("eventsProcessor"), lastlyAppliedEvents: lastlyAppliedEvents, } return processor.exec() } type eventsProcessor struct { community *Community message *CommunityEventsMessage logger *zap.Logger lastlyAppliedEvents map[string]uint64 eventsToApply []CommunityEvent } func (e *eventsProcessor) exec() error { e.community.mutex.Lock() defer e.community.mutex.Unlock() err := e.validateDescription() if err != nil { return err } e.filterEvents() e.mergeEvents() e.retainNewestEventsPerEventTypeID() e.sortEvents() e.applyEvents() return nil } func (e *eventsProcessor) validateDescription() error { description, err := unmarshalCommunityDescriptionMessage(e.message.EventsBaseCommunityDescription, e.community.ControlNode()) if err != nil { return err } // Control node is the only entity that can apply events from past description. // In this case, events are compared against the clocks of the most recently applied events. if e.community.IsControlNode() && description.Clock < e.community.config.CommunityDescription.Clock { return nil } if description.Clock != e.community.config.CommunityDescription.Clock { return ErrInvalidCommunityEventClock } return nil } func (e *eventsProcessor) validateEvent(event *CommunityEvent) error { if e.lastlyAppliedEvents != nil { if clock, found := e.lastlyAppliedEvents[event.EventTypeID()]; found && clock >= event.CommunityEventClock { return errors.New("event outdated") } } signer, err := event.RecoverSigner() if err != nil { return err } return e.community.validateEvent(event, signer) } // Filter invalid and outdated events. func (e *eventsProcessor) filterEvents() { for _, ev := range e.message.Events { event := ev if err := e.validateEvent(&event); err == nil { e.eventsToApply = append(e.eventsToApply, event) } else { e.logger.Warn("invalid community event", zap.String("EventTypeID", event.EventTypeID()), zap.Uint64("clock", event.CommunityEventClock), zap.Error(err)) } } } // Merge message's events with community's events. func (e *eventsProcessor) mergeEvents() { if e.community.config.EventsData != nil { for _, ev := range e.community.config.EventsData.Events { event := ev if err := e.validateEvent(&event); err == nil { e.eventsToApply = append(e.eventsToApply, event) } else { // NOTE: this should not happen, events should be validated before they are saved in the db. // It has been identified that an invalid event is saved to the database for some reason. // The code flow leading to this behavior is not yet known. // https://github.com/status-im/status-desktop/issues/14106 e.logger.Error("invalid community event read from db", zap.String("EventTypeID", event.EventTypeID()), zap.Uint64("clock", event.CommunityEventClock), zap.Error(err)) } } } } // Keep only the newest event per PropertyTypeID. func (e *eventsProcessor) retainNewestEventsPerEventTypeID() { eventsMap := make(map[string]CommunityEvent) for _, event := range e.eventsToApply { if existingEvent, found := eventsMap[event.EventTypeID()]; !found || event.CommunityEventClock > existingEvent.CommunityEventClock { eventsMap[event.EventTypeID()] = event } } e.eventsToApply = []CommunityEvent{} for _, event := range eventsMap { e.eventsToApply = append(e.eventsToApply, event) } } // Sorts events by clock. func (e *eventsProcessor) sortEvents() { sort.Slice(e.eventsToApply, func(i, j int) bool { if e.eventsToApply[i].CommunityEventClock == e.eventsToApply[j].CommunityEventClock { return e.eventsToApply[i].Type < e.eventsToApply[j].Type } return e.eventsToApply[i].CommunityEventClock < e.eventsToApply[j].CommunityEventClock }) } func (e *eventsProcessor) applyEvents() { if e.community.config.EventsData == nil { e.community.config.EventsData = &EventsData{ EventsBaseCommunityDescription: e.message.EventsBaseCommunityDescription, } } e.community.config.EventsData.Events = e.eventsToApply e.community.applyEvents() } func (o *Community) applyEvents() { if o.config.EventsData == nil { return } for _, event := range o.config.EventsData.Events { err := o.applyEvent(event) if err != nil { o.config.Logger.Warn("failed to apply event", zap.String("EventTypeID", event.EventTypeID()), zap.Uint64("clock", event.CommunityEventClock), zap.Error(err)) } } } func (o *Community) applyEvent(communityEvent CommunityEvent) error { switch communityEvent.Type { case protobuf.CommunityEvent_COMMUNITY_EDIT: o.config.CommunityDescription.Identity = communityEvent.CommunityConfig.Identity o.config.CommunityDescription.Permissions = communityEvent.CommunityConfig.Permissions o.config.CommunityDescription.AdminSettings = communityEvent.CommunityConfig.AdminSettings o.config.CommunityDescription.IntroMessage = communityEvent.CommunityConfig.IntroMessage o.config.CommunityDescription.OutroMessage = communityEvent.CommunityConfig.OutroMessage o.config.CommunityDescription.Tags = communityEvent.CommunityConfig.Tags case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_CHANGE: if o.IsControlNode() { _, err := o.upsertTokenPermission(communityEvent.TokenPermission) if err != nil { return err } } case protobuf.CommunityEvent_COMMUNITY_MEMBER_TOKEN_PERMISSION_DELETE: if o.IsControlNode() { _, err := o.deleteTokenPermission(communityEvent.TokenPermission.Id) if err != nil { return err } } case protobuf.CommunityEvent_COMMUNITY_CATEGORY_CREATE: _, err := o.createCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CATEGORY_DELETE: _, err := o.deleteCategory(communityEvent.CategoryData.CategoryId) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CATEGORY_EDIT: _, err := o.editCategory(communityEvent.CategoryData.CategoryId, communityEvent.CategoryData.Name, communityEvent.CategoryData.ChannelsIds) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CHANNEL_CREATE: err := o.createChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CHANNEL_DELETE: o.deleteChat(communityEvent.ChannelData.ChannelId) case protobuf.CommunityEvent_COMMUNITY_CHANNEL_EDIT: err := o.editChat(communityEvent.ChannelData.ChannelId, communityEvent.ChannelData.Channel) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CHANNEL_REORDER: _, err := o.reorderChat(communityEvent.ChannelData.CategoryId, communityEvent.ChannelData.ChannelId, int(communityEvent.ChannelData.Position)) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_CATEGORY_REORDER: _, err := o.reorderCategories(communityEvent.CategoryData.CategoryId, int(communityEvent.CategoryData.Position)) if err != nil { return err } case protobuf.CommunityEvent_COMMUNITY_MEMBER_KICK: if o.IsControlNode() { _ = o.RemoveMembersFromOrg([]string{communityEvent.MemberToAction}) } case protobuf.CommunityEvent_COMMUNITY_MEMBER_BAN: if o.IsControlNode() { pk, err := common.HexToPubkey(communityEvent.MemberToAction) if err != nil { return err } o.banUserFromCommunity(pk, &protobuf.CommunityBanInfo{DeleteAllMessages: false}) } case protobuf.CommunityEvent_COMMUNITY_MEMBER_UNBAN: if o.IsControlNode() { pk, err := common.HexToPubkey(communityEvent.MemberToAction) if err != nil { return err } o.unbanUserFromCommunity(pk) } case protobuf.CommunityEvent_COMMUNITY_TOKEN_ADD: o.config.CommunityDescription.CommunityTokensMetadata = append(o.config.CommunityDescription.CommunityTokensMetadata, communityEvent.TokenMetadata) case protobuf.CommunityEvent_COMMUNITY_DELETE_BANNED_MEMBER_MESSAGES: if o.IsControlNode() { pk, err := common.HexToPubkey(communityEvent.MemberToAction) if err != nil { return err } err = o.deleteBannedMemberAllMessages(pk) if err != nil { return err } } } return nil } func (o *Community) addNewCommunityEvent(event *CommunityEvent) error { err := event.Validate() if err != nil { return err } // All events must be built on top of the control node CommunityDescription // If there were no events before, extract CommunityDescription from CommunityDescriptionProtocolMessage // and check the signature if o.config.EventsData == nil || len(o.config.EventsData.EventsBaseCommunityDescription) == 0 { _, err := unmarshalCommunityDescriptionMessage(o.config.CommunityDescriptionProtocolMessage, o.ControlNode()) if err != nil { return err } o.config.EventsData = &EventsData{ EventsBaseCommunityDescription: o.config.CommunityDescriptionProtocolMessage, Events: []CommunityEvent{}, } } event.Payload, err = proto.Marshal(event.ToProtobuf()) if err != nil { return err } o.config.EventsData.Events = append(o.config.EventsData.Events, *event) return nil } func (o *Community) toCommunityEventsMessage() *CommunityEventsMessage { return &CommunityEventsMessage{ CommunityID: o.ID(), EventsBaseCommunityDescription: o.config.EventsData.EventsBaseCommunityDescription, Events: o.config.EventsData.Events, } }