diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index e88330c9cd..5aebcca6ee 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -64,11 +64,17 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { event := stream.Event{ Index: idx, Topic: topic, - Payload: EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &n, - }, } + payload := EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, + } + + if connect && n.Service.Kind == structs.ServiceKindConnectProxy { + payload.key = n.Service.Proxy.DestinationServiceName + } + + event.Payload = payload // append each event as a separate item so that they can be serialized // separately, to prevent the encoding of one massive message. diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 1d1c0f2f35..b9ef8eadea 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -70,6 +70,70 @@ func TestServiceHealthSnapshot(t *testing.T) { assertDeepEqual(t, expected, buf.events, cmpEvents) } +func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { + store := NewStateStore(nil) + + counter := newIndexCounter() + err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db")) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web")) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regSidecar)) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2)) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar)) + require.NoError(t, err) + + fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect) + buf := &snapshotAppender{} + req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect} + + idx, err := fn(req, buf) + require.NoError(t, err) + require.Equal(t, counter.Last(), idx) + + expected := [][]stream.Event{ + { + testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error { + e.Index = counter.Last() + ep := e.Payload.(EventPayloadCheckServiceNode) + ep.key = "web" + e.Payload = ep + csn := ep.Value + csn.Node.CreateIndex = 1 + csn.Node.ModifyIndex = 1 + csn.Service.CreateIndex = 3 + csn.Service.ModifyIndex = 3 + csn.Checks[0].CreateIndex = 1 + csn.Checks[0].ModifyIndex = 1 + csn.Checks[1].CreateIndex = 3 + csn.Checks[1].ModifyIndex = 3 + return nil + }), + }, + { + testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error { + e.Index = counter.Last() + ep := e.Payload.(EventPayloadCheckServiceNode) + ep.key = "web" + e.Payload = ep + csn := ep.Value + csn.Node.CreateIndex = 4 + csn.Node.ModifyIndex = 4 + csn.Service.CreateIndex = 5 + csn.Service.ModifyIndex = 5 + csn.Checks[0].CreateIndex = 4 + csn.Checks[0].ModifyIndex = 4 + csn.Checks[1].CreateIndex = 5 + csn.Checks[1].ModifyIndex = 5 + return nil + }), + }, + } + assertDeepEqual(t, expected, buf.events, cmpEvents) +} + type snapshotAppender struct { events [][]stream.Event }