consul/state: working on service registration storage

This commit is contained in:
Ryan Uber 2015-08-22 13:21:38 -07:00 committed by James Phillips
parent 82039191a1
commit b2d9c10fad
4 changed files with 65 additions and 2 deletions

View File

@ -91,7 +91,7 @@ func servicesTableSchema() *memdb.TableSchema {
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "ServiceID", Field: "ID",
Lowercase: true, Lowercase: true,
}, },
}, },
@ -100,7 +100,7 @@ func servicesTableSchema() *memdb.TableSchema {
AllowMissing: true, AllowMissing: true,
Unique: false, Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "ServiceName", Field: "Service",
Lowercase: true, Lowercase: true,
}, },
}, },

View File

@ -98,3 +98,45 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
} }
return nil, nil return nil, nil
} }
// EnsureService is called to upsert creation of a given NodeService.
func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the service registration upsert
if err := s.ensureServiceTxn(idx, svc, tx); err != nil {
return err
}
tx.Commit()
return nil
}
// ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction.
func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error {
// Check for existing service
existing, err := tx.First("services", "id", svc.Service)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
// Populate the indexes
if existing != nil {
svc.CreateIndex = existing.(*structs.NodeService).CreateIndex
svc.ModifyIndex = idx
} else {
svc.CreateIndex = idx
svc.ModifyIndex = idx
}
// Insert the service and update the index
if err := tx.Insert("services", svc); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}

View File

@ -65,3 +65,21 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
} }
func TestStateStore_EnsureService(t *testing.T) {
s := testStateStore(t)
// Create the service registration
in := &structs.NodeService{
ID: "service1",
Service: "redis",
Tags: []string{"prod"},
Address: "1.1.1.1",
Port: 1111,
}
// Service successfully registers into the state store
if err := s.EnsureService(1, in); err != nil {
t.Fatalf("err: %s", err)
}
}

View File

@ -260,7 +260,10 @@ type NodeService struct {
Address string Address string
Port int Port int
EnableTagOverride bool EnableTagOverride bool
RaftIndex
} }
type NodeServices struct { type NodeServices struct {
Node Node Node Node
Services map[string]*NodeService Services map[string]*NodeService