diff --git a/pubsub.go b/pubsub.go index 577f450..d607cc5 100644 --- a/pubsub.go +++ b/pubsub.go @@ -66,6 +66,9 @@ type PubSub struct { // addVal handles validator registration requests addVal chan *addValReq + // rmVal handles validator unregistration requests + rmVal chan *rmValReq + // topicVals tracks per topic validators topicVals map[string]*topicVal @@ -124,6 +127,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option getTopics: make(chan *topicReq), sendMsg: make(chan *sendReq, 32), addVal: make(chan *addValReq), + rmVal: make(chan *rmValReq), validateThrottle: make(chan struct{}, defaultValidateThrottle), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), @@ -240,6 +244,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case req := <-p.addVal: p.addValidator(req) + case req := <-p.rmVal: + p.rmValidator(req) + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return @@ -586,6 +593,11 @@ type addValReq struct { resp chan error } +type rmValReq struct { + topic string + resp chan error +} + type topicVal struct { topic string validate Validator @@ -662,6 +674,30 @@ func (ps *PubSub) addValidator(req *addValReq) { req.resp <- nil } +// UnregisterTopicValidator removes a validator from a topic +// returns an error if there was no validator registered with the topic +func (p *PubSub) UnregisterTopicValidator(topic string) error { + rmVal := &rmValReq{ + topic: topic, + resp: make(chan error, 1), + } + + p.rmVal <- rmVal + return <-rmVal.resp +} + +func (ps *PubSub) rmValidator(req *rmValReq) { + topic := req.topic + + _, ok := ps.topicVals[topic] + if ok { + delete(ps.topicVals, topic) + req.resp <- nil + } else { + req.resp <- fmt.Errorf("No validator for topic %s", topic) + } +} + func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool { vctx, cancel := context.WithTimeout(ctx, val.validateTimeout) defer cancel()