Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 4628cd1

Browse files
authored
feat: commit begins transaction if not already begun (#1143)
Fixes #689
1 parent fb5edf1 commit 4628cd1

File tree

2 files changed

+112
-62
lines changed

2 files changed

+112
-62
lines changed

google/cloud/spanner/internal/connection_impl.cc

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "google/cloud/spanner/internal/partial_result_set_resume.h"
1717
#include "google/cloud/spanner/internal/partial_result_set_source.h"
1818
#include "google/cloud/spanner/internal/retry_loop.h"
19+
#include "google/cloud/spanner/internal/status_utils.h"
1920
#include "google/cloud/spanner/internal/time.h"
2021
#include "google/cloud/spanner/query_partition.h"
2122
#include "google/cloud/spanner/read_partition.h"
@@ -723,20 +724,33 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
723724
for (auto&& m : params.mutations) {
724725
*request.add_mutations() = std::move(m).as_proto();
725726
}
726-
bool is_idempotent = false;
727-
if (s.has_single_use()) {
728-
*request.mutable_single_use_transaction() = s.single_use();
729-
} else if (s.has_begin()) {
730-
*request.mutable_single_use_transaction() = s.begin();
731-
} else {
732-
request.set_transaction_id(s.id());
733-
is_idempotent = true;
727+
728+
if (s.selector_case() != spanner_proto::TransactionSelector::kId) {
729+
spanner_proto::BeginTransactionRequest begin;
730+
begin.set_session(session->session_name());
731+
*begin.mutable_options() = s.has_begin() ? s.begin() : s.single_use();
732+
auto stub = session_pool_->GetStub(*session);
733+
auto response = internal::RetryLoop(
734+
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
735+
true,
736+
[&stub](grpc::ClientContext& context,
737+
spanner_proto::BeginTransactionRequest const& request) {
738+
return stub->BeginTransaction(context, request);
739+
},
740+
begin, __func__);
741+
if (!response) {
742+
auto status = std::move(response).status();
743+
if (internal::IsSessionNotFound(status)) session->set_bad();
744+
return status;
745+
}
746+
s.set_id(response->id());
734747
}
748+
request.set_transaction_id(s.id());
735749

736750
auto stub = session_pool_->GetStub(*session);
737751
auto response = internal::RetryLoop(
738752
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
739-
is_idempotent,
753+
true,
740754
[&stub](grpc::ClientContext& context,
741755
spanner_proto::CommitRequest const& request) {
742756
return stub->Commit(context, request);

google/cloud/spanner/internal/connection_impl_test.cc

Lines changed: 89 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,23 @@ MATCHER_P(BatchCreateSessionsRequestHasDatabase, database,
7474
return arg.database() == database.FullName();
7575
}
7676

77+
// Matches a spanner::Transaction that is marked "is_bad()"
78+
MATCHER(HasBadSession, "bound to a session that's marked bad") {
79+
return internal::Visit(
80+
arg, [&](internal::SessionHolder& session,
81+
google::spanner::v1::TransactionSelector&, std::int64_t) {
82+
if (!session) {
83+
*result_listener << "has no session";
84+
return false;
85+
}
86+
if (!session->is_bad()) {
87+
*result_listener << "session expected to be bad, but was not";
88+
return false;
89+
}
90+
return true;
91+
});
92+
}
93+
7794
// Helper to set the Transaction's ID.
7895
void SetTransactionId(Transaction& txn, std::string tid) {
7996
internal::Visit(
@@ -1285,6 +1302,8 @@ TEST(ConnectionImplTest, CommitGetSession_TooManyTransientFailures) {
12851302
TEST(ConnectionImplTest, CommitGetSession_Retry) {
12861303
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
12871304

1305+
spanner_proto::Transaction txn;
1306+
txn.set_id("1234567890");
12881307
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
12891308
auto conn = MakeLimitedRetryConnection(db, mock);
12901309
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
@@ -1300,21 +1319,78 @@ TEST(ConnectionImplTest, CommitGetSession_Retry) {
13001319
EXPECT_EQ(db.FullName(), request.database());
13011320
return MakeSessionsResponse({"test-session-name"});
13021321
});
1322+
EXPECT_CALL(*mock, BeginTransaction(_, _)).WillOnce(Return(txn));
13031323
EXPECT_CALL(*mock, Commit(_, _))
1304-
.WillOnce([](grpc::ClientContext&,
1305-
spanner_proto::CommitRequest const& request) {
1324+
.WillOnce([&txn](grpc::ClientContext&,
1325+
spanner_proto::CommitRequest const& request) {
13061326
EXPECT_EQ("test-session-name", request.session());
1307-
EXPECT_TRUE(request.has_single_use_transaction());
1327+
EXPECT_EQ(txn.id(), request.transaction_id());
13081328
return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
13091329
});
13101330
auto commit = conn->Commit({MakeReadWriteTransaction(), {}});
13111331
EXPECT_EQ(StatusCode::kPermissionDenied, commit.status().code());
13121332
EXPECT_THAT(commit.status().message(), HasSubstr("uh-oh in Commit"));
13131333
}
13141334

1335+
TEST(ConnectionImplTest, CommitBeginTransaction_Retry) {
1336+
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
1337+
1338+
spanner_proto::Transaction txn;
1339+
txn.set_id("1234567890");
1340+
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
1341+
auto conn = MakeLimitedRetryConnection(db, mock);
1342+
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
1343+
.WillOnce(
1344+
[&db](grpc::ClientContext&,
1345+
spanner_proto::BatchCreateSessionsRequest const& request) {
1346+
EXPECT_EQ(db.FullName(), request.database());
1347+
return MakeSessionsResponse({"test-session-name"});
1348+
});
1349+
EXPECT_CALL(*mock, BeginTransaction(_, _))
1350+
.WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
1351+
.WillOnce(Return(txn));
1352+
EXPECT_CALL(*mock, Commit(_, _))
1353+
.WillOnce([&txn](grpc::ClientContext&,
1354+
spanner_proto::CommitRequest const& request) {
1355+
EXPECT_EQ("test-session-name", request.session());
1356+
EXPECT_EQ(txn.id(), request.transaction_id());
1357+
spanner_proto::CommitResponse response;
1358+
*response.mutable_commit_timestamp() =
1359+
internal::ToProto(Timestamp{std::chrono::seconds(123)});
1360+
return response;
1361+
});
1362+
1363+
auto commit = conn->Commit({MakeReadWriteTransaction(), {}});
1364+
EXPECT_STATUS_OK(commit);
1365+
EXPECT_EQ(Timestamp(std::chrono::seconds(123)), commit->commit_timestamp);
1366+
}
1367+
1368+
TEST(ConnectionImplTest, CommitBeginTransaction_SessionNotFound) {
1369+
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
1370+
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
1371+
auto conn = MakeLimitedRetryConnection(db, mock);
1372+
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
1373+
.WillOnce(
1374+
[&db](grpc::ClientContext&,
1375+
spanner_proto::BatchCreateSessionsRequest const& request) {
1376+
EXPECT_EQ(db.FullName(), request.database());
1377+
return MakeSessionsResponse({"test-session-name"});
1378+
});
1379+
EXPECT_CALL(*mock, BeginTransaction(_, _))
1380+
.WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));
1381+
auto txn = MakeReadWriteTransaction();
1382+
auto commit = conn->Commit({txn, {}});
1383+
EXPECT_FALSE(commit.ok());
1384+
auto status = commit.status();
1385+
EXPECT_TRUE(IsSessionNotFound(status)) << status;
1386+
EXPECT_THAT(txn, HasBadSession());
1387+
}
1388+
13151389
TEST(ConnectionImplTest, CommitCommit_PermanentFailure) {
13161390
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
13171391

1392+
spanner_proto::Transaction txn;
1393+
txn.set_id("1234567890");
13181394
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
13191395
auto conn = MakeLimitedRetryConnection(db, mock);
13201396
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
@@ -1324,11 +1400,12 @@ TEST(ConnectionImplTest, CommitCommit_PermanentFailure) {
13241400
EXPECT_EQ(db.FullName(), request.database());
13251401
return MakeSessionsResponse({"test-session-name"});
13261402
});
1403+
EXPECT_CALL(*mock, BeginTransaction(_, _)).WillOnce(Return(txn));
13271404
EXPECT_CALL(*mock, Commit(_, _))
1328-
.WillOnce([](grpc::ClientContext&,
1329-
spanner_proto::CommitRequest const& request) {
1405+
.WillOnce([&txn](grpc::ClientContext&,
1406+
spanner_proto::CommitRequest const& request) {
13301407
EXPECT_EQ("test-session-name", request.session());
1331-
EXPECT_TRUE(request.has_single_use_transaction());
1408+
EXPECT_EQ(txn.id(), request.transaction_id());
13321409
return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
13331410
});
13341411
auto commit = conn->Commit({MakeReadWriteTransaction(), {}});
@@ -1339,6 +1416,8 @@ TEST(ConnectionImplTest, CommitCommit_PermanentFailure) {
13391416
TEST(ConnectionImplTest, CommitCommit_TooManyTransientFailures) {
13401417
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
13411418

1419+
spanner_proto::Transaction txn;
1420+
txn.set_id("1234567890");
13421421
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
13431422
auto conn = MakeLimitedRetryConnection(db, mock);
13441423
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
@@ -1348,11 +1427,12 @@ TEST(ConnectionImplTest, CommitCommit_TooManyTransientFailures) {
13481427
EXPECT_EQ(db.FullName(), request.database());
13491428
return MakeSessionsResponse({"test-session-name"});
13501429
});
1430+
EXPECT_CALL(*mock, BeginTransaction(_, _)).WillOnce(Return(txn));
13511431
EXPECT_CALL(*mock, Commit(_, _))
1352-
.WillOnce([](grpc::ClientContext&,
1353-
spanner_proto::CommitRequest const& request) {
1432+
.WillOnce([&txn](grpc::ClientContext&,
1433+
spanner_proto::CommitRequest const& request) {
13541434
EXPECT_EQ("test-session-name", request.session());
1355-
EXPECT_TRUE(request.has_single_use_transaction());
1435+
EXPECT_EQ(txn.id(), request.transaction_id());
13561436
return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
13571437
});
13581438
auto commit = conn->Commit({MakeReadWriteTransaction(), {}});
@@ -1398,34 +1478,6 @@ TEST(ConnectionImplTest, CommitCommit_IdempotentTransientSuccess) {
13981478
EXPECT_EQ(Timestamp(std::chrono::seconds(123)), commit->commit_timestamp);
13991479
}
14001480

1401-
TEST(ConnectionImplTest, CommitCommit_NonIdempotentTransientFailure) {
1402-
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
1403-
1404-
auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
1405-
auto conn = MakeConnection(db, {mock});
1406-
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
1407-
.WillOnce(
1408-
[&db](grpc::ClientContext&,
1409-
spanner_proto::BatchCreateSessionsRequest const& request) {
1410-
EXPECT_EQ(db.FullName(), request.database());
1411-
return MakeSessionsResponse({"test-session-name"});
1412-
});
1413-
EXPECT_CALL(*mock, Commit(_, _))
1414-
.WillOnce([](grpc::ClientContext&,
1415-
spanner_proto::CommitRequest const& request) {
1416-
EXPECT_EQ("test-session-name", request.session());
1417-
return Status(StatusCode::kUnavailable, "try-again in Commit");
1418-
});
1419-
1420-
// Create a transaction without an id, that makes `Commit()` non-idempotent,
1421-
// so the transient failure should not be retried.
1422-
auto txn = MakeReadWriteTransaction();
1423-
1424-
auto commit = conn->Commit({txn, {}});
1425-
EXPECT_EQ(StatusCode::kUnavailable, commit.status().code());
1426-
EXPECT_THAT(commit.status().message(), HasSubstr("try-again in Commit"));
1427-
}
1428-
14291481
TEST(ConnectionImplTest, CommitSuccessWithTransactionId) {
14301482
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
14311483

@@ -2034,22 +2086,6 @@ TEST(ConnectionImplTest, TransactionOutlivesConnection) {
20342086
conn.reset();
20352087
}
20362088

2037-
MATCHER(HasBadSession, "bound to a session that's marked bad") {
2038-
return internal::Visit(
2039-
arg, [&](internal::SessionHolder& session,
2040-
google::spanner::v1::TransactionSelector&, std::int64_t) {
2041-
if (!session) {
2042-
*result_listener << "has no session";
2043-
return false;
2044-
}
2045-
if (!session->is_bad()) {
2046-
*result_listener << "session expected to be bad, but was not";
2047-
return false;
2048-
}
2049-
return true;
2050-
});
2051-
}
2052-
20532089
TEST(ConnectionImplTest, Read_SessionNotFound) {
20542090
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
20552091
EXPECT_CALL(*mock, BatchCreateSessions(_, _))

0 commit comments

Comments
 (0)