From 68342a0cb599cddbe8b60cf5b04334f26394ce20 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 27 Oct 2020 13:53:11 -0400 Subject: [PATCH] proto: remove Event.Key field The field is never used, and the value is available from the payload. --- agent/cache-types/streaming_events_test.go | 2 - agent/rpc/subscribe/subscribe.go | 4 +- agent/rpc/subscribe/subscribe_test.go | 16 -- proto/pbsubscribe/subscribe.pb.go | 166 +++++++-------------- proto/pbsubscribe/subscribe.proto | 11 +- 5 files changed, 62 insertions(+), 137 deletions(-) diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go index 67d4257fee..272372754d 100644 --- a/agent/cache-types/streaming_events_test.go +++ b/agent/cache-types/streaming_events_test.go @@ -35,7 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return &pbsubscribe.Event{ - Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -114,7 +113,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs node := fmt.Sprintf("node%d", nodeNum) return &pbsubscribe.Event{ - Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 9fc1dc6532..563880ef5b 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -140,7 +140,7 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) } func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { - e := &pbsubscribe.Event{Key: event.Key, Index: event.Index} + e := &pbsubscribe.Event{Index: event.Index} switch { case event.IsEndOfSnapshot(): e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} @@ -178,7 +178,7 @@ func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event { result := make([]*pbsubscribe.Event, len(events)) for i := range events { event := events[i] - result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index} + result[i] = &pbsubscribe.Event{Index: event.Index} setPayload(result[i], event.Payload) } return result diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index bf91763b09..80f02d8cf4 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -107,7 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -138,7 +137,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -169,7 +167,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, }, @@ -189,7 +186,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -459,7 +455,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -490,7 +485,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -521,7 +515,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, }, @@ -541,7 +534,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -922,11 +914,9 @@ func TestNewEventFromSteamEvent(t *testing.T) { { name: "event batch", event: stream.Event{ - Key: "web1", Index: 2002, Payload: []stream.Event{ { - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -937,7 +927,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, { - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, @@ -950,13 +939,11 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, expected: pbsubscribe.Event{ - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ Events: []*pbsubscribe.Event{ { - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -969,7 +956,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, { - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -989,7 +975,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { { name: "event payload CheckServiceNode", event: stream.Event{ - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -1000,7 +985,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, expected: pbsubscribe.Event{ - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index aae083dd92..f7e2011d99 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -6,16 +6,14 @@ package pbsubscribe import ( context "context" fmt "fmt" - io "io" - math "math" - math_bits "math/bits" - proto "github.com/golang/protobuf/proto" + pbservice "github.com/hashicorp/consul/proto/pbservice" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - - pbservice "github.com/hashicorp/consul/proto/pbservice" + io "io" + math "math" + math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used. @@ -200,14 +198,12 @@ func (m *SubscribeRequest) GetNamespace() string { // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. type Event struct { - // Key is the logical identifier for the entity that was mutated. - Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"` // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - Index uint64 `protobuf:"varint,2,opt,name=Index,proto3" json:"Index,omitempty"` + Index uint64 `protobuf:"varint,1,opt,name=Index,proto3" json:"Index,omitempty"` // Payload is the actual event content. // // Types that are valid to be assigned to Payload: @@ -261,13 +257,13 @@ type isEvent_Payload interface { } type Event_EndOfSnapshot struct { - EndOfSnapshot bool `protobuf:"varint,3,opt,name=EndOfSnapshot,proto3,oneof"` + EndOfSnapshot bool `protobuf:"varint,2,opt,name=EndOfSnapshot,proto3,oneof"` } type Event_NewSnapshotToFollow struct { - NewSnapshotToFollow bool `protobuf:"varint,4,opt,name=NewSnapshotToFollow,proto3,oneof"` + NewSnapshotToFollow bool `protobuf:"varint,3,opt,name=NewSnapshotToFollow,proto3,oneof"` } type Event_EventBatch struct { - EventBatch *EventBatch `protobuf:"bytes,5,opt,name=EventBatch,proto3,oneof"` + EventBatch *EventBatch `protobuf:"bytes,4,opt,name=EventBatch,proto3,oneof"` } type Event_ServiceHealth struct { ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` @@ -285,13 +281,6 @@ func (m *Event) GetPayload() isEvent_Payload { return nil } -func (m *Event) GetKey() string { - if m != nil { - return m.Key - } - return "" -} - func (m *Event) GetIndex() uint64 { if m != nil { return m.Index @@ -346,17 +335,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if x.EndOfSnapshot { t = 1 } - _ = b.EncodeVarint(3<<3 | proto.WireVarint) + _ = b.EncodeVarint(2<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_NewSnapshotToFollow: t := uint64(0) if x.NewSnapshotToFollow { t = 1 } - _ = b.EncodeVarint(4<<3 | proto.WireVarint) + _ = b.EncodeVarint(3<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_EventBatch: - _ = b.EncodeVarint(5<<3 | proto.WireBytes) + _ = b.EncodeVarint(4<<3 | proto.WireBytes) if err := b.EncodeMessage(x.EventBatch); err != nil { return err } @@ -375,21 +364,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*Event) switch tag { - case 3: // Payload.EndOfSnapshot + case 2: // Payload.EndOfSnapshot if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_EndOfSnapshot{x != 0} return true, err - case 4: // Payload.NewSnapshotToFollow + case 3: // Payload.NewSnapshotToFollow if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_NewSnapshotToFollow{x != 0} return true, err - case 5: // Payload.EventBatch + case 4: // Payload.EventBatch if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -551,41 +540,41 @@ func init() { func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } var fileDescriptor_ab3eb8c810e315fb = []byte{ - // 542 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xf6, 0xba, 0xbf, 0x9e, 0xd0, 0xca, 0x6c, 0x8b, 0xb0, 0x5a, 0x64, 0x85, 0x08, 0x55, 0xa1, - 0x12, 0x31, 0x0a, 0x12, 0xdc, 0x40, 0x24, 0x6d, 0x09, 0x42, 0x4a, 0x90, 0xd3, 0x1e, 0xe0, 0xb6, - 0xb1, 0x87, 0xd8, 0x8a, 0xbb, 0x6b, 0xec, 0x4d, 0x43, 0xef, 0xbc, 0x03, 0x3c, 0x09, 0xcf, 0xc0, - 0x91, 0x47, 0x40, 0xe1, 0x45, 0x50, 0x36, 0x8e, 0x63, 0x27, 0xbd, 0x79, 0xbe, 0x9f, 0xdd, 0xcf, - 0xb3, 0x33, 0xf0, 0x38, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, 0x4b, 0xc2, 0x01, - 0x3a, 0xf9, 0x57, 0x43, 0x71, 0xd4, 0xc8, 0x81, 0xa3, 0xa3, 0x5c, 0x8d, 0xc9, 0x4d, 0xe8, 0xa1, - 0xc3, 0x85, 0x9f, 0xc9, 0x6a, 0xbf, 0x08, 0x98, 0xfd, 0x85, 0xd2, 0xc5, 0xaf, 0x63, 0x4c, 0x25, - 0x3d, 0x81, 0xad, 0x4b, 0x11, 0x87, 0x9e, 0x45, 0xaa, 0xa4, 0xbe, 0xdf, 0x34, 0x1b, 0xcb, 0xc3, - 0x15, 0xee, 0xce, 0x69, 0x6a, 0xc2, 0xc6, 0x07, 0xbc, 0xb5, 0xf4, 0x2a, 0xa9, 0x1b, 0xee, 0xec, - 0x93, 0x1e, 0xce, 0x9c, 0x23, 0xe4, 0xd6, 0x86, 0xc2, 0xe6, 0xc5, 0x0c, 0x7d, 0xcf, 0x7d, 0xfc, - 0x66, 0x6d, 0x56, 0x49, 0x7d, 0xd3, 0x9d, 0x17, 0xd4, 0x06, 0x38, 0x63, 0x92, 0x79, 0xc8, 0x25, - 0x26, 0xd6, 0x96, 0x32, 0x14, 0x10, 0xfa, 0x08, 0x8c, 0x2e, 0xbb, 0xc6, 0x34, 0x66, 0x1e, 0x5a, - 0xdb, 0x8a, 0x5e, 0x02, 0xb5, 0x1f, 0x3a, 0x6c, 0x9d, 0xdf, 0x20, 0x97, 0x8b, 0x14, 0xa4, 0x94, - 0x62, 0x7e, 0x9f, 0x5e, 0xbc, 0xef, 0x04, 0xf6, 0xce, 0xb9, 0xdf, 0xfb, 0xd2, 0xe7, 0x2c, 0x4e, - 0x03, 0x21, 0x55, 0xc6, 0xdd, 0x8e, 0xe6, 0x96, 0x61, 0xda, 0x84, 0x83, 0x2e, 0x4e, 0x16, 0xe5, - 0xa5, 0xb8, 0x10, 0x51, 0x24, 0x26, 0x2a, 0xfb, 0x4c, 0x7d, 0x17, 0x49, 0x5f, 0x01, 0xa8, 0x30, - 0x2d, 0x26, 0xbd, 0x40, 0xfd, 0x4b, 0xa5, 0xf9, 0xa0, 0xd0, 0xb6, 0x25, 0xd9, 0xd1, 0xdc, 0x82, - 0x94, 0x5e, 0xc0, 0x5e, 0x7f, 0xfe, 0x2a, 0x1d, 0x64, 0x91, 0x0c, 0x2c, 0x50, 0x5e, 0xbb, 0xe0, - 0x2d, 0xf1, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0x97, 0xe0, 0x96, 0x01, 0x3b, 0x1f, 0xd9, 0x6d, - 0x24, 0x98, 0x5f, 0x7b, 0x59, 0xcc, 0x42, 0xeb, 0xb0, 0xad, 0xaa, 0xd4, 0x22, 0xd5, 0x8d, 0x7a, - 0xa5, 0xf4, 0x98, 0x8a, 0x70, 0x33, 0xbe, 0xf6, 0x9d, 0xc0, 0xc1, 0x1d, 0x77, 0xd1, 0x27, 0xa0, - 0xf7, 0xe2, 0x6c, 0x14, 0x0e, 0x0b, 0xee, 0x36, 0x93, 0x2c, 0x12, 0xc3, 0x5e, 0xec, 0xea, 0xbd, - 0x98, 0xbe, 0x03, 0xb3, 0x1d, 0xa0, 0x37, 0xca, 0x4e, 0xe8, 0x0a, 0x1f, 0x55, 0xfb, 0x2b, 0xcd, - 0xe3, 0x46, 0x3e, 0x79, 0x8d, 0x55, 0x89, 0xbb, 0x66, 0x3a, 0x7d, 0x9b, 0x0d, 0x1f, 0xad, 0xc0, - 0xce, 0x15, 0x1f, 0x71, 0x31, 0xe1, 0xa6, 0x46, 0xef, 0xaf, 0xf4, 0xc9, 0x24, 0xd4, 0x82, 0xc3, - 0x12, 0xd4, 0x16, 0x9c, 0xa3, 0x27, 0x4d, 0xfd, 0xf4, 0x29, 0x18, 0x79, 0x38, 0x7a, 0x0f, 0x76, - 0x5d, 0x1c, 0x86, 0xa9, 0xc4, 0xc4, 0xd4, 0xe8, 0x3e, 0xc0, 0x19, 0x26, 0x8b, 0x9a, 0x34, 0x3f, - 0xc1, 0xc3, 0xbe, 0x64, 0x12, 0xdb, 0x01, 0xe3, 0x43, 0xcc, 0x36, 0x21, 0x96, 0xa1, 0xe0, 0xf4, - 0x35, 0x18, 0xf9, 0x66, 0xd0, 0xe3, 0xe2, 0x83, 0xac, 0xec, 0xcb, 0xd1, 0x5a, 0x4f, 0x6b, 0xda, - 0x73, 0xd2, 0x7a, 0xf3, 0x7b, 0x6a, 0x93, 0x3f, 0x53, 0x9b, 0xfc, 0x9d, 0xda, 0xe4, 0xe7, 0x3f, - 0x5b, 0xfb, 0xfc, 0x6c, 0x18, 0xca, 0x60, 0x3c, 0x68, 0x78, 0xe2, 0xda, 0x09, 0x58, 0x1a, 0x84, - 0x9e, 0x48, 0x62, 0xc7, 0x13, 0x3c, 0x1d, 0x47, 0xce, 0xda, 0x4a, 0x0f, 0xb6, 0x15, 0xf4, 0xe2, - 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xe5, 0xc9, 0x9d, 0xee, 0x03, 0x00, 0x00, + // 536 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x5f, 0x6f, 0xd2, 0x50, + 0x14, 0xef, 0x65, 0x83, 0xad, 0x07, 0xb7, 0xd4, 0x3b, 0x8c, 0x0d, 0x33, 0x0d, 0x12, 0xb3, 0xe0, + 0x12, 0xa9, 0xc1, 0x44, 0xdf, 0x34, 0xc2, 0x36, 0x31, 0x26, 0x60, 0xca, 0xf6, 0xa0, 0x6f, 0x97, + 0xf6, 0x48, 0x1b, 0xba, 0x7b, 0x6b, 0x7b, 0x19, 0xee, 0x5d, 0xbf, 0x83, 0x9f, 0xc4, 0xcf, 0xe0, + 0xa3, 0x1f, 0xc1, 0xe0, 0x17, 0x31, 0x5c, 0x4a, 0x29, 0xb0, 0xb7, 0x9e, 0xdf, 0x9f, 0x73, 0x4f, + 0xcf, 0x1f, 0x78, 0x1c, 0xc5, 0x42, 0x0a, 0x3b, 0x1a, 0x26, 0x93, 0x61, 0xe2, 0xc6, 0xc1, 0x10, + 0xed, 0xec, 0xab, 0xa9, 0x38, 0xaa, 0x67, 0x40, 0xb5, 0x9a, 0xa9, 0x31, 0xbe, 0x09, 0x5c, 0xb4, + 0xb9, 0xf0, 0x52, 0x59, 0xfd, 0x17, 0x01, 0x63, 0xb0, 0x54, 0x3a, 0xf8, 0x75, 0x82, 0x89, 0xa4, + 0x27, 0x50, 0xbc, 0x14, 0x51, 0xe0, 0x9a, 0xa4, 0x46, 0x1a, 0x87, 0x2d, 0xa3, 0xb9, 0x4a, 0xae, + 0x70, 0x67, 0x41, 0x53, 0x03, 0x76, 0x3e, 0xe0, 0xad, 0x59, 0xa8, 0x91, 0x86, 0xee, 0xcc, 0x3f, + 0x69, 0x65, 0xee, 0x1c, 0x23, 0x37, 0x77, 0x14, 0xb6, 0x08, 0xe6, 0xe8, 0x7b, 0xee, 0xe1, 0x37, + 0x73, 0xb7, 0x46, 0x1a, 0xbb, 0xce, 0x22, 0xa0, 0x16, 0xc0, 0x19, 0x93, 0xcc, 0x45, 0x2e, 0x31, + 0x36, 0x8b, 0xca, 0x90, 0x43, 0xe8, 0x23, 0xd0, 0x7b, 0xec, 0x1a, 0x93, 0x88, 0xb9, 0x68, 0x96, + 0x14, 0xbd, 0x02, 0xea, 0x3f, 0x0a, 0x50, 0x3c, 0xbf, 0x41, 0x2e, 0x57, 0xd9, 0x49, 0x3e, 0xfb, + 0x09, 0x1c, 0x9c, 0x73, 0xaf, 0xff, 0x65, 0xc0, 0x59, 0x94, 0xf8, 0x42, 0xaa, 0x2a, 0xf7, 0xbb, + 0x9a, 0xb3, 0x0e, 0xd3, 0x16, 0x1c, 0xf5, 0x70, 0xba, 0x0c, 0x2f, 0xc5, 0x85, 0x08, 0x43, 0x31, + 0x55, 0xf5, 0xcf, 0xd5, 0x77, 0x91, 0xf4, 0x15, 0x80, 0x7a, 0xba, 0xcd, 0xa4, 0xeb, 0xab, 0x9f, + 0x2a, 0xb7, 0x1e, 0xe4, 0x9a, 0xb4, 0x22, 0xbb, 0x9a, 0x93, 0x93, 0xd2, 0x0b, 0x38, 0x18, 0x2c, + 0x66, 0xd0, 0x45, 0x16, 0x4a, 0xdf, 0x04, 0xe5, 0xb5, 0x72, 0xde, 0x35, 0xfe, 0x2a, 0xf2, 0x98, + 0xc4, 0x79, 0xd1, 0x6b, 0x70, 0x5b, 0x87, 0xbd, 0x8f, 0xec, 0x36, 0x14, 0xcc, 0xab, 0xbf, 0xcc, + 0xd7, 0x42, 0x1b, 0x50, 0x52, 0x51, 0x62, 0x92, 0xda, 0x4e, 0xa3, 0xbc, 0x36, 0x3a, 0x45, 0x38, + 0x29, 0x5f, 0xff, 0x4e, 0xe0, 0xe8, 0x8e, 0xb7, 0xe8, 0x13, 0x28, 0xf4, 0xa3, 0x74, 0xf0, 0x95, + 0x9c, 0xbb, 0xc3, 0x24, 0x0b, 0xc5, 0xa8, 0x1f, 0x39, 0x85, 0x7e, 0x44, 0xdf, 0x81, 0xd1, 0xf1, + 0xd1, 0x1d, 0xa7, 0x19, 0x7a, 0xc2, 0x43, 0xd5, 0xe0, 0x72, 0xeb, 0xb8, 0x99, 0xed, 0x59, 0x73, + 0x53, 0xe2, 0x6c, 0x99, 0x4e, 0xdf, 0xa6, 0xab, 0x46, 0xcb, 0xb0, 0x77, 0xc5, 0xc7, 0x5c, 0x4c, + 0xb9, 0xa1, 0xd1, 0xfb, 0x1b, 0x7d, 0x32, 0x08, 0x35, 0xa1, 0xb2, 0x06, 0x75, 0x04, 0xe7, 0xe8, + 0x4a, 0xa3, 0x70, 0xfa, 0x14, 0xf4, 0xac, 0x38, 0x7a, 0x0f, 0xf6, 0x1d, 0x1c, 0x05, 0x89, 0xc4, + 0xd8, 0xd0, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xcb, 0x98, 0xb4, 0x3e, 0xc1, 0xc3, 0x81, 0x64, 0x12, + 0x3b, 0x3e, 0xe3, 0x23, 0x4c, 0xf7, 0x3e, 0x92, 0x81, 0xe0, 0xf4, 0x35, 0xe8, 0xd9, 0x1d, 0xd0, + 0xe3, 0xfc, 0x40, 0x36, 0xae, 0xa3, 0xba, 0xd5, 0xd3, 0xba, 0xf6, 0x9c, 0xb4, 0xdf, 0xfc, 0x9e, + 0x59, 0xe4, 0xcf, 0xcc, 0x22, 0x7f, 0x67, 0x16, 0xf9, 0xf9, 0xcf, 0xd2, 0x3e, 0x3f, 0x1b, 0x05, + 0xd2, 0x9f, 0x0c, 0x9b, 0xae, 0xb8, 0xb6, 0x7d, 0x96, 0xf8, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, + 0x4f, 0x26, 0xa1, 0xbd, 0x75, 0xc0, 0xc3, 0x92, 0x82, 0x5e, 0xfc, 0x0f, 0x00, 0x00, 0xff, 0xff, + 0x8f, 0x56, 0x73, 0x78, 0xdc, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -834,14 +823,7 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) { if m.Index != 0 { i = encodeVarintSubscribe(dAtA, i, uint64(m.Index)) i-- - dAtA[i] = 0x10 - } - if len(m.Key) > 0 { - i -= len(m.Key) - copy(dAtA[i:], m.Key) - i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key))) - i-- - dAtA[i] = 0xa + dAtA[i] = 0x8 } return len(dAtA) - i, nil } @@ -859,7 +841,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x18 + dAtA[i] = 0x10 return len(dAtA) - i, nil } func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) { @@ -875,7 +857,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro dAtA[i] = 0 } i-- - dAtA[i] = 0x20 + dAtA[i] = 0x18 return len(dAtA) - i, nil } func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) { @@ -894,7 +876,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintSubscribe(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x2a + dAtA[i] = 0x22 } return len(dAtA) - i, nil } @@ -1054,10 +1036,6 @@ func (m *Event) Size() (n int) { } var l int _ = l - l = len(m.Key) - if l > 0 { - n += 1 + l + sovSubscribe(uint64(l)) - } if m.Index != 0 { n += 1 + sovSubscribe(uint64(m.Index)) } @@ -1405,38 +1383,6 @@ func (m *Event) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowSubscribe - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthSubscribe - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthSubscribe - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Key = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) } @@ -1455,7 +1401,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { break } } - case 3: + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType) } @@ -1476,7 +1422,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_EndOfSnapshot{b} - case 4: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType) } @@ -1497,7 +1443,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_NewSnapshotToFollow{b} - case 5: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) } diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index 9d1147922c..37ae9e0206 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -85,34 +85,31 @@ message SubscribeRequest { // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. message Event { - // Key is the logical identifier for the entity that was mutated. - string Key = 1; - // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - uint64 Index = 2; + uint64 Index = 1; // Payload is the actual event content. oneof Payload { // EndOfSnapshot indicates the event stream for the initial snapshot has // ended. Subsequent Events delivered will be mutations to that result. - bool EndOfSnapshot = 3; + bool EndOfSnapshot = 2; // NewSnapshotToFollow indicates that the client view is stale. The client // must reset its view before handing any more events. Subsequent events // in the stream will be for a new snapshot until an EndOfSnapshot event // is received. - bool NewSnapshotToFollow = 4; + bool NewSnapshotToFollow = 3; // EventBatch is a set of events. This is typically used as the payload // type where multiple events are emitted in a single topic and raft // index (e.g. transactional updates). In this case the Topic and Index // values of all events will match and the whole set should be delivered // and consumed atomically. - EventBatch EventBatch = 5; + EventBatch EventBatch = 4; // ServiceHealth is used for ServiceHealth and ServiceHealthConnect // topics.