From b2d9c10fad422c950d55f796ae590b7bd3dd5c14 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Sat, 22 Aug 2015 13:21:38 -0700 Subject: [PATCH] consul/state: working on service registration storage --- consul/state/schema.go | 4 +-- consul/state/state_store.go | 42 ++++++++++++++++++++++++++++++++ consul/state/state_store_test.go | 18 ++++++++++++++ consul/structs/structs.go | 3 +++ 4 files changed, 65 insertions(+), 2 deletions(-) diff --git a/consul/state/schema.go b/consul/state/schema.go index 9aff0ecdb8..5918793659 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -91,7 +91,7 @@ func servicesTableSchema() *memdb.TableSchema { AllowMissing: false, Unique: true, Indexer: &memdb.StringFieldIndex{ - Field: "ServiceID", + Field: "ID", Lowercase: true, }, }, @@ -100,7 +100,7 @@ func servicesTableSchema() *memdb.TableSchema { AllowMissing: true, Unique: false, Indexer: &memdb.StringFieldIndex{ - Field: "ServiceName", + Field: "Service", Lowercase: true, }, }, diff --git a/consul/state/state_store.go b/consul/state/state_store.go index b841dbd854..1369d732bd 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -98,3 +98,45 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) { } return nil, nil } + +// EnsureService is called to upsert creation of a given NodeService. +func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Call the service registration upsert + if err := s.ensureServiceTxn(idx, svc, tx); err != nil { + return err + } + + tx.Commit() + return nil +} + +// ensureServiceTxn is used to upsert a service registration within an +// existing memdb transaction. +func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error { + // Check for existing service + existing, err := tx.First("services", "id", svc.Service) + if err != nil { + return fmt.Errorf("failed service lookup: %s", err) + } + + // Populate the indexes + if existing != nil { + svc.CreateIndex = existing.(*structs.NodeService).CreateIndex + svc.ModifyIndex = idx + } else { + svc.CreateIndex = idx + svc.ModifyIndex = idx + } + + // Insert the service and update the index + if err := tx.Insert("services", svc); err != nil { + return fmt.Errorf("failed inserting service: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 34e82b0b95..b1ff856175 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -65,3 +65,21 @@ func TestStateStore_EnsureNode(t *testing.T) { t.Fatalf("bad: %#v", out) } } + +func TestStateStore_EnsureService(t *testing.T) { + s := testStateStore(t) + + // Create the service registration + in := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod"}, + Address: "1.1.1.1", + Port: 1111, + } + + // Service successfully registers into the state store + if err := s.EnsureService(1, in); err != nil { + t.Fatalf("err: %s", err) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 880ce175b4..9fb3f50b61 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -260,7 +260,10 @@ type NodeService struct { Address string Port int EnableTagOverride bool + + RaftIndex } + type NodeServices struct { Node Node Services map[string]*NodeService