UnregisterTopicValidator for removing topic validators
This commit is contained in:
parent
d553fb800c
commit
399bcb25a6
36
pubsub.go
36
pubsub.go
|
@ -66,6 +66,9 @@ type PubSub struct {
|
|||
// addVal handles validator registration requests
|
||||
addVal chan *addValReq
|
||||
|
||||
// rmVal handles validator unregistration requests
|
||||
rmVal chan *rmValReq
|
||||
|
||||
// topicVals tracks per topic validators
|
||||
topicVals map[string]*topicVal
|
||||
|
||||
|
@ -124,6 +127,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||
getTopics: make(chan *topicReq),
|
||||
sendMsg: make(chan *sendReq, 32),
|
||||
addVal: make(chan *addValReq),
|
||||
rmVal: make(chan *rmValReq),
|
||||
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
||||
myTopics: make(map[string]map[*Subscription]struct{}),
|
||||
topics: make(map[string]map[peer.ID]struct{}),
|
||||
|
@ -240,6 +244,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
case req := <-p.addVal:
|
||||
p.addValidator(req)
|
||||
|
||||
case req := <-p.rmVal:
|
||||
p.rmValidator(req)
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Info("pubsub processloop shutting down")
|
||||
return
|
||||
|
@ -586,6 +593,11 @@ type addValReq struct {
|
|||
resp chan error
|
||||
}
|
||||
|
||||
type rmValReq struct {
|
||||
topic string
|
||||
resp chan error
|
||||
}
|
||||
|
||||
type topicVal struct {
|
||||
topic string
|
||||
validate Validator
|
||||
|
@ -662,6 +674,30 @@ func (ps *PubSub) addValidator(req *addValReq) {
|
|||
req.resp <- nil
|
||||
}
|
||||
|
||||
// UnregisterTopicValidator removes a validator from a topic
|
||||
// returns an error if there was no validator registered with the topic
|
||||
func (p *PubSub) UnregisterTopicValidator(topic string) error {
|
||||
rmVal := &rmValReq{
|
||||
topic: topic,
|
||||
resp: make(chan error, 1),
|
||||
}
|
||||
|
||||
p.rmVal <- rmVal
|
||||
return <-rmVal.resp
|
||||
}
|
||||
|
||||
func (ps *PubSub) rmValidator(req *rmValReq) {
|
||||
topic := req.topic
|
||||
|
||||
_, ok := ps.topicVals[topic]
|
||||
if ok {
|
||||
delete(ps.topicVals, topic)
|
||||
req.resp <- nil
|
||||
} else {
|
||||
req.resp <- fmt.Errorf("No validator for topic %s", topic)
|
||||
}
|
||||
}
|
||||
|
||||
func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool {
|
||||
vctx, cancel := context.WithTimeout(ctx, val.validateTimeout)
|
||||
defer cancel()
|
||||
|
|
Loading…
Reference in New Issue