Skip to content

Commit e4bbcf1

Browse files
committed
stream: added is_sync flag for begin/commit
1 parent 55abba9 commit e4bbcf1

File tree

5 files changed

+188
-2
lines changed

5 files changed

+188
-2
lines changed

internal/request/begin_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package request
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"github.com/stretchr/testify/require"
7+
"github.com/tarantool/go-tarantool/v2"
8+
"github.com/vmihailenco/msgpack/v5"
9+
"testing"
10+
"time"
11+
12+
"github.com/kr/pretty"
13+
)
14+
15+
/*
16+
example file:
17+
18+
BEGIN {
19+
"Timeout": 0.123,
20+
"TxnIsolation": 1,
21+
"IsSync": true,
22+
}
23+
EXPECTED "8356CB3F847AE147AE147B590161C3"
24+
25+
COMMIT {
26+
"
27+
}
28+
*/
29+
30+
func CompareEncodeDecode(t *testing.T, expectedMsgpack []byte, gotMsgpack []byte) bool {
31+
t.Helper()
32+
33+
tarantool.NewBeginRequest()
34+
35+
return bytes.Equal(expectedMsgpack, gotMsgpack)
36+
}
37+
38+
type dummySchemaResolver struct{}
39+
40+
func (d dummySchemaResolver) ResolveSpace(interface{}) (uint32, error) { panic("unexpected") }
41+
func (d dummySchemaResolver) ResolveIndex(interface{}, uint32) (uint32, error) { panic("unexpected") }
42+
func (d dummySchemaResolver) NamesUseSupported() bool { panic("unexpected") }
43+
44+
func TestBegin(t *testing.T) {
45+
var b bytes.Buffer
46+
encoder := msgpack.NewEncoder(&b)
47+
48+
req := tarantool.NewBeginRequest().TxnIsolation(1).IsSync(true).Timeout(10 * time.Millisecond)
49+
err := req.Body(dummySchemaResolver{}, encoder)
50+
require.NoError(t, err)
51+
52+
dumpBinary(t, b.Bytes())
53+
54+
var abstractRequest map[int]interface{}
55+
56+
err = msgpack.NewDecoder(&b).Decode(&abstractRequest)
57+
require.NoError(t, err)
58+
59+
pretty.Println(abstractRequest)
60+
}
61+
62+
func dumpBinary(t *testing.T, b []byte) {
63+
t.Helper()
64+
65+
for _, v := range b {
66+
fmt.Printf("%02X", v)
67+
}
68+
}

internal/utils/optional.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package utils
2+
3+
type Optional[T any] struct {
4+
isSet bool
5+
value T
6+
}
7+
8+
func (o Optional[T]) Value() (T, bool) {
9+
return o.value, o.isSet
10+
}
11+
12+
func (o Optional[T]) HasValue() bool {
13+
return o.isSet
14+
}
15+
16+
func Some[T any](val T) Optional[T] {
17+
return Optional[T]{value: val, isSet: true}
18+
}
19+
20+
func None[T any]() Optional[T] {
21+
return Optional[T]{} // IsSet is false by default
22+
}

stream.go

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"context"
55
"errors"
6+
"github.com/tarantool/go-tarantool/v2/internal/utils"
67
"time"
78

89
"github.com/tarantool/go-iproto"
@@ -42,6 +43,7 @@ type BeginRequest struct {
4243
baseRequest
4344
txnIsolation TxnIsolationLevel
4445
timeout time.Duration
46+
isSync utils.Optional[bool]
4547
}
4648

4749
// NewBeginRequest returns a new BeginRequest.
@@ -59,12 +61,18 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ
5961
return req
6062
}
6163

62-
// WithTimeout allows to set up a timeout for call BeginRequest.
64+
// Timeout allows to set up a timeout for call BeginRequest.
6365
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
6466
req.timeout = timeout
6567
return req
6668
}
6769

70+
// IsSync allows to set up a IsSync flag for call BeginRequest.
71+
func (req *BeginRequest) IsSync(isSync bool) *BeginRequest {
72+
req.isSync = utils.Some(isSync)
73+
return req
74+
}
75+
6876
// Body fills an msgpack.Encoder with the begin request body.
6977
func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
7078
var (
@@ -81,6 +89,10 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
8189
mapLen++
8290
}
8391

92+
if req.isSync.HasValue() {
93+
mapLen++
94+
}
95+
8496
err := enc.EncodeMapLen(mapLen)
8597
if err != nil {
8698
return err
@@ -110,6 +122,18 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
110122
}
111123
}
112124

125+
if val, ok := req.isSync.Value(); ok {
126+
err = enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC))
127+
if err != nil {
128+
return err
129+
}
130+
131+
err = enc.EncodeBool(val)
132+
if err != nil {
133+
return err
134+
}
135+
}
136+
113137
return nil
114138
}
115139

@@ -129,6 +153,8 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
129153
// Commit request can not be processed out of stream.
130154
type CommitRequest struct {
131155
baseRequest
156+
157+
isSync utils.Optional[bool]
132158
}
133159

134160
// NewCommitRequest returns a new CommitRequest.
@@ -138,9 +164,37 @@ func NewCommitRequest() *CommitRequest {
138164
return req
139165
}
140166

167+
// IsSync allows to set up a IsSync flag for call BeginRequest.
168+
func (req *CommitRequest) IsSync(isSync bool) *CommitRequest {
169+
req.isSync = utils.Some(isSync)
170+
return req
171+
}
172+
141173
// Body fills an msgpack.Encoder with the commit request body.
142174
func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
143-
return enc.EncodeMapLen(0)
175+
var (
176+
mapLen = 0
177+
)
178+
179+
if req.isSync.HasValue() {
180+
mapLen++
181+
}
182+
183+
if err := enc.EncodeMapLen(mapLen); err != nil {
184+
return err
185+
}
186+
187+
if val, ok := req.isSync.Value(); ok {
188+
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
189+
return err
190+
}
191+
192+
if err := enc.EncodeBool(val); err != nil {
193+
return err
194+
}
195+
}
196+
197+
return nil
144198
}
145199

146200
// Context sets a passed context to the request.

tarantool_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4134,6 +4134,40 @@ func TestFdDialer(t *testing.T) {
41344134
require.Equal(t, int8(0), resp[0])
41354135
}
41364136

4137+
func TestIsSync(t *testing.T) {
4138+
test_helpers.SkipIfIsSyncUnsupported(t)
4139+
4140+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4141+
defer conn.Close()
4142+
4143+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
4144+
defer cancel()
4145+
4146+
stream, err := conn.NewStream()
4147+
require.NoError(t, err)
4148+
4149+
requests := []Request{
4150+
NewBeginRequest().IsSync(true),
4151+
NewInsertRequest("test").Tuple([]interface{}{1, "test"}),
4152+
NewCommitRequest().IsSync(true),
4153+
//NewBeginRequest(),
4154+
//NewInsertRequest("test").Tuple([]interface{}{1, "test"}),
4155+
//NewCommitRequest(),
4156+
}
4157+
4158+
for _, req := range requests {
4159+
status := stream.Do(req)
4160+
4161+
select {
4162+
case <-status.WaitChan():
4163+
_, err := status.Get()
4164+
require.NoError(t, err)
4165+
case <-ctx.Done():
4166+
require.Fail(t, "request failed", ctx.Err().Error())
4167+
}
4168+
}
4169+
}
4170+
41374171
// runTestMain is a body of TestMain function
41384172
// (see https://pkg.go.dev/testing#hdr-Main).
41394173
// Using defer + os.Exit is not works so TestMain body

test_helpers/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) {
217217
SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0)
218218
}
219219

220+
// SkipIfIsSyncUnsupported skips test run if Tarantool without
221+
// IS_SYNC support is used.
222+
func SkipIfIsSyncUnsupported(t *testing.T) {
223+
t.Helper()
224+
225+
SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0)
226+
}
227+
220228
// IsTcsSupported checks if Tarantool supports centralized storage.
221229
// Tarantool supports centralized storage with Enterprise since 3.3.0 version.
222230
func IsTcsSupported() (bool, error) {

0 commit comments

Comments
 (0)