Skip to content

Commit 513ae48

Browse files
committed
stream: added is_sync flag for begin/commit
1 parent 7819266 commit 513ae48

File tree

4 files changed

+130
-2
lines changed

4 files changed

+130
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
- Implemented all box.schema.user operations requests and sugar interface (#426).
1414
- Implemented box.session.su request and sugar interface only for current session granting (#426).
15+
- Implemented support for `IPROTO_IS_SYNC` flag in stream transactions,
16+
added `IsSync(bool)` method for `BeginRequest`/`CommitRequest` (#447).
1517

1618
### Changed
1719

stream.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type BeginRequest struct {
4242
baseRequest
4343
txnIsolation TxnIsolationLevel
4444
timeout time.Duration
45+
isSync *bool
4546
}
4647

4748
// NewBeginRequest returns a new BeginRequest.
@@ -59,12 +60,18 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ
5960
return req
6061
}
6162

62-
// WithTimeout allows to set up a timeout for call BeginRequest.
63+
// Timeout allows to set up a timeout for call BeginRequest.
6364
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
6465
req.timeout = timeout
6566
return req
6667
}
6768

69+
// IsSync allows to set up a IsSync flag for call BeginRequest.
70+
func (req *BeginRequest) IsSync(isSync bool) *BeginRequest {
71+
req.isSync = &isSync
72+
return req
73+
}
74+
6875
// Body fills an msgpack.Encoder with the begin request body.
6976
func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
7077
var (
@@ -81,6 +88,10 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
8188
mapLen++
8289
}
8390

91+
if req.isSync != nil {
92+
mapLen++
93+
}
94+
8495
err := enc.EncodeMapLen(mapLen)
8596
if err != nil {
8697
return err
@@ -110,6 +121,18 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
110121
}
111122
}
112123

124+
if req.isSync != nil {
125+
err = enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC))
126+
if err != nil {
127+
return err
128+
}
129+
130+
err = enc.EncodeBool(*req.isSync)
131+
if err != nil {
132+
return err
133+
}
134+
}
135+
113136
return nil
114137
}
115138

@@ -129,6 +152,8 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
129152
// Commit request can not be processed out of stream.
130153
type CommitRequest struct {
131154
baseRequest
155+
156+
isSync *bool
132157
}
133158

134159
// NewCommitRequest returns a new CommitRequest.
@@ -138,9 +163,37 @@ func NewCommitRequest() *CommitRequest {
138163
return req
139164
}
140165

166+
// IsSync allows to set up a IsSync flag for call BeginRequest.
167+
func (req *CommitRequest) IsSync(isSync bool) *CommitRequest {
168+
req.isSync = &isSync
169+
return req
170+
}
171+
141172
// Body fills an msgpack.Encoder with the commit request body.
142173
func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
143-
return enc.EncodeMapLen(0)
174+
var (
175+
mapLen = 0
176+
)
177+
178+
if req.isSync != nil {
179+
mapLen++
180+
}
181+
182+
if err := enc.EncodeMapLen(mapLen); err != nil {
183+
return err
184+
}
185+
186+
if req.isSync != nil {
187+
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
188+
return err
189+
}
190+
191+
if err := enc.EncodeBool(*req.isSync); err != nil {
192+
return err
193+
}
194+
}
195+
196+
return nil
144197
}
145198

146199
// Context sets a passed context to the request.

tarantool_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4200,6 +4200,71 @@ func TestFdDialer(t *testing.T) {
42004200
require.Equal(t, int8(0), resp[0])
42014201
}
42024202

4203+
func TestDoBeginRequest_IsSync(t *testing.T) {
4204+
test_helpers.SkipIfIsSyncUnsupported(t)
4205+
4206+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4207+
defer conn.Close()
4208+
4209+
stream, err := conn.NewStream()
4210+
require.NoError(t, err)
4211+
4212+
_, err = stream.Do(NewBeginRequest().IsSync(true)).Get()
4213+
assert.Nil(t, err)
4214+
4215+
_, err = stream.Do(
4216+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4217+
).Get()
4218+
require.Nil(t, err)
4219+
4220+
_, err = stream.Do(NewCommitRequest()).Get()
4221+
require.NotNil(t, err)
4222+
assert.Contains(t, err.Error(), "The synchronous transaction queue doesn't belong to any instance")
4223+
}
4224+
4225+
func TestDoCommitRequest_IsSync(t *testing.T) {
4226+
test_helpers.SkipIfIsSyncUnsupported(t)
4227+
4228+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4229+
defer conn.Close()
4230+
4231+
stream, err := conn.NewStream()
4232+
require.NoError(t, err)
4233+
4234+
_, err = stream.Do(NewBeginRequest()).Get()
4235+
require.Nil(t, err)
4236+
4237+
_, err = stream.Do(
4238+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4239+
).Get()
4240+
require.Nil(t, err)
4241+
4242+
_, err = stream.Do(NewCommitRequest().IsSync(true)).Get()
4243+
require.NotNil(t, err)
4244+
assert.Contains(t, err.Error(), "The synchronous transaction queue doesn't belong to any instance")
4245+
}
4246+
4247+
func TestDoCommitRequest_NoSync(t *testing.T) {
4248+
test_helpers.SkipIfIsSyncUnsupported(t)
4249+
4250+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4251+
defer conn.Close()
4252+
4253+
stream, err := conn.NewStream()
4254+
require.NoError(t, err)
4255+
4256+
_, err = stream.Do(NewBeginRequest()).Get()
4257+
require.Nil(t, err)
4258+
4259+
_, err = stream.Do(
4260+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4261+
).Get()
4262+
require.Nil(t, err)
4263+
4264+
_, err = stream.Do(NewCommitRequest()).Get()
4265+
assert.Nil(t, err)
4266+
}
4267+
42034268
// runTestMain is a body of TestMain function
42044269
// (see https://pkg.go.dev/testing#hdr-Main).
42054270
// Using defer + os.Exit is not works so TestMain body

test_helpers/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) {
217217
SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0)
218218
}
219219

220+
// SkipIfIsSyncUnsupported skips test run if Tarantool without
221+
// IS_SYNC support is used.
222+
func SkipIfIsSyncUnsupported(t *testing.T) {
223+
t.Helper()
224+
225+
SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0)
226+
}
227+
220228
// IsTcsSupported checks if Tarantool supports centralized storage.
221229
// Tarantool supports centralized storage with Enterprise since 3.3.0 version.
222230
func IsTcsSupported() (bool, error) {

0 commit comments

Comments
 (0)