From 399bcb25a6c6260f50709f62d04b1c3e04d6fdb4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Feb 2018 22:14:54 +0200 Subject: [PATCH 1/2] UnregisterTopicValidator for removing topic validators --- pubsub.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pubsub.go b/pubsub.go index 577f450..d607cc5 100644 --- a/pubsub.go +++ b/pubsub.go @@ -66,6 +66,9 @@ type PubSub struct { // addVal handles validator registration requests addVal chan *addValReq + // rmVal handles validator unregistration requests + rmVal chan *rmValReq + // topicVals tracks per topic validators topicVals map[string]*topicVal @@ -124,6 +127,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option getTopics: make(chan *topicReq), sendMsg: make(chan *sendReq, 32), addVal: make(chan *addValReq), + rmVal: make(chan *rmValReq), validateThrottle: make(chan struct{}, defaultValidateThrottle), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), @@ -240,6 +244,9 @@ func (p *PubSub) processLoop(ctx context.Context) { case req := <-p.addVal: p.addValidator(req) + case req := <-p.rmVal: + p.rmValidator(req) + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return @@ -586,6 +593,11 @@ type addValReq struct { resp chan error } +type rmValReq struct { + topic string + resp chan error +} + type topicVal struct { topic string validate Validator @@ -662,6 +674,30 @@ func (ps *PubSub) addValidator(req *addValReq) { req.resp <- nil } +// UnregisterTopicValidator removes a validator from a topic +// returns an error if there was no validator registered with the topic +func (p *PubSub) UnregisterTopicValidator(topic string) error { + rmVal := &rmValReq{ + topic: topic, + resp: make(chan error, 1), + } + + p.rmVal <- rmVal + return <-rmVal.resp +} + +func (ps *PubSub) rmValidator(req *rmValReq) { + topic := req.topic + + _, ok := ps.topicVals[topic] + if ok { + delete(ps.topicVals, topic) + req.resp <- nil + } else { + req.resp <- fmt.Errorf("No validator for topic %s", topic) + } +} + func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool { vctx, cancel := context.WithTimeout(ctx, val.validateTimeout) defer cancel() From ed1dd154b6d3637e6727257943bce6d59583150c Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Feb 2018 22:26:42 +0200 Subject: [PATCH 2/2] add test for UnregisterTopicValidator --- floodsub_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/floodsub_test.go b/floodsub_test.go index bf5ac80..54c7d96 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -341,6 +341,31 @@ func TestOneToOne(t *testing.T) { checkMessageRouting(t, "foobar", psubs, []*Subscription{sub}) } +func TestRegisterUnregisterValidator(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 1) + psubs := getPubsubs(ctx, hosts) + + err := psubs[0].RegisterTopicValidator("foo", func(context.Context, *Message) bool { + return true + }) + if err != nil { + t.Fatal(err) + } + + err = psubs[0].UnregisterTopicValidator("foo") + if err != nil { + t.Fatal(err) + } + + err = psubs[0].UnregisterTopicValidator("foo") + if err == nil { + t.Fatal("Unregistered bogus topic validator") + } +} + func TestValidate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()