Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Commit 5cbe1f5

Browse files
Add Consul KV store based on atomic transactions
1 parent a322db4 commit 5cbe1f5

File tree

173 files changed

+38049
-4
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

173 files changed

+38049
-4
lines changed

conf/orchestrator-sample.conf.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,6 @@
142142
"GraphitePath": "",
143143
"GraphiteConvertHostnameDotsToUnderscores": true,
144144
"ConsulAddress": "",
145-
"ConsulAclToken": ""
145+
"ConsulAclToken": "",
146+
"ConsulKVStoreProvider": "consul"
146147
}

docs/kv.md

+12
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,15 @@ With `ConsulCrossDataCenterDistribution`, `orchestrator` runs an additional, per
7878
Once per minute, the `orchestrator` leader node queries its configured Consul server for the list of [known datacenters](https://www.consul.io/api/catalog.html#list-datacenters). It then iterates through those data center clusters, and updates each and every one with the current identities of masters.
7979

8080
This functionality is required in case one has more Consul datacenters than just one-local-consul-per-orchestrator-node. We illustrated above how in an `orchestrator/raft` setup, each node updates its local Consul cluster. However, Consul clusters that are not local to any `orchestrator` node are unaffected by that approach. `ConsulCrossDataCenterDistribution` is the way to include all those other DCs.
81+
82+
#### Consul Transaction support
83+
84+
Atomic [Consul Transaction](https://www.consul.io/api-docs/txn) support is enabled by configuring:
85+
86+
```json
87+
"ConsulKVStoreProvider": "consul-txn",
88+
```
89+
90+
_Note: this feature requires Consul version 0.7 or greater._
91+
92+
This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates of several KVs are atomic.

go/config/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ type Configuration struct {
265265
ConsulScheme string // Scheme (http or https) for Consul
266266
ConsulAclToken string // ACL token used to write to Consul KV
267267
ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
268+
ConsulKVStoreProvider string // Consul KV store provider (consul or consul-txn), default: "consul"
268269
ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
269270
KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master"
270271
WebMessage string // If provided, will be shown on all web pages below the title bar
@@ -432,6 +433,7 @@ func newConfiguration() *Configuration {
432433
ConsulScheme: "http",
433434
ConsulAclToken: "",
434435
ConsulCrossDataCenterDistribution: false,
436+
ConsulKVStoreProvider: "consul",
435437
ZkAddress: "",
436438
KVClusterMasterPrefix: "mysql/master",
437439
WebMessage: "",

go/kv/consul.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ import (
3131
"github.com/openark/golib/log"
3232
)
3333

34+
// getConsulKVCacheKey builds the KV cache key under which the value of a
// Consul key is remembered for a given datacenter, in the form "<dc>;<key>".
func getConsulKVCacheKey(dc, key string) string {
	const cacheKeyFormat = "%s;%s"
	return fmt.Sprintf(cacheKeyFormat, dc, key)
}
38+
3439
// A Consul store based on config's `ConsulAddress`, `ConsulScheme`, and `ConsulKVPrefix`
3540
type consulStore struct {
3641
client *consulapi.Client
@@ -135,7 +140,7 @@ func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
135140

136141
for _, consulPair := range consulPairs {
137142
val := string(consulPair.Value)
138-
kcCacheKey := fmt.Sprintf("%s;%s", datacenter, consulPair.Key)
143+
kcCacheKey := getConsulKVCacheKey(datacenter, consulPair.Key)
139144

140145
if value, found := this.kvCache.Get(kcCacheKey); found && val == value {
141146
skipped++

go/kv/consul_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package kv
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io/ioutil"
7+
"net/http"
8+
"net/http/httptest"
9+
"reflect"
10+
"sort"
11+
"strings"
12+
"testing"
13+
14+
consulapi "github.com/hashicorp/consul/api"
15+
)
16+
17+
const consulTestDefaultDatacenter = "dc1"
18+
19+
// consulTestServerOp describes one request the fake Consul test server knows
// how to answer, together with the canned reply it sends back.
type consulTestServerOp struct {
	// Method and URL are compared against the incoming request's HTTP method
	// and request URL.
	Method, URL string
	// Request is the expected request payload; for "/v1/txn" URLs it holds the
	// consulapi.TxnOps the request body is compared with.
	// Response is JSON-encoded into the reply body.
	Request, Response interface{}
	// ResponseCode is the HTTP status of the reply; 0 is treated as 200 (OK).
	ResponseCode int
}
26+
27+
// sortTxnKVOps sort TxnOps by op.KV.Key to resolve random test failures
28+
func sortTxnKVOps(txnOps []*consulapi.TxnOp) []*consulapi.TxnOp {
29+
sort.Slice(txnOps, func(a, b int) bool {
30+
return txnOps[a].KV.Key < txnOps[b].KV.Key
31+
})
32+
return txnOps
33+
}
34+
35+
func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest.Server {
36+
handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37+
requestBytes, _ := ioutil.ReadAll(r.Body)
38+
requestBody := strings.TrimSpace(string(requestBytes))
39+
40+
for _, testOp := range testOps {
41+
if r.Method != testOp.Method || r.URL.String() != testOp.URL {
42+
continue
43+
}
44+
if testOp.ResponseCode == 0 {
45+
testOp.ResponseCode = http.StatusOK
46+
}
47+
if strings.HasPrefix(r.URL.String(), "/v1/kv") && testOp.Response != nil {
48+
w.WriteHeader(testOp.ResponseCode)
49+
json.NewEncoder(w).Encode(testOp.Response)
50+
return
51+
} else if strings.HasPrefix(r.URL.String(), "/v1/txn") {
52+
var txnOps consulapi.TxnOps
53+
if err := json.Unmarshal(requestBytes, &txnOps); err != nil {
54+
t.Fatalf("Unable to unmarshal json request body: %v", err)
55+
continue
56+
}
57+
testOpRequest := sortTxnKVOps(testOp.Request.(consulapi.TxnOps))
58+
if testOp.Response != nil && reflect.DeepEqual(testOpRequest, sortTxnKVOps(txnOps)) {
59+
w.WriteHeader(testOp.ResponseCode)
60+
json.NewEncoder(w).Encode(testOp.Response)
61+
return
62+
}
63+
}
64+
}
65+
66+
t.Fatalf("No requests matched setup. Got method %s, Path %s, body %s", r.Method, r.URL.String(), requestBody)
67+
w.WriteHeader(http.StatusNotFound)
68+
_, _ = fmt.Fprintln(w, "")
69+
})
70+
return httptest.NewServer(handlerFunc)
71+
}

go/kv/consul_txn.go

+234
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
Copyright 2020 Shlomi Noach, GitHub Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kv
18+
19+
import (
20+
"crypto/tls"
21+
"fmt"
22+
"net/http"
23+
"sync"
24+
"sync/atomic"
25+
26+
"github.com/openark/orchestrator/go/config"
27+
28+
consulapi "github.com/hashicorp/consul/api"
29+
"github.com/patrickmn/go-cache"
30+
31+
"github.com/openark/golib/log"
32+
)
33+
34+
// consulTxnStore is a KV store backed by Consul that uses Consul Transactions for multi-key
// reads and writes. Its client is configured from config's `ConsulAddress`, `ConsulScheme`
// and `ConsulAclToken`.
35+
type consulTxnStore struct {
36+
client *consulapi.Client // Consul API client; left nil when `ConsulAddress` is empty or the client could not be created
37+
kvCache *cache.Cache // values last seen in, or written to, Consul; keyed by getConsulKVCacheKey(dc, key)
38+
pairsDistributionSuccessMutex sync.Mutex // NOTE(review): not referenced by any method visible here - confirm whether it is still needed
39+
distributionReentry int64 // 0 when idle, 1 while DistributePairs is running; accessed via sync/atomic
40+
}
41+
42+
// NewConsulTxnStore creates a new consul store that uses Consul Transactions to read/write multiple KVPairs.
43+
// It is possible that the client for this store is nil, which is the case if no consul config is provided
44+
func NewConsulTxnStore() KVStore {
45+
store := &consulTxnStore{
46+
kvCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
47+
}
48+
49+
if config.Config.ConsulAddress != "" {
50+
consulConfig := consulapi.DefaultConfig()
51+
consulConfig.Address = config.Config.ConsulAddress
52+
consulConfig.Scheme = config.Config.ConsulScheme
53+
if config.Config.ConsulScheme == "https" {
54+
consulConfig.HttpClient = &http.Client{
55+
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
56+
}
57+
}
58+
// ConsulAclToken defaults to ""
59+
consulConfig.Token = config.Config.ConsulAclToken
60+
if client, err := consulapi.NewClient(consulConfig); err != nil {
61+
log.Errore(err)
62+
} else {
63+
store.client = client
64+
}
65+
}
66+
return store
67+
}
68+
69+
// doWriteTxn performs one or many of write operations using a Consul Transaction and handles any client/server
70+
// or transaction-level errors. Updates are all-or-nothing - all operations are rolled-back on any txn error
71+
func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *consulapi.QueryOptions) (err error) {
72+
ok, resp, _, err := this.client.Txn().Txn(txnOps, queryOptions)
73+
if err != nil {
74+
return err
75+
} else if !ok {
76+
// return the first transaction error found
77+
for _, txnErr := range resp.Errors {
78+
if txnErr.What != "" {
79+
return fmt.Errorf("consul txn error: %v", txnErr.What)
80+
}
81+
}
82+
}
83+
return err
84+
}
85+
86+
func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair) (skipped, existing, written, failed int, err error) {
87+
defer wg.Done()
88+
89+
queryOptions := &consulapi.QueryOptions{Datacenter: dc}
90+
kcCacheKeys := make([]string, 0)
91+
92+
// get the current key-values in a single transaction
93+
var getTxnOps consulapi.TxnOps
94+
var possibleSetKVPairs []*consulapi.KVPair
95+
for _, kvPair := range kvPairs {
96+
val := string(kvPair.Value)
97+
kcCacheKey := getConsulKVCacheKey(dc, kvPair.Key)
98+
kcCacheKeys = append(kcCacheKeys, kcCacheKey)
99+
if value, found := this.kvCache.Get(kcCacheKey); found && val == value {
100+
skipped++
101+
continue
102+
}
103+
getTxnOps = append(getTxnOps, &consulapi.TxnOp{
104+
KV: &consulapi.KVTxnOp{
105+
Verb: consulapi.KVGet,
106+
Key: kvPair.Key,
107+
},
108+
})
109+
possibleSetKVPairs = append(possibleSetKVPairs, kvPair)
110+
}
111+
_, getTxnResp, _, e := this.client.Txn().Txn(getTxnOps, queryOptions)
112+
if err != nil {
113+
err = e
114+
}
115+
116+
// find key-value pairs that need updating, add pairs that need updating to set transaction
117+
var setTxnOps consulapi.TxnOps
118+
for _, pair := range possibleSetKVPairs {
119+
var kvExists bool
120+
for _, result := range getTxnResp.Results {
121+
if pair.Key == result.KV.Key && string(pair.Value) == string(result.KV.Value) {
122+
existing++
123+
kvExists = true
124+
this.kvCache.SetDefault(getConsulKVCacheKey(dc, pair.Key), string(pair.Value))
125+
break
126+
}
127+
}
128+
if !kvExists {
129+
setTxnOps = append(setTxnOps, &consulapi.TxnOp{
130+
KV: &consulapi.KVTxnOp{
131+
Verb: consulapi.KVSet,
132+
Key: pair.Key,
133+
Value: pair.Value,
134+
},
135+
})
136+
}
137+
}
138+
139+
// update key-value pairs in a single Consul Transaction
140+
if len(setTxnOps) > 0 {
141+
if e := this.doWriteTxn(setTxnOps, queryOptions); e != nil {
142+
log.Errorf("consulTxnStore.DistributePairs(): failed %v", kcCacheKeys)
143+
failed = len(setTxnOps)
144+
err = e
145+
} else {
146+
for _, txnOp := range setTxnOps {
147+
this.kvCache.SetDefault(getConsulKVCacheKey(dc, txnOp.KV.Key), string(txnOp.KV.Value))
148+
written++
149+
}
150+
}
151+
}
152+
153+
return skipped, existing, written, failed, err
154+
}
155+
156+
// GetKeyValue returns the value of a Consul KV if it exists
157+
func (this *consulTxnStore) GetKeyValue(key string) (value string, found bool, err error) {
158+
if this.client == nil {
159+
return value, found, nil
160+
}
161+
pair, _, err := this.client.KV().Get(key, nil)
162+
if err != nil {
163+
return value, found, err
164+
}
165+
return string(pair.Value), (pair != nil), nil
166+
}
167+
168+
// PutKeyValue performs a Consul KV put operation for a key/value
169+
func (this *consulTxnStore) PutKeyValue(key string, value string) (err error) {
170+
if this.client == nil {
171+
return nil
172+
}
173+
pair := &consulapi.KVPair{Key: key, Value: []byte(value)}
174+
_, err = this.client.KV().Put(pair, nil)
175+
return err
176+
}
177+
178+
// PutKVPairs updates one or more KV pairs in a single, atomic Consul operation.
179+
// If a single KV pair is provided PutKeyValue is used to update the pair
180+
func (this *consulTxnStore) PutKVPairs(kvPairs []*KVPair) (err error) {
181+
if this.client == nil {
182+
return nil
183+
}
184+
// use .PutKeyValue for single KVPair puts
185+
if len(kvPairs) == 1 {
186+
return this.PutKeyValue(kvPairs[0].Key, kvPairs[0].Value)
187+
}
188+
var txnOps consulapi.TxnOps
189+
for _, pair := range kvPairs {
190+
txnOps = append(txnOps, &consulapi.TxnOp{
191+
KV: &consulapi.KVTxnOp{
192+
Verb: consulapi.KVSet,
193+
Key: pair.Key,
194+
Value: []byte(pair.Value),
195+
},
196+
})
197+
}
198+
return this.doWriteTxn(txnOps, nil)
199+
}
200+
201+
// DistributePairs updates all known Consul Datacenters with one or more KV pairs
202+
func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
203+
// This function is non re-entrant (it can only be running once at any point in time)
204+
if atomic.CompareAndSwapInt64(&this.distributionReentry, 0, 1) {
205+
defer atomic.StoreInt64(&this.distributionReentry, 0)
206+
} else {
207+
return
208+
}
209+
210+
if !config.Config.ConsulCrossDataCenterDistribution {
211+
return nil
212+
}
213+
214+
datacenters, err := this.client.Catalog().Datacenters()
215+
if err != nil {
216+
return err
217+
}
218+
log.Debugf("consulTxnStore.DistributePairs(): distributing %d pairs to %d datacenters", len(kvPairs), len(datacenters))
219+
consulPairs := []*consulapi.KVPair{}
220+
for _, kvPair := range kvPairs {
221+
consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
222+
}
223+
var wg sync.WaitGroup
224+
for _, datacenter := range datacenters {
225+
var skipped, existing, written, failed int
226+
datacenter := datacenter
227+
228+
wg.Add(1)
229+
skipped, existing, written, failed, err = this.updateDatacenterKVPairs(&wg, datacenter, consulPairs)
230+
log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; skipped: %d, existing: %d, written: %d, failed: %d", datacenter, skipped, existing, written, failed)
231+
}
232+
wg.Wait()
233+
return err
234+
}

0 commit comments

Comments
 (0)