Merge pull request #218 from libp2p/feat/view-last-hop

Expose the peer that propagates a message to the recipient
This commit is contained in:
vyzo 2019-10-18 11:47:50 +03:00 committed by GitHub
commit aa9a8756d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 31 deletions

View File

@ -1398,3 +1398,59 @@ func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) m
} }
return peerState return peerState
} }
func TestMessageSender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
hosts := getNetHosts(t, ctx, 3)
psubs := getPubsubs(ctx, hosts)
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
msgs = append(msgs, subch)
}
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(time.Millisecond * 100)
for i := 0; i < 3; i++ {
for j := 0; j < 100; j++ {
msg := []byte(fmt.Sprintf("%d sent %d", i, j))
psubs[i].Publish(topic, msg)
for k, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
var expectedHost int
if i == k {
expectedHost = i
} else if k != 1 {
expectedHost = 1
} else {
expectedHost = i
}
if got.ReceivedFrom != hosts[expectedHost].ID() {
t.Fatal("got wrong message sender")
}
}
}
}
}

2
go.mod
View File

@ -11,3 +11,5 @@ require (
github.com/multiformats/go-multistream v0.1.0 github.com/multiformats/go-multistream v0.1.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
) )
go 1.13

View File

@ -79,7 +79,7 @@ type PubSub struct {
topics map[string]map[peer.ID]struct{} topics map[string]map[peer.ID]struct{}
// sendMsg handles messages that have been validated // sendMsg handles messages that have been validated
sendMsg chan *sendReq sendMsg chan *Message
// addVal handles validator registration requests // addVal handles validator registration requests
addVal chan *addValReq addVal chan *addValReq
@ -135,6 +135,7 @@ type PubSubRouter interface {
type Message struct { type Message struct {
*pb.Message *pb.Message
ReceivedFrom peer.ID
} }
func (m *Message) GetFrom() peer.ID { func (m *Message) GetFrom() peer.ID {
@ -170,7 +171,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
getPeers: make(chan *listPeerReq), getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq), addSub: make(chan *addSubReq),
getTopics: make(chan *topicReq), getTopics: make(chan *topicReq),
sendMsg: make(chan *sendReq, 32), sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq), addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq), rmVal: make(chan *rmValReq),
eval: make(chan func()), eval: make(chan func()),
@ -373,10 +374,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleIncomingRPC(rpc) p.handleIncomingRPC(rpc)
case msg := <-p.publish: case msg := <-p.publish:
p.pushMsg(p.host.ID(), msg) p.pushMsg(msg)
case req := <-p.sendMsg: case msg := <-p.sendMsg:
p.publishMessage(req.from, req.msg.Message) p.publishMessage(msg)
case req := <-p.addVal: case req := <-p.addVal:
p.val.AddValidator(req) p.val.AddValidator(req)
@ -522,12 +523,12 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
// notifySubs sends a given message to all corresponding subscribers. // notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop. // Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) { func (p *PubSub) notifySubs(msg *Message) {
for _, topic := range msg.GetTopicIDs() { for _, topic := range msg.GetTopicIDs() {
subs := p.myTopics[topic] subs := p.myTopics[topic]
for f := range subs { for f := range subs {
select { select {
case f.ch <- &Message{msg}: case f.ch <- msg:
default: default:
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic) log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
} }
@ -616,8 +617,8 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue continue
} }
msg := &Message{pmsg} msg := &Message{pmsg, rpc.from}
p.pushMsg(rpc.from, msg) p.pushMsg(msg)
} }
p.rt.HandleRPC(rpc) p.rt.HandleRPC(rpc)
@ -629,7 +630,8 @@ func msgID(pmsg *pb.Message) string {
} }
// pushMsg pushes a message performing validation as necessary // pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(src peer.ID, msg *Message) { func (p *PubSub) pushMsg(msg *Message) {
src := msg.ReceivedFrom
// reject messages from blacklisted peers // reject messages from blacklisted peers
if p.blacklist.Contains(src) { if p.blacklist.Contains(src) {
log.Warningf("dropping message from blacklisted peer %s", src) log.Warningf("dropping message from blacklisted peer %s", src)
@ -659,13 +661,13 @@ func (p *PubSub) pushMsg(src peer.ID, msg *Message) {
} }
if p.markSeen(id) { if p.markSeen(id) {
p.publishMessage(src, msg.Message) p.publishMessage(msg)
} }
} }
func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { func (p *PubSub) publishMessage(msg *Message) {
p.notifySubs(pmsg) p.notifySubs(msg)
p.rt.Publish(from, pmsg) p.rt.Publish(msg.ReceivedFrom, msg.Message)
} }
type addSubReq struct { type addSubReq struct {
@ -734,10 +736,11 @@ func (p *PubSub) GetTopics() []string {
// Publish publishes data to the given topic. // Publish publishes data to the given topic.
func (p *PubSub) Publish(topic string, data []byte) error { func (p *PubSub) Publish(topic string, data []byte) error {
seqno := p.nextSeqno() seqno := p.nextSeqno()
id := p.host.ID()
m := &pb.Message{ m := &pb.Message{
Data: data, Data: data,
TopicIDs: []string{topic}, TopicIDs: []string{topic},
From: []byte(p.host.ID()), From: []byte(id),
Seqno: seqno, Seqno: seqno,
} }
if p.signKey != nil { if p.signKey != nil {
@ -747,7 +750,7 @@ func (p *PubSub) Publish(topic string, data []byte) error {
return err return err
} }
} }
p.publish <- &Message{m} p.publish <- &Message{m, id}
return nil return nil
} }
@ -763,13 +766,6 @@ type listPeerReq struct {
topic string topic string
} }
// sendReq is a request to call publishMessage.
// It is issued after message validation is done.
type sendReq struct {
from peer.ID
msg *Message
}
// ListPeers returns a list of peers we are connected to in the given topic. // ListPeers returns a list of peers we are connected to in the given topic.
func (p *PubSub) ListPeers(topic string) []peer.ID { func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID) out := make(chan []peer.ID)

View File

@ -233,10 +233,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
} }
// no async validators, send the message // no async validators, send the message
v.p.sendMsg <- &sendReq{ v.p.sendMsg <- msg
from: src,
msg: msg,
}
} }
func (v *validation) validateSignature(msg *Message) bool { func (v *validation) validateSignature(msg *Message) bool {
@ -255,10 +252,7 @@ func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message
return return
} }
v.p.sendMsg <- &sendReq{ v.p.sendMsg <- msg
from: src,
msg: msg,
}
} }
func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool {