feat: introduce msgIdGenerator and add ID field to Message wrapper

This commit is contained in:
Wondertan 2022-01-09 18:05:29 +02:00 committed by vyzo
parent b57bcc8b36
commit 957fc4f80c
3 changed files with 35 additions and 3 deletions

32
midgen.go Normal file
View File

@ -0,0 +1,32 @@
package pubsub
import "sync"
type msgIDGenerator struct {
defGen MsgIdFunction
topicGens map[string]MsgIdFunction
topicGensLk sync.RWMutex
}
func (m *msgIDGenerator) Add(topic string, gen MsgIdFunction) {
m.topicGensLk.Lock()
m.topicGens[topic] = gen
m.topicGensLk.Unlock()
}
func (m *msgIDGenerator) GenID(msg *Message) string {
if msg.ID != "" {
return msg.ID
}
m.topicGensLk.RLock()
gen, ok := m.topicGens[msg.GetTopic()]
m.topicGensLk.RUnlock()
if !ok {
gen = m.defGen
}
msg.ID = gen(msg.Message)
return msg.ID
}

View File

@ -213,6 +213,7 @@ const (
type Message struct {
*pb.Message
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
}
@ -1047,8 +1048,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue
}
msg := &Message{pmsg, rpc.from, nil}
p.pushMsg(msg)
p.pushMsg(&Message{pmsg, "", rpc.from, nil})
}
}

View File

@ -283,7 +283,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
}
}
return t.p.val.PushLocal(&Message{m, t.p.host.ID(), nil})
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
}
// WithReadiness returns a publishing option for only publishing when the router is ready.