remove multi-topic message support

This commit is contained in:
vyzo 2020-09-29 18:05:54 +03:00
parent f7f33e10cc
commit d6c20b59fc
24 changed files with 1798 additions and 1473 deletions

598
compat/compat.pb.go Normal file
View File

@ -0,0 +1,598 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: compat.proto
package compat_pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Message struct {
From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_bced3ff93dcaa7f8, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Message.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Message) XXX_Merge(src proto.Message) {
xxx_messageInfo_Message.Merge(m, src)
}
func (m *Message) XXX_Size() int {
return m.Size()
}
func (m *Message) XXX_DiscardUnknown() {
xxx_messageInfo_Message.DiscardUnknown(m)
}
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetFrom() []byte {
if m != nil {
return m.From
}
return nil
}
func (m *Message) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *Message) GetSeqno() []byte {
if m != nil {
return m.Seqno
}
return nil
}
func (m *Message) GetTopicIDs() []string {
if m != nil {
return m.TopicIDs
}
return nil
}
func (m *Message) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func (m *Message) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "compat.pb.Message")
}
func init() { proto.RegisterFile("compat.proto", fileDescriptor_bced3ff93dcaa7f8) }
var fileDescriptor_bced3ff93dcaa7f8 = []byte{
// 165 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0xce, 0xcf, 0x2d,
0x48, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0xf1, 0x92, 0x94, 0x26, 0x33,
0x72, 0xb1, 0xfb, 0xa6, 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x15, 0xe5,
0xe7, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x20, 0xb1, 0x94, 0xc4, 0x92, 0x44,
0x09, 0x26, 0x88, 0x18, 0x88, 0x2d, 0x24, 0xc2, 0xc5, 0x5a, 0x9c, 0x5a, 0x98, 0x97, 0x2f, 0xc1,
0x0c, 0x16, 0x84, 0x70, 0x84, 0xa4, 0xb8, 0x38, 0x4a, 0xf2, 0x0b, 0x32, 0x93, 0x3d, 0x5d, 0x8a,
0x25, 0x58, 0x14, 0x98, 0x35, 0x38, 0x83, 0xe0, 0x7c, 0x21, 0x19, 0x2e, 0xce, 0xe2, 0xcc, 0xf4,
0xbc, 0xc4, 0x92, 0xd2, 0xa2, 0x54, 0x09, 0x56, 0xb0, 0x2e, 0x84, 0x80, 0x90, 0x00, 0x17, 0x73,
0x76, 0x6a, 0xa5, 0x04, 0x1b, 0x58, 0x1c, 0xc4, 0x74, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2,
0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x51, 0x81, 0xcf,
0x0e, 0xbd, 0x00, 0x00, 0x00,
}
func (m *Message) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Message) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Key != nil {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0x32
}
if m.Signature != nil {
i -= len(m.Signature)
copy(dAtA[i:], m.Signature)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Signature)))
i--
dAtA[i] = 0x2a
}
if len(m.TopicIDs) > 0 {
for iNdEx := len(m.TopicIDs) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.TopicIDs[iNdEx])
copy(dAtA[i:], m.TopicIDs[iNdEx])
i = encodeVarintCompat(dAtA, i, uint64(len(m.TopicIDs[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if m.Seqno != nil {
i -= len(m.Seqno)
copy(dAtA[i:], m.Seqno)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Seqno)))
i--
dAtA[i] = 0x1a
}
if m.Data != nil {
i -= len(m.Data)
copy(dAtA[i:], m.Data)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Data)))
i--
dAtA[i] = 0x12
}
if m.From != nil {
i -= len(m.From)
copy(dAtA[i:], m.From)
i = encodeVarintCompat(dAtA, i, uint64(len(m.From)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintCompat(dAtA []byte, offset int, v uint64) int {
offset -= sovCompat(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Message) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.From != nil {
l = len(m.From)
n += 1 + l + sovCompat(uint64(l))
}
if m.Data != nil {
l = len(m.Data)
n += 1 + l + sovCompat(uint64(l))
}
if m.Seqno != nil {
l = len(m.Seqno)
n += 1 + l + sovCompat(uint64(l))
}
if len(m.TopicIDs) > 0 {
for _, s := range m.TopicIDs {
l = len(s)
n += 1 + l + sovCompat(uint64(l))
}
}
if m.Signature != nil {
l = len(m.Signature)
n += 1 + l + sovCompat(uint64(l))
}
if m.Key != nil {
l = len(m.Key)
n += 1 + l + sovCompat(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovCompat(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozCompat(x uint64) (n int) {
return sovCompat(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Message) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Message: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field From", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.From = append(m.From[:0], dAtA[iNdEx:postIndex]...)
if m.From == nil {
m.From = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
if m.Data == nil {
m.Data = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Seqno", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Seqno = append(m.Seqno[:0], dAtA[iNdEx:postIndex]...)
if m.Seqno == nil {
m.Seqno = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TopicIDs", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TopicIDs = append(m.TopicIDs, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCompat(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCompat
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthCompat
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipCompat(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthCompat
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupCompat
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthCompat
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthCompat = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowCompat = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupCompat = fmt.Errorf("proto: unexpected end of group")
)

12
compat/compat.proto Normal file
View File

@ -0,0 +1,12 @@
syntax = "proto2";
package compat.pb;
message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}

83
compat_test.go Normal file
View File

@ -0,0 +1,83 @@
package pubsub
import (
"testing"
compat_pb "github.com/libp2p/go-libp2p-pubsub/compat"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
func TestMultitopicMessageCompatibility(t *testing.T) {
topic1 := "topic1"
topic2 := "topic2"
newMessage1 := &pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
Topic: &topic1,
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage1 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage2 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1, topic2},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
newMessage1b, err := newMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage1b, err := oldMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage2b, err := oldMessage2.Marshal()
if err != nil {
t.Fatal(err)
}
newMessage := new(pb.Message)
oldMessage := new(compat_pb.Message)
err = newMessage.Unmarshal(oldMessage1b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, newMessage.GetTopic())
}
newMessage.Reset()
err = newMessage.Unmarshal(oldMessage2b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic2 {
t.Fatalf("bad topic: expected %s, got %s", topic2, newMessage.GetTopic())
}
err = oldMessage.Unmarshal(newMessage1b)
if err != nil {
t.Fatal(err)
}
topics := oldMessage.GetTopicIDs()
if len(topics) != 1 {
t.Fatalf("expected 1 topic, got %d", len(topics))
}
if topics[0] != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, topics[0])
}
}

View File

@ -75,21 +75,10 @@ func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
func (fs *FloodSubRouter) Publish(msg *Message) { func (fs *FloodSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom from := msg.ReceivedFrom
topic := msg.GetTopic()
tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
tmap, ok := fs.p.topics[topic]
if !ok {
continue
}
for p := range tmap {
tosend[p] = struct{}{}
}
}
out := rpcWithMessages(msg.Message) out := rpcWithMessages(msg.Message)
for pid := range tosend { for pid := range fs.p.topics[topic] {
if pid == from || pid == peer.ID(msg.GetFrom()) { if pid == from || pid == peer.ID(msg.GetFrom()) {
continue continue
} }

View File

@ -869,26 +869,26 @@ func (gs *GossipSubRouter) connector() {
func (gs *GossipSubRouter) Publish(msg *Message) { func (gs *GossipSubRouter) Publish(msg *Message) {
gs.mcache.Put(msg.Message) gs.mcache.Put(msg.Message)
from := msg.ReceivedFrom from := msg.ReceivedFrom
topic := msg.GetTopic()
tosend := make(map[peer.ID]struct{}) tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
continue
}
if gs.floodPublish && from == gs.p.host.ID() { // any peers in the topic?
for p := range tmap { tmap, ok := gs.p.topics[topic]
_, direct := gs.direct[p] if !ok {
if direct || gs.score.Score(p) >= gs.publishThreshold { return
tosend[p] = struct{}{} }
}
if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
} }
continue
} }
} else {
// direct peers // direct peers
for p := range gs.direct { for p := range gs.direct {
_, inTopic := tmap[p] _, inTopic := tmap[p]

View File

@ -662,10 +662,10 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
// fail validation and reduce the attacker's score) // fail validation and reduce the attacker's score)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := &pb.Message{ msg := &pb.Message{
Data: []byte("some data" + strconv.Itoa(i)), Data: []byte("some data" + strconv.Itoa(i)),
TopicIDs: []string{mytopic}, Topic: &mytopic,
From: []byte(attacker.ID()), From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)}, Seqno: []byte{byte(i + 1)},
} }
writeMsg(&pb.RPC{ writeMsg(&pb.RPC{
Publish: []*pb.Message{msg}, Publish: []*pb.Message{msg},

View File

@ -48,14 +48,14 @@ func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction) {
} }
type CacheEntry struct { type CacheEntry struct {
mid string mid string
topics []string topic string
} }
func (mc *MessageCache) Put(msg *pb.Message) { func (mc *MessageCache) Put(msg *pb.Message) {
mid := mc.msgID(msg) mid := mc.msgID(msg)
mc.msgs[mid] = msg mc.msgs[mid] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()}) mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()})
} }
func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
@ -83,11 +83,8 @@ func (mc *MessageCache) GetGossipIDs(topic string) []string {
var mids []string var mids []string
for _, entries := range mc.history[:mc.gossip] { for _, entries := range mc.history[:mc.gossip] {
for _, entry := range entries { for _, entry := range entries {
for _, t := range entry.topics { if entry.topic == topic {
if t == topic { mids = append(mids, entry.mid)
mids = append(mids, entry.mid)
break
}
} }
} }
} }

View File

@ -157,10 +157,11 @@ func makeTestMessage(n int) *pb.Message {
seqno := make([]byte, 8) seqno := make([]byte, 8)
binary.BigEndian.PutUint64(seqno, uint64(n)) binary.BigEndian.PutUint64(seqno, uint64(n))
data := []byte(fmt.Sprintf("%d", n)) data := []byte(fmt.Sprintf("%d", n))
topic := "test"
return &pb.Message{ return &pb.Message{
Data: data, Data: data,
TopicIDs: []string{"test"}, Topic: &topic,
From: []byte("test"), From: []byte("test"),
Seqno: seqno, Seqno: seqno,
} }
} }

View File

@ -230,7 +230,7 @@ type Message struct {
From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"` Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"` Topic *string `protobuf:"bytes,4,opt,name=topic" json:"topic,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"` Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"` Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -292,11 +292,11 @@ func (m *Message) GetSeqno() []byte {
return nil return nil
} }
func (m *Message) GetTopicIDs() []string { func (m *Message) GetTopic() string {
if m != nil { if m != nil && m.Topic != nil {
return m.TopicIDs return *m.Topic
} }
return nil return ""
} }
func (m *Message) GetSignature() []byte { func (m *Message) GetSignature() []byte {
@ -846,49 +846,49 @@ func init() {
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) }
var fileDescriptor_77a6da22d6a3feb1 = []byte{ var fileDescriptor_77a6da22d6a3feb1 = []byte{
// 668 bytes of a gzipped FileDescriptorProto // 662 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcd, 0x6e, 0xd3, 0x4a, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x4a,
0x14, 0xc7, 0xef, 0xc4, 0x49, 0x1d, 0x9f, 0xba, 0xbd, 0xd1, 0xdc, 0xab, 0x5e, 0xdf, 0xa8, 0x8a, 0x14, 0xc6, 0xef, 0xd4, 0x4e, 0x1d, 0x9f, 0xba, 0xbd, 0xd1, 0xdc, 0xab, 0x62, 0xaa, 0x2a, 0x8a,
0x22, 0x23, 0xa1, 0x50, 0x8a, 0x17, 0x01, 0x89, 0x0d, 0x42, 0x94, 0x26, 0x22, 0x11, 0x6a, 0x1b, 0x8c, 0x84, 0x42, 0x29, 0x5e, 0x04, 0x24, 0x36, 0x08, 0x51, 0x9a, 0x88, 0x44, 0xa8, 0x6d, 0x34,
0x4d, 0x2b, 0x55, 0x2c, 0x6d, 0x67, 0xd2, 0x58, 0x69, 0x3c, 0xc6, 0x1f, 0x45, 0x7d, 0x03, 0xf6, 0xad, 0x54, 0xb1, 0xb4, 0x9d, 0x49, 0x63, 0xa5, 0xf1, 0x18, 0xff, 0x29, 0xea, 0x1b, 0xb0, 0x61,
0xf0, 0x2c, 0x3c, 0x03, 0x0b, 0x16, 0x3c, 0x02, 0xea, 0x8e, 0xb7, 0x40, 0x73, 0x3c, 0x4e, 0x9c, 0xc7, 0xb3, 0xf0, 0x0c, 0x2c, 0x58, 0xf0, 0x08, 0xa8, 0x3b, 0xde, 0x02, 0xcd, 0xf1, 0x38, 0x71,
0x7e, 0xc1, 0xca, 0x67, 0x8e, 0x7f, 0xff, 0x73, 0xfe, 0xe7, 0x78, 0x0c, 0x46, 0x1c, 0xf9, 0x4e, 0x5a, 0x5a, 0x58, 0xf9, 0xcc, 0x99, 0xdf, 0x77, 0xce, 0x37, 0xc7, 0x33, 0x60, 0x26, 0x71, 0xe0,
0x14, 0x8b, 0x54, 0x50, 0x23, 0xca, 0xbc, 0x24, 0xf3, 0x9c, 0xc8, 0xb3, 0x7f, 0x12, 0xd0, 0xd8, 0xc6, 0x89, 0xc8, 0x04, 0x35, 0xe3, 0xdc, 0x4f, 0x73, 0xdf, 0x8d, 0x7d, 0xe7, 0x27, 0x01, 0x8d,
0x68, 0x9f, 0xbe, 0x80, 0x8d, 0x24, 0xf3, 0x12, 0x3f, 0x0e, 0xa2, 0x34, 0x10, 0x61, 0x62, 0x91, 0x0d, 0xf7, 0xe9, 0x0b, 0x58, 0x4f, 0x73, 0x3f, 0x0d, 0x92, 0x30, 0xce, 0x42, 0x11, 0xa5, 0x36,
0xb6, 0xd6, 0x59, 0xef, 0x6e, 0x39, 0x0b, 0xd4, 0x61, 0xa3, 0x7d, 0xe7, 0x38, 0xf3, 0x8e, 0xa2, 0x69, 0x69, 0xed, 0xb5, 0xce, 0xa6, 0x3b, 0x47, 0x5d, 0x36, 0xdc, 0x77, 0x8f, 0x73, 0xff, 0x28,
0x34, 0x61, 0xab, 0x30, 0xdd, 0x05, 0x3d, 0xca, 0xbc, 0xf3, 0x20, 0x99, 0x5a, 0x15, 0xd4, 0xd1, 0xce, 0x52, 0xb6, 0x0c, 0xd3, 0x5d, 0x30, 0xe2, 0xdc, 0x3f, 0x0f, 0xd3, 0x89, 0xbd, 0x82, 0x3a,
0x92, 0xee, 0x80, 0x27, 0x89, 0x7b, 0xc6, 0x59, 0x81, 0xd0, 0xa7, 0xa0, 0xfb, 0x22, 0x4c, 0x63, 0x5a, 0xd1, 0x1d, 0xf0, 0x34, 0xf5, 0xce, 0x38, 0x2b, 0x11, 0xfa, 0x14, 0x8c, 0x40, 0x44, 0x59,
0x71, 0x6e, 0x69, 0x6d, 0xd2, 0x59, 0xef, 0xfe, 0x5f, 0xa2, 0xf7, 0xf3, 0x37, 0x0b, 0x91, 0x22, 0x22, 0xce, 0x6d, 0xad, 0x45, 0xda, 0x6b, 0x9d, 0xfb, 0x15, 0x7a, 0xbf, 0xd8, 0x99, 0x8b, 0x14,
0x9b, 0x7b, 0xa0, 0xab, 0xe6, 0x74, 0x1b, 0x0c, 0xd5, 0xde, 0xe3, 0x16, 0x69, 0x93, 0x4e, 0x9d, 0xb9, 0xb5, 0x07, 0x86, 0x6a, 0x4e, 0xb7, 0xc1, 0x54, 0xed, 0x7d, 0x6e, 0x93, 0x16, 0x69, 0xd7,
0x2d, 0x13, 0xd4, 0x02, 0x3d, 0x15, 0x51, 0xe0, 0x07, 0x63, 0xab, 0xd2, 0x26, 0x1d, 0x83, 0x15, 0xd9, 0x22, 0x41, 0x6d, 0x30, 0x32, 0x11, 0x87, 0x41, 0x38, 0xb2, 0x57, 0x5a, 0xa4, 0x6d, 0xb2,
0x47, 0xfb, 0x13, 0x01, 0x5d, 0xd5, 0xa5, 0x14, 0xaa, 0x93, 0x58, 0xcc, 0x51, 0x6e, 0x32, 0x8c, 0x72, 0xe9, 0x7c, 0x22, 0x60, 0xa8, 0xba, 0x94, 0x82, 0x3e, 0x4e, 0xc4, 0x0c, 0xe5, 0x16, 0xc3,
0x65, 0x6e, 0xec, 0xa6, 0x2e, 0xca, 0x4c, 0x86, 0x31, 0xfd, 0x17, 0x6a, 0x09, 0x7f, 0x1f, 0x0a, 0x58, 0xe6, 0x46, 0x5e, 0xe6, 0xa1, 0xcc, 0x62, 0x18, 0xd3, 0xff, 0xa1, 0x96, 0xf2, 0xf7, 0x91,
0x74, 0x6a, 0xb2, 0xfc, 0x40, 0x9b, 0x50, 0xc7, 0xa2, 0xc3, 0x5e, 0x62, 0x55, 0xdb, 0x5a, 0xc7, 0x40, 0xa7, 0x16, 0x2b, 0x16, 0x32, 0x8b, 0x45, 0x6d, 0x1d, 0x3b, 0x14, 0x0b, 0xf4, 0x15, 0x9e,
0x60, 0x8b, 0x33, 0xba, 0x0b, 0xce, 0x42, 0x37, 0xcd, 0x62, 0x6e, 0xd5, 0x50, 0xb5, 0x4c, 0xd0, 0x45, 0x5e, 0x96, 0x27, 0xdc, 0xae, 0x21, 0xbf, 0x48, 0xd0, 0x06, 0x68, 0x53, 0x7e, 0x69, 0xaf,
0x06, 0x68, 0x33, 0x7e, 0x69, 0xad, 0x61, 0x5e, 0x86, 0xf6, 0x37, 0x02, 0x9b, 0xab, 0x43, 0xd3, 0x62, 0x5e, 0x86, 0xce, 0x37, 0x02, 0x1b, 0xcb, 0xc7, 0xa5, 0x4f, 0xa0, 0x16, 0x4e, 0xbc, 0x0b,
0x27, 0x50, 0x0b, 0xa6, 0xee, 0x05, 0x57, 0x1f, 0xe1, 0xbf, 0x9b, 0xeb, 0x19, 0x0e, 0xdc, 0x0b, 0xae, 0xc6, 0x7f, 0xef, 0xe6, 0x60, 0x06, 0x7d, 0xef, 0x82, 0xb3, 0x82, 0x42, 0xfc, 0x83, 0x17,
0xce, 0x72, 0x0a, 0xf1, 0x0f, 0x6e, 0x98, 0xaa, 0xdd, 0xdf, 0x86, 0x9f, 0xba, 0x61, 0xca, 0x72, 0x65, 0x6a, 0xea, 0xbf, 0xc3, 0x4f, 0xbd, 0x28, 0x63, 0x05, 0x25, 0xf1, 0xb3, 0xc4, 0x1b, 0x67,
0x4a, 0xe2, 0x67, 0xb1, 0x3b, 0x49, 0x2d, 0xed, 0x2e, 0xfc, 0x8d, 0x7c, 0xcd, 0x72, 0x4a, 0xe2, 0xb6, 0x76, 0x1b, 0xfe, 0x46, 0x6e, 0xb3, 0x82, 0x92, 0x78, 0x9c, 0xe4, 0x11, 0xb7, 0xf5, 0xdb,
0x51, 0x9c, 0x85, 0x1c, 0x07, 0xbd, 0x15, 0x1f, 0xc9, 0xd7, 0x2c, 0xa7, 0xec, 0x01, 0x98, 0x65, 0xf0, 0xa1, 0xdc, 0x66, 0x05, 0xe5, 0xf4, 0xc1, 0xaa, 0x7a, 0x9c, 0xff, 0x88, 0x41, 0x17, 0xa7,
0x8f, 0x8b, 0xcf, 0x31, 0xec, 0xe1, 0xae, 0x8b, 0xcf, 0x31, 0xec, 0xd1, 0x16, 0xc0, 0x3c, 0x1f, 0x5c, 0xfe, 0x88, 0x41, 0x97, 0x36, 0x01, 0x66, 0xc5, 0x81, 0x07, 0xdd, 0x14, 0xbd, 0x9b, 0xac,
0x58, 0xae, 0xb1, 0x82, 0x6b, 0x2c, 0x65, 0x6c, 0x67, 0x59, 0x49, 0xda, 0xbf, 0xc6, 0x93, 0x1b, 0x92, 0x71, 0xdc, 0x45, 0x25, 0x69, 0xff, 0x1a, 0x4f, 0x6e, 0xf0, 0xed, 0x39, 0x8f, 0xfe, 0x6f,
0x7c, 0x67, 0xc1, 0xa3, 0xff, 0xbb, 0x3b, 0xdb, 0xf3, 0x05, 0x89, 0xd6, 0xef, 0xf1, 0xf8, 0x08, 0xef, 0xec, 0xcc, 0xe6, 0x24, 0x5a, 0xbf, 0xc3, 0xe3, 0x23, 0xa8, 0xc5, 0x9c, 0x27, 0xa9, 0x1a,
0x6a, 0x11, 0xe7, 0x71, 0xa2, 0x56, 0xfb, 0x4f, 0x69, 0xf8, 0x11, 0xe7, 0xf1, 0x30, 0x9c, 0x08, 0xed, 0x7f, 0x95, 0xc3, 0x0f, 0x39, 0x4f, 0x06, 0xd1, 0x58, 0xb0, 0x82, 0x90, 0x45, 0x7c, 0x2f,
0x96, 0x13, 0xb2, 0x88, 0xe7, 0xfa, 0x33, 0x31, 0x99, 0xe0, 0x5d, 0xa9, 0xb2, 0xe2, 0x68, 0x1f, 0x98, 0x8a, 0xf1, 0x18, 0x6f, 0x89, 0xce, 0xca, 0xa5, 0x73, 0x08, 0xf5, 0x12, 0xa6, 0x9b, 0xb0,
0x42, 0xbd, 0x80, 0xe9, 0x16, 0xac, 0x49, 0x5c, 0x75, 0x32, 0x99, 0x3a, 0xd1, 0x1d, 0x68, 0xc8, 0x2a, 0x71, 0xd5, 0xc9, 0x62, 0x6a, 0x45, 0x77, 0xa0, 0x21, 0x2f, 0x09, 0x1f, 0x49, 0x92, 0xf1,
0x4b, 0xc2, 0xc7, 0x92, 0x64, 0xdc, 0x17, 0xf1, 0x58, 0xdd, 0xc3, 0x1b, 0x79, 0xfb, 0x8b, 0x06, 0x40, 0x24, 0x23, 0x75, 0x03, 0x6f, 0xe4, 0x9d, 0x2f, 0x1a, 0xfc, 0x7b, 0x22, 0x0d, 0x76, 0x79,
0x7f, 0x9f, 0x48, 0x83, 0x3d, 0x9e, 0xff, 0x83, 0x22, 0x96, 0x77, 0x37, 0x74, 0xe7, 0x5c, 0xf9, 0xf1, 0xfa, 0x44, 0x22, 0x6f, 0x6d, 0xe4, 0xcd, 0xb8, 0xf2, 0x8f, 0x31, 0x7d, 0x0e, 0xba, 0x97,
0xc7, 0x98, 0x3e, 0x87, 0xaa, 0x9b, 0xa5, 0x53, 0xac, 0xb3, 0xde, 0x7d, 0x50, 0xf2, 0x7e, 0x4d, 0x67, 0x13, 0xac, 0xb3, 0xd6, 0x79, 0x50, 0xf1, 0x7e, 0x4d, 0xed, 0xee, 0xe5, 0xd9, 0x04, 0x5f,
0xed, 0xec, 0x65, 0xe9, 0x14, 0xff, 0x6b, 0x14, 0xd0, 0x67, 0xa0, 0xf1, 0xd0, 0x57, 0x3f, 0xa7, 0x34, 0x0a, 0xe8, 0x33, 0xd0, 0x78, 0x14, 0xa8, 0x67, 0xe9, 0xdc, 0xa1, 0xeb, 0x45, 0x01, 0xca,
0x7d, 0x8f, 0xae, 0x1f, 0xfa, 0x28, 0x93, 0x78, 0xf3, 0x23, 0x81, 0x7a, 0x51, 0x88, 0xbe, 0x82, 0x24, 0xbe, 0xf5, 0x91, 0x40, 0xbd, 0x2c, 0x44, 0x5f, 0x81, 0x3e, 0x13, 0xa3, 0xc2, 0xcf, 0x46,
0xea, 0x5c, 0x8c, 0x73, 0x3f, 0x9b, 0xdd, 0xdd, 0x3f, 0xe8, 0x8d, 0xc1, 0x81, 0x18, 0x73, 0x86, 0x67, 0xf7, 0x2f, 0x7a, 0x63, 0x70, 0x20, 0x46, 0x9c, 0xa1, 0x52, 0x9e, 0x68, 0xca, 0x2f, 0x8b,
0x4a, 0x39, 0xd1, 0x8c, 0x5f, 0xe6, 0x9b, 0x37, 0x19, 0xc6, 0xf6, 0xc3, 0xbc, 0x83, 0xa4, 0x68, 0xc9, 0x5b, 0x0c, 0x63, 0xe7, 0x61, 0xd1, 0x41, 0x52, 0xb4, 0x0e, 0xfa, 0xe1, 0xd1, 0x61, 0xaf,
0x1d, 0xaa, 0x87, 0x47, 0x87, 0xfd, 0xc6, 0x5f, 0x54, 0x07, 0xed, 0x6d, 0xff, 0x5d, 0x83, 0xc8, 0xf1, 0x0f, 0x35, 0x40, 0x7b, 0xdb, 0x7b, 0xd7, 0x20, 0x32, 0x38, 0x3d, 0x3a, 0x69, 0xac, 0x6c,
0xe0, 0xf4, 0xe8, 0xa4, 0x51, 0x69, 0x7e, 0x26, 0xa0, 0x2b, 0x6f, 0xf4, 0xe5, 0x8a, 0x93, 0x9d, 0x7d, 0x26, 0x60, 0x28, 0x6f, 0xf4, 0xe5, 0x92, 0x93, 0x9d, 0x3f, 0x9f, 0x46, 0x7e, 0x2b, 0x3e,
0xdf, 0x4f, 0x23, 0x9f, 0x25, 0x1f, 0xdb, 0x60, 0xcc, 0xf8, 0xe5, 0xc0, 0x4d, 0xa6, 0xbc, 0x30, 0xb6, 0xc1, 0x9c, 0xf2, 0xcb, 0xbe, 0x97, 0x4e, 0x78, 0x69, 0x66, 0x91, 0x70, 0x1e, 0x63, 0xa3,
0xb3, 0x4c, 0xd8, 0x8f, 0xb1, 0xd1, 0x35, 0x43, 0x1b, 0x60, 0x1c, 0x0f, 0xf6, 0x58, 0xbf, 0xb7, 0x6b, 0x86, 0xd6, 0xc1, 0x3c, 0xee, 0xef, 0xb1, 0x5e, 0x77, 0xd9, 0xd6, 0x6b, 0xeb, 0xeb, 0x55,
0x6a, 0xeb, 0xb5, 0xf9, 0xf5, 0xaa, 0x45, 0xbe, 0x5f, 0xb5, 0xc8, 0x8f, 0xab, 0x16, 0xf9, 0x15, 0x93, 0x7c, 0xbf, 0x6a, 0x92, 0x1f, 0x57, 0x4d, 0xf2, 0x2b, 0x00, 0x00, 0xff, 0xff, 0xfc, 0x52,
0x00, 0x00, 0xff, 0xff, 0xd4, 0xb4, 0x37, 0x28, 0x91, 0x05, 0x00, 0x00, 0x7a, 0xa2, 0x8b, 0x05, 0x00, 0x00,
} }
func (m *RPC) Marshal() (dAtA []byte, err error) { func (m *RPC) Marshal() (dAtA []byte, err error) {
@ -1040,14 +1040,12 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i-- i--
dAtA[i] = 0x2a dAtA[i] = 0x2a
} }
if len(m.TopicIDs) > 0 { if m.Topic != nil {
for iNdEx := len(m.TopicIDs) - 1; iNdEx >= 0; iNdEx-- { i -= len(*m.Topic)
i -= len(m.TopicIDs[iNdEx]) copy(dAtA[i:], *m.Topic)
copy(dAtA[i:], m.TopicIDs[iNdEx]) i = encodeVarintRpc(dAtA, i, uint64(len(*m.Topic)))
i = encodeVarintRpc(dAtA, i, uint64(len(m.TopicIDs[iNdEx]))) i--
i-- dAtA[i] = 0x22
dAtA[i] = 0x22
}
} }
if m.Seqno != nil { if m.Seqno != nil {
i -= len(m.Seqno) i -= len(m.Seqno)
@ -1579,11 +1577,9 @@ func (m *Message) Size() (n int) {
l = len(m.Seqno) l = len(m.Seqno)
n += 1 + l + sovRpc(uint64(l)) n += 1 + l + sovRpc(uint64(l))
} }
if len(m.TopicIDs) > 0 { if m.Topic != nil {
for _, s := range m.TopicIDs { l = len(*m.Topic)
l = len(s) n += 1 + l + sovRpc(uint64(l))
n += 1 + l + sovRpc(uint64(l))
}
} }
if m.Signature != nil { if m.Signature != nil {
l = len(m.Signature) l = len(m.Signature)
@ -2207,7 +2203,7 @@ func (m *Message) Unmarshal(dAtA []byte) error {
iNdEx = postIndex iNdEx = postIndex
case 4: case 4:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TopicIDs", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
} }
var stringLen uint64 var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -2235,7 +2231,8 @@ func (m *Message) Unmarshal(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.TopicIDs = append(m.TopicIDs, string(dAtA[iNdEx:postIndex])) s := string(dAtA[iNdEx:postIndex])
m.Topic = &s
iNdEx = postIndex iNdEx = postIndex
case 5: case 5:
if wireType != 2 { if wireType != 2 {

View File

@ -18,7 +18,7 @@ message Message {
optional bytes from = 1; optional bytes from = 1;
optional bytes data = 2; optional bytes data = 2;
optional bytes seqno = 3; optional bytes seqno = 3;
repeated string topicIDs = 4; optional string topic = 4;
optional bytes signature = 5; optional bytes signature = 5;
optional bytes key = 6; optional bytes key = 6;
} }

File diff suppressed because it is too large Load Diff

View File

@ -39,25 +39,25 @@ message TraceEvent {
message PublishMessage { message PublishMessage {
optional bytes messageID = 1; optional bytes messageID = 1;
repeated string topics = 2; optional string topic = 2;
} }
message RejectMessage { message RejectMessage {
optional bytes messageID = 1; optional bytes messageID = 1;
optional bytes receivedFrom = 2; optional bytes receivedFrom = 2;
optional string reason = 3; optional string reason = 3;
repeated string topics = 4; optional string topic = 4;
} }
message DuplicateMessage { message DuplicateMessage {
optional bytes messageID = 1; optional bytes messageID = 1;
optional bytes receivedFrom = 2; optional bytes receivedFrom = 2;
repeated string topics = 3; optional string topic = 3;
} }
message DeliverMessage { message DeliverMessage {
optional bytes messageID = 1; optional bytes messageID = 1;
repeated string topics = 2; optional string topic = 2;
} }
message AddPeer { message AddPeer {
@ -110,7 +110,7 @@ message TraceEvent {
message MessageMeta { message MessageMeta {
optional bytes messageID = 1; optional bytes messageID = 1;
repeated string topics = 2; optional string topic = 2;
} }
message SubMeta { message SubMeta {

View File

@ -396,10 +396,8 @@ func (pg *peerGater) DeliverMessage(msg *Message) {
st := pg.getPeerStats(msg.ReceivedFrom) st := pg.getPeerStats(msg.ReceivedFrom)
weight := 0.0 topic := msg.GetTopic()
for _, topic := range msg.GetTopicIDs() { weight := pg.params.TopicDeliveryWeights[topic]
weight += pg.params.TopicDeliveryWeights[topic]
}
if weight == 0 { if weight == 0 {
weight = 1 weight = 1

View File

@ -834,14 +834,13 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
// notifySubs sends a given message to all corresponding subscribers. // notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop. // Only called from processLoop.
func (p *PubSub) notifySubs(msg *Message) { func (p *PubSub) notifySubs(msg *Message) {
for _, topic := range msg.GetTopicIDs() { topic := msg.GetTopic()
subs := p.mySubs[topic] subs := p.mySubs[topic]
for f := range subs { for f := range subs {
select { select {
case f.ch <- msg: case f.ch <- msg:
default: default:
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic) log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
}
} }
} }
} }
@ -873,12 +872,10 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
return false return false
} }
for _, t := range msg.GetTopicIDs() { topic := msg.GetTopic()
if _, ok := p.mySubs[t]; ok { _, ok := p.mySubs[topic]
return true
} return ok
}
return false
} }
// canRelayMsg returns whether we are able to relay for one of the topics // canRelayMsg returns whether we are able to relay for one of the topics
@ -888,12 +885,10 @@ func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
return false return false
} }
for _, t := range msg.GetTopicIDs() { topic := msg.GetTopic()
if relays := p.myRelays[t]; relays != 0 { relays := p.myRelays[topic]
return true
} return relays > 0
}
return false
} }
func (p *PubSub) notifyLeave(topic string, pid peer.ID) { func (p *PubSub) notifyLeave(topic string, pid peer.ID) {

View File

@ -103,22 +103,21 @@ func (rs *RandomSubRouter) Publish(msg *Message) {
rspeers := make(map[peer.ID]struct{}) rspeers := make(map[peer.ID]struct{})
src := peer.ID(msg.GetFrom()) src := peer.ID(msg.GetFrom())
for _, topic := range msg.GetTopicIDs() { topic := msg.GetTopic()
tmap, ok := rs.p.topics[topic] tmap, ok := rs.p.topics[topic]
if !ok { if !ok {
return
}
for p := range tmap {
if p == from || p == src {
continue continue
} }
for p := range tmap { if rs.peers[p] == FloodSubID {
if p == from || p == src { tosend[p] = struct{}{}
continue } else {
} rspeers[p] = struct{}{}
if rs.peers[p] == FloodSubID {
tosend[p] = struct{}{}
} else {
rspeers[p] = struct{}{}
}
} }
} }

View File

@ -865,14 +865,13 @@ func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
return return
} }
for _, topic := range msg.GetTopicIDs() { topic := msg.GetTopic()
tstats, ok := pstats.getTopicStats(topic, ps.params) tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok { if !ok {
continue return
}
tstats.invalidMessageDeliveries += 1
} }
tstats.invalidMessageDeliveries += 1
} }
// markFirstMessageDelivery increments the "first message deliveries" counter // markFirstMessageDelivery increments the "first message deliveries" counter
@ -884,27 +883,26 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
return return
} }
for _, topic := range msg.GetTopicIDs() { topic := msg.GetTopic()
tstats, ok := pstats.getTopicStats(topic, ps.params) tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok { if !ok {
continue return
} }
cap := ps.params.Topics[topic].FirstMessageDeliveriesCap cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
tstats.firstMessageDeliveries += 1 tstats.firstMessageDeliveries += 1
if tstats.firstMessageDeliveries > cap { if tstats.firstMessageDeliveries > cap {
tstats.firstMessageDeliveries = cap tstats.firstMessageDeliveries = cap
} }
if !tstats.inMesh { if !tstats.inMesh {
continue return
} }
cap = ps.params.Topics[topic].MeshMessageDeliveriesCap cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
tstats.meshMessageDeliveries += 1 tstats.meshMessageDeliveries += 1
if tstats.meshMessageDeliveries > cap { if tstats.meshMessageDeliveries > cap {
tstats.meshMessageDeliveries = cap tstats.meshMessageDeliveries = cap
}
} }
} }
@ -912,41 +910,34 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
// for messages we've seen before, as long the message was received within the // for messages we've seen before, as long the message was received within the
// P3 window. // P3 window.
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) { func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
var now time.Time
pstats, ok := ps.peerStats[p] pstats, ok := ps.peerStats[p]
if !ok { if !ok {
return return
} }
if !validated.IsZero() { topic := msg.GetTopic()
now = time.Now() tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok {
return
} }
for _, topic := range msg.GetTopicIDs() { if !tstats.inMesh {
tstats, ok := pstats.getTopicStats(topic, ps.params) return
if !ok { }
continue
}
if !tstats.inMesh { tparams := ps.params.Topics[topic]
continue
}
tparams := ps.params.Topics[topic] // check against the mesh delivery window -- if the validated time is passed as 0, then
// the message was received before we finished validation and thus falls within the mesh
// delivery window.
if !validated.IsZero() && time.Since(validated) > tparams.MeshMessageDeliveriesWindow {
return
}
// check against the mesh delivery window -- if the validated time is passed as 0, then cap := tparams.MeshMessageDeliveriesCap
// the message was received before we finished validation and thus falls within the mesh tstats.meshMessageDeliveries += 1
// delivery window. if tstats.meshMessageDeliveries > cap {
if !validated.IsZero() && now.After(validated.Add(tparams.MeshMessageDeliveriesWindow)) { tstats.meshMessageDeliveries = cap
continue
}
cap := tparams.MeshMessageDeliveriesCap
tstats.meshMessageDeliveries += 1
if tstats.meshMessageDeliveries > cap {
tstats.meshMessageDeliveries = cap
}
} }
} }

View File

@ -108,7 +108,7 @@ func TestScoreFirstMessageDeliveries(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -148,7 +148,7 @@ func TestScoreFirstMessageDeliveriesCap(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -188,7 +188,7 @@ func TestScoreFirstMessageDeliveriesDecay(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -268,7 +268,7 @@ func TestScoreMeshMessageDeliveries(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -338,7 +338,7 @@ func TestScoreMeshMessageDeliveriesDecay(t *testing.T) {
nMessages := 40 nMessages := 40
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -412,7 +412,7 @@ func TestScoreMeshFailurePenalty(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -472,7 +472,7 @@ func TestScoreInvalidMessageDeliveries(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.RejectMessage(&msg, rejectInvalidSignature) ps.RejectMessage(&msg, rejectInvalidSignature)
} }
@ -509,7 +509,7 @@ func TestScoreInvalidMessageDeliveriesDecay(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.RejectMessage(&msg, rejectInvalidSignature) ps.RejectMessage(&msg, rejectInvalidSignature)
} }
@ -555,7 +555,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
ps.AddPeer(peerB, "myproto") ps.AddPeer(peerB, "myproto")
pbMsg := makeTestMessage(0) pbMsg := makeTestMessage(0)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
msg2 := Message{ReceivedFrom: peerB, Message: pbMsg} msg2 := Message{ReceivedFrom: peerB, Message: pbMsg}
@ -887,7 +887,7 @@ func TestScoreRecapTopicParams(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.DeliverMessage(&msg) ps.DeliverMessage(&msg)
@ -969,7 +969,7 @@ func TestScoreResetTopicParams(t *testing.T) {
nMessages := 100 nMessages := 100
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
pbMsg := makeTestMessage(i) pbMsg := makeTestMessage(i)
pbMsg.TopicIDs = []string{mytopic} pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg) ps.ValidateMessage(&msg)
ps.RejectMessage(&msg, rejectValidationFailed) ps.RejectMessage(&msg, rejectValidationFailed)

View File

@ -28,11 +28,12 @@ func testSignVerify(t *testing.T, privk crypto.PrivKey) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
topic := "foo"
m := pb.Message{ m := pb.Message{
Data: []byte("abc"), Data: []byte("abc"),
TopicIDs: []string{"foo"}, Topic: &topic,
From: []byte(id), From: []byte(id),
Seqno: []byte("123"), Seqno: []byte("123"),
} }
signMessage(id, privk, &m) signMessage(id, privk, &m)
err = verifyMessageSignature(&m) err = verifyMessageSignature(&m)

View File

@ -151,11 +151,10 @@ func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
} }
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
for _, topic := range msg.TopicIDs { topic := msg.GetTopic()
err := t.bumpDeliveryTag(p, topic) err := t.bumpDeliveryTag(p, topic)
if err != nil { if err != nil {
log.Warnf("error bumping delivery tag: %s", err) log.Warnf("error bumping delivery tag: %s", err)
}
} }
} }

View File

@ -88,16 +88,16 @@ func TestTagTracerDeliveryTags(t *testing.T) {
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
// deliver only 5 messages to topic 2 (less than the cap) // deliver only 5 messages to topic 2 (less than the cap)
topics := []string{topic1} topic := &topic1
if i < 5 { if i < 5 {
topics = append(topics, topic2) topic = &topic2
} }
msg := &Message{ msg := &Message{
ReceivedFrom: p, ReceivedFrom: p,
Message: &pb.Message{ Message: &pb.Message{
From: []byte(p), From: []byte(p),
Data: []byte("hello"), Data: []byte("hello"),
TopicIDs: topics, Topic: topic,
}, },
} }
tt.DeliverMessage(msg) tt.DeliverMessage(msg)
@ -175,14 +175,13 @@ func TestTagTracerDeliveryTagsNearFirst(t *testing.T) {
tt.Join(topic) tt.Join(topic)
for i := 0; i < GossipSubConnTagMessageDeliveryCap+5; i++ { for i := 0; i < GossipSubConnTagMessageDeliveryCap+5; i++ {
topics := []string{topic}
msg := &Message{ msg := &Message{
ReceivedFrom: p, ReceivedFrom: p,
Message: &pb.Message{ Message: &pb.Message{
From: []byte(p), From: []byte(p),
Data: []byte(fmt.Sprintf("msg-%d", i)), Data: []byte(fmt.Sprintf("msg-%d", i)),
TopicIDs: topics, Topic: &topic,
Seqno: []byte(fmt.Sprintf("%d", i)), Seqno: []byte(fmt.Sprintf("%d", i)),
}, },
} }

View File

@ -212,10 +212,10 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
} }
m := &pb.Message{ m := &pb.Message{
Data: data, Data: data,
TopicIDs: []string{t.topic}, Topic: &t.topic,
From: nil, From: nil,
Seqno: nil, Seqno: nil,
} }
if t.p.signID != "" { if t.p.signID != "" {
m.From = []byte(t.p.signID) m.From = []byte(t.p.signID)

View File

@ -53,7 +53,7 @@ func (t *pubsubTracer) PublishMessage(msg *Message) {
Timestamp: &now, Timestamp: &now,
PublishMessage: &pb.TraceEvent_PublishMessage{ PublishMessage: &pb.TraceEvent_PublishMessage{
MessageID: []byte(t.msgID(msg.Message)), MessageID: []byte(t.msgID(msg.Message)),
Topics: msg.Message.TopicIDs, Topic: msg.Message.Topic,
}, },
} }
@ -96,7 +96,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
MessageID: []byte(t.msgID(msg.Message)), MessageID: []byte(t.msgID(msg.Message)),
ReceivedFrom: []byte(msg.ReceivedFrom), ReceivedFrom: []byte(msg.ReceivedFrom),
Reason: &reason, Reason: &reason,
Topics: msg.TopicIDs, Topic: msg.Topic,
}, },
} }
@ -126,7 +126,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
MessageID: []byte(t.msgID(msg.Message)), MessageID: []byte(t.msgID(msg.Message)),
ReceivedFrom: []byte(msg.ReceivedFrom), ReceivedFrom: []byte(msg.ReceivedFrom),
Topics: msg.TopicIDs, Topic: msg.Topic,
}, },
} }
@ -155,7 +155,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
Timestamp: &now, Timestamp: &now,
DeliverMessage: &pb.TraceEvent_DeliverMessage{ DeliverMessage: &pb.TraceEvent_DeliverMessage{
MessageID: []byte(t.msgID(msg.Message)), MessageID: []byte(t.msgID(msg.Message)),
Topics: msg.TopicIDs, Topic: msg.Topic,
}, },
} }
@ -292,7 +292,7 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
for _, m := range rpc.Publish { for _, m := range rpc.Publish {
msgs = append(msgs, &pb.TraceEvent_MessageMeta{ msgs = append(msgs, &pb.TraceEvent_MessageMeta{
MessageID: []byte(t.msgID(m)), MessageID: []byte(t.msgID(m)),
Topics: m.TopicIDs, Topic: m.Topic,
}) })
} }
rpcMeta.Messages = msgs rpcMeta.Messages = msgs

View File

@ -211,18 +211,14 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
// getValidators returns all validators that apply to a given message // getValidators returns all validators that apply to a given message
func (v *validation) getValidators(msg *Message) []*topicVal { func (v *validation) getValidators(msg *Message) []*topicVal {
var vals []*topicVal topic := msg.GetTopic()
for _, topic := range msg.GetTopicIDs() { val, ok := v.topicVals[topic]
val, ok := v.topicVals[topic] if !ok {
if !ok { return nil
continue
}
vals = append(vals, val)
} }
return vals return []*topicVal{val}
} }
// validateWorker is an active goroutine performing inline validation // validateWorker is an active goroutine performing inline validation

View File

@ -4,18 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"sync" "sync"
"testing" "testing"
"time" "time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-msgio/protoio"
) )
func TestRegisterUnregisterValidator(t *testing.T) { func TestRegisterUnregisterValidator(t *testing.T) {
@ -300,471 +293,3 @@ func TestValidateAssortedOptions(t *testing.T) {
} }
} }
} }
func TestValidateMultitopic(t *testing.T) {
// this test adds coverage for multi-topic validation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
psubs := getPubsubs(ctx, hosts[1:], WithMessageSigning(false))
for _, ps := range psubs {
err := ps.RegisterTopicValidator("test1", func(context.Context, peer.ID, *Message) bool {
return true
})
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test2", func(context.Context, peer.ID, *Message) bool {
return true
})
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test3", func(context.Context, peer.ID, *Message) bool {
return false
})
if err != nil {
t.Fatal(err)
}
}
publisher := &multiTopicPublisher{ctx: ctx, h: hosts[0]}
hosts[0].SetStreamHandler(FloodSubID, publisher.handleStream)
connectAll(t, hosts)
var subs1, subs2, subs3 []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test1")
if err != nil {
t.Fatal(err)
}
subs1 = append(subs1, sub)
sub, err = ps.Subscribe("test2")
if err != nil {
t.Fatal(err)
}
subs2 = append(subs2, sub)
sub, err = ps.Subscribe("test3")
if err != nil {
t.Fatal(err)
}
subs3 = append(subs2, sub)
}
time.Sleep(time.Second)
msg1 := "i am a walrus"
// this goes to test1 and test2, which is accepted and should be delivered
publisher.publish(msg1, "test1", "test2")
for _, sub := range subs1 {
assertReceive(t, sub, []byte(msg1))
}
for _, sub := range subs2 {
assertReceive(t, sub, []byte(msg1))
}
// this goes to test2 and test3, which is rejected by the test3 validator and should not be delivered
msg2 := "i am not a walrus"
publisher.publish(msg2, "test2", "test3")
expectNoMessage := func(sub *Subscription) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
m, err := sub.Next(ctx)
if err == nil {
t.Fatal("expected no message, but got ", string(m.Data))
}
}
for _, sub := range subs2 {
expectNoMessage(sub)
}
for _, sub := range subs3 {
expectNoMessage(sub)
}
}
func TestValidateMultitopicEx(t *testing.T) {
// this test adds coverage for multi-topic validation with extended validators
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
psubs := getPubsubs(ctx, hosts[1:], WithMessageSigning(false))
for _, ps := range psubs {
err := ps.RegisterTopicValidator("test1", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationAccept
})
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test2", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationAccept
})
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test3", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationIgnore
})
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test4", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationReject
})
if err != nil {
t.Fatal(err)
}
// this is a bogus validator that returns an invalid validation result; the system should interpret
// this as ValidationIgnore and not crash or do anything other than ignore the message
err = ps.RegisterTopicValidator("test5", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationResult(1234)
})
if err != nil {
t.Fatal(err)
}
}
publisher := &multiTopicPublisher{ctx: ctx, h: hosts[0]}
hosts[0].SetStreamHandler(FloodSubID, publisher.handleStream)
connectAll(t, hosts)
var subs1, subs2, subs3, subs4, subs5 []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test1")
if err != nil {
t.Fatal(err)
}
subs1 = append(subs1, sub)
sub, err = ps.Subscribe("test2")
if err != nil {
t.Fatal(err)
}
subs2 = append(subs2, sub)
sub, err = ps.Subscribe("test3")
if err != nil {
t.Fatal(err)
}
subs3 = append(subs3, sub)
sub, err = ps.Subscribe("test4")
if err != nil {
t.Fatal(err)
}
subs4 = append(subs4, sub)
sub, err = ps.Subscribe("test5")
if err != nil {
t.Fatal(err)
}
subs4 = append(subs5, sub)
}
time.Sleep(time.Second)
msg1 := "i am a walrus"
// this goes to test1 and test2, which is accepted and should be delivered
publisher.publish(msg1, "test1", "test2")
for _, sub := range subs1 {
assertReceive(t, sub, []byte(msg1))
}
for _, sub := range subs2 {
assertReceive(t, sub, []byte(msg1))
}
// this goes to test2 and test3, which is ignored by the test3 validator and should not be delivered
msg2 := "i am not a walrus"
publisher.publish(msg2, "test2", "test3")
expectNoMessage := func(sub *Subscription) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
m, err := sub.Next(ctx)
if err == nil {
t.Fatal("expected no message, but got ", string(m.Data))
}
}
for _, sub := range subs2 {
expectNoMessage(sub)
}
for _, sub := range subs3 {
expectNoMessage(sub)
}
// this goes to test2 and test4, which is rejected by the test4 validator and should not be delivered
publisher.publish(msg2, "test2", "test4")
for _, sub := range subs2 {
expectNoMessage(sub)
}
for _, sub := range subs4 {
expectNoMessage(sub)
}
// this goes to test3 and test4, which is rejected by the test4 validator and should not be delivered
publisher.publish(msg2, "test3", "test4")
for _, sub := range subs3 {
expectNoMessage(sub)
}
for _, sub := range subs4 {
expectNoMessage(sub)
}
// this goes to test1 and test5, which by virtue of its bogus validator should result on the message
// being ignored
publisher.publish(msg2, "test1", "test5")
for _, sub := range subs1 {
expectNoMessage(sub)
}
for _, sub := range subs5 {
expectNoMessage(sub)
}
}
func TestValidateMultitopicEx2(t *testing.T) {
// like the previous test, but with all validators inline
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 3)
psubs := getPubsubs(ctx, hosts[1:], WithMessageSigning(false))
for _, ps := range psubs {
err := ps.RegisterTopicValidator("test1", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationAccept
},
WithValidatorInline(true))
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test2", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationAccept
},
WithValidatorInline(true))
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test3", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationIgnore
},
WithValidatorInline(true))
if err != nil {
t.Fatal(err)
}
err = ps.RegisterTopicValidator("test4", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationReject
},
WithValidatorInline(true))
if err != nil {
t.Fatal(err)
}
// this is a bogus validator that returns an invalid validation result; the system should interpret
// this as ValidationIgnore and not crash or do anything other than ignore the message
err = ps.RegisterTopicValidator("test5", func(context.Context, peer.ID, *Message) ValidationResult {
return ValidationResult(1234)
},
WithValidatorInline(true))
if err != nil {
t.Fatal(err)
}
}
publisher := &multiTopicPublisher{ctx: ctx, h: hosts[0]}
hosts[0].SetStreamHandler(FloodSubID, publisher.handleStream)
connectAll(t, hosts)
var subs1, subs2, subs3, subs4, subs5 []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test1")
if err != nil {
t.Fatal(err)
}
subs1 = append(subs1, sub)
sub, err = ps.Subscribe("test2")
if err != nil {
t.Fatal(err)
}
subs2 = append(subs2, sub)
sub, err = ps.Subscribe("test3")
if err != nil {
t.Fatal(err)
}
subs3 = append(subs3, sub)
sub, err = ps.Subscribe("test4")
if err != nil {
t.Fatal(err)
}
subs4 = append(subs4, sub)
sub, err = ps.Subscribe("test5")
if err != nil {
t.Fatal(err)
}
subs4 = append(subs5, sub)
}
time.Sleep(time.Second)
msg1 := "i am a walrus"
// this goes to test1 and test2, which is accepted and should be delivered
publisher.publish(msg1, "test1", "test2")
for _, sub := range subs1 {
assertReceive(t, sub, []byte(msg1))
}
for _, sub := range subs2 {
assertReceive(t, sub, []byte(msg1))
}
// this goes to test2 and test3, which is ignored by the test3 validator and should not be delivered
msg2 := "i am not a walrus"
publisher.publish(msg2, "test2", "test3")
expectNoMessage := func(sub *Subscription) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
m, err := sub.Next(ctx)
if err == nil {
t.Fatal("expected no message, but got ", string(m.Data))
}
}
for _, sub := range subs2 {
expectNoMessage(sub)
}
for _, sub := range subs3 {
expectNoMessage(sub)
}
// this goes to test2 and test4, which is rejected by the test4 validator and should not be delivered
publisher.publish(msg2, "test2", "test4")
for _, sub := range subs2 {
expectNoMessage(sub)
}
for _, sub := range subs4 {
expectNoMessage(sub)
}
// this goes to test3 and test4, which is rejected by the test4 validator and should not be delivered
publisher.publish(msg2, "test3", "test4")
for _, sub := range subs3 {
expectNoMessage(sub)
}
for _, sub := range subs4 {
expectNoMessage(sub)
}
// this goes to test1 and test5, which by virtue of its bogus validator should result on the message
// being ignored
publisher.publish(msg2, "test1", "test5")
for _, sub := range subs1 {
expectNoMessage(sub)
}
for _, sub := range subs5 {
expectNoMessage(sub)
}
}
type multiTopicPublisher struct {
ctx context.Context
h host.Host
mx sync.Mutex
out []network.Stream
mcount int
}
func (p *multiTopicPublisher) handleStream(s network.Stream) {
defer s.Close()
os, err := p.h.NewStream(p.ctx, s.Conn().RemotePeer(), FloodSubID)
if err != nil {
panic(err)
}
p.mx.Lock()
p.out = append(p.out, os)
p.mx.Unlock()
r := protoio.NewDelimitedReader(s, 1<<20)
var rpc pb.RPC
for {
rpc.Reset()
err = r.ReadMsg(&rpc)
if err != nil {
if err != io.EOF {
s.Reset()
}
return
}
}
}
func (p *multiTopicPublisher) publish(msg string, topics ...string) {
p.mcount++
rpc := &pb.RPC{
Publish: []*pb.Message{
&pb.Message{
From: []byte(p.h.ID()),
Data: []byte(msg),
Seqno: []byte{byte(p.mcount)},
TopicIDs: topics,
},
},
}
p.mx.Lock()
defer p.mx.Unlock()
for _, os := range p.out {
w := protoio.NewDelimitedWriter(os)
err := w.WriteMsg(rpc)
if err != nil {
panic(err)
}
}
}