Skip to content

Commit c67e6d5

Browse files
committed
clientv3: call KV/Txn APIs with default gRPC call options
Signed-off-by: Gyuho Lee <[email protected]>
1 parent e82f055 commit c67e6d5

File tree

5 files changed

+29
-15
lines changed

5 files changed

+29
-15
lines changed

clientv3/integration/dial_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func TestDialForeignEndpoint(t *testing.T) {
183183

184184
// grpc can return a lazy connection that's not connected yet; confirm
185185
// that it can communicate with the cluster.
186-
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn))
186+
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0))
187187
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
188188
defer cancel()
189189
if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {

clientv3/kv.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
1919

2020
"golang.org/x/net/context"
21+
"google.golang.org/grpc"
2122
)
2223

2324
type (
@@ -88,15 +89,24 @@ func (resp *TxnResponse) OpResponse() OpResponse {
8889
}
8990

9091
type kv struct {
91-
remote pb.KVClient
92+
remote pb.KVClient
93+
callOpts []grpc.CallOption
9294
}
9395

9496
func NewKV(c *Client) KV {
95-
return &kv{remote: RetryKVClient(c)}
97+
api := &kv{remote: RetryKVClient(c)}
98+
if c != nil {
99+
api.callOpts = c.callOpts
100+
}
101+
return api
96102
}
97103

98-
func NewKVFromKVClient(remote pb.KVClient) KV {
99-
return &kv{remote: remote}
104+
func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
105+
api := &kv{remote: remote}
106+
if c != nil {
107+
api.callOpts = c.callOpts
108+
}
109+
return api
100110
}
101111

102112
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
@@ -115,7 +125,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
115125
}
116126

117127
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
118-
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest())
128+
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
119129
if err != nil {
120130
return nil, toErr(ctx, err)
121131
}
@@ -124,8 +134,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C
124134

125135
func (kv *kv) Txn(ctx context.Context) Txn {
126136
return &txn{
127-
kv: kv,
128-
ctx: ctx,
137+
kv: kv,
138+
ctx: ctx,
139+
callOpts: kv.callOpts,
129140
}
130141
}
131142

@@ -134,27 +145,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
134145
switch op.t {
135146
case tRange:
136147
var resp *pb.RangeResponse
137-
resp, err = kv.remote.Range(ctx, op.toRangeRequest())
148+
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
138149
if err == nil {
139150
return OpResponse{get: (*GetResponse)(resp)}, nil
140151
}
141152
case tPut:
142153
var resp *pb.PutResponse
143154
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
144-
resp, err = kv.remote.Put(ctx, r)
155+
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
145156
if err == nil {
146157
return OpResponse{put: (*PutResponse)(resp)}, nil
147158
}
148159
case tDeleteRange:
149160
var resp *pb.DeleteRangeResponse
150161
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
151-
resp, err = kv.remote.DeleteRange(ctx, r)
162+
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
152163
if err == nil {
153164
return OpResponse{del: (*DeleteResponse)(resp)}, nil
154165
}
155166
case tTxn:
156167
var resp *pb.TxnResponse
157-
resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
168+
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
158169
if err == nil {
159170
return OpResponse{txn: (*TxnResponse)(resp)}, nil
160171
}

clientv3/txn.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2121

2222
"golang.org/x/net/context"
23+
"google.golang.org/grpc"
2324
)
2425

2526
// Txn is the interface that wraps mini-transactions.
@@ -66,6 +67,8 @@ type txn struct {
6667

6768
sus []*pb.RequestOp
6869
fas []*pb.RequestOp
70+
71+
callOpts []grpc.CallOption
6972
}
7073

7174
func (txn *txn) If(cs ...Cmp) Txn {
@@ -140,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
140143

141144
var resp *pb.TxnResponse
142145
var err error
143-
resp, err = txn.kv.remote.Txn(txn.ctx, r)
146+
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
144147
if err != nil {
145148
return nil, toErr(txn.ctx, err)
146149
}

etcdserver/api/v3client/v3client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
3232
c := clientv3.NewCtxClient(context.Background())
3333

3434
kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
35-
c.KV = clientv3.NewKVFromKVClient(kvc)
35+
c.KV = clientv3.NewKVFromKVClient(kvc, c)
3636

3737
lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
3838
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)

integration/cluster_proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
9999
return nil, err
100100
}
101101
rpc := toGRPC(c)
102-
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
102+
c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
103103
pmu.Lock()
104104
lc := c.Lease
105105
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)

0 commit comments

Comments
 (0)