Skip to content

Commit 56be30f

Browse files
committed
Add partitioned DML support (#459)
* Add 'Database.execute_partitioned_dml' method. * Add system test which exercises PDML, both for UPDATE (with parameter) and DELETE.
1 parent 08a3399 commit 56be30f

File tree

4 files changed

+380
-194
lines changed

4 files changed

+380
-194
lines changed

spanner/google/cloud/spanner_v1/database.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,32 @@
1414

1515
"""User friendly container for Cloud Spanner Database."""
1616

17+
import copy
18+
import functools
1719
import re
1820
import threading
19-
import copy
2021

2122
from google.api_core.gapic_v1 import client_info
2223
import google.auth.credentials
24+
from google.protobuf.struct_pb2 import Struct
2325
from google.cloud.exceptions import NotFound
2426
import six
2527

2628
# pylint: disable=ungrouped-imports
2729
from google.cloud.spanner_v1 import __version__
30+
from google.cloud.spanner_v1._helpers import _make_value_pb
2831
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
2932
from google.cloud.spanner_v1.batch import Batch
3033
from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient
3134
from google.cloud.spanner_v1.keyset import KeySet
3235
from google.cloud.spanner_v1.pool import BurstyPool
3336
from google.cloud.spanner_v1.pool import SessionCheckout
3437
from google.cloud.spanner_v1.session import Session
38+
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
3539
from google.cloud.spanner_v1.snapshot import Snapshot
40+
from google.cloud.spanner_v1.streamed import StreamedResultSet
41+
from google.cloud.spanner_v1.proto.transaction_pb2 import (
42+
TransactionSelector, TransactionOptions)
3643
# pylint: enable=ungrouped-imports
3744

3845

@@ -272,6 +279,70 @@ def drop(self):
272279
metadata = _metadata_with_prefix(self.name)
273280
api.drop_database(self.name, metadata=metadata)
274281

282+
def execute_partitioned_dml(
283+
self, dml, params=None, param_types=None, query_mode=None):
284+
"""Execute a partitionable DML statement.
285+
286+
:type dml: str
287+
:param dml: SQL DML statement
288+
289+
:type params: dict, {str -> column value}
290+
:param params: values for parameter replacement. Keys must match
291+
the names used in ``dml``.
292+
293+
:type param_types: dict[str -> Union[dict, .types.Type]]
294+
:param param_types:
295+
(Optional) maps explicit types for one or more param values;
296+
required if parameters are passed.
297+
298+
:type query_mode:
299+
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryMode`
300+
:param query_mode: Mode governing return of results / query plan. See
301+
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
302+
303+
:rtype: int
304+
:returns: Count of rows affected by the DML statement.
305+
"""
306+
if params is not None:
307+
if param_types is None:
308+
raise ValueError(
309+
"Specify 'param_types' when passing 'params'.")
310+
params_pb = Struct(fields={
311+
key: _make_value_pb(value) for key, value in params.items()})
312+
else:
313+
params_pb = None
314+
315+
api = self.spanner_api
316+
317+
txn_options = TransactionOptions(
318+
partitioned_dml=TransactionOptions.PartitionedDml())
319+
320+
metadata = _metadata_with_prefix(self.name)
321+
322+
with SessionCheckout(self._pool) as session:
323+
324+
txn = api.begin_transaction(
325+
session.name, txn_options, metadata=metadata)
326+
327+
txn_selector = TransactionSelector(id=txn.id)
328+
329+
restart = functools.partial(
330+
api.execute_streaming_sql,
331+
session.name,
332+
dml,
333+
transaction=txn_selector,
334+
params=params_pb,
335+
param_types=param_types,
336+
query_mode=query_mode,
337+
metadata=metadata)
338+
339+
iterator = _restart_on_unavailable(restart)
340+
341+
result_set = StreamedResultSet(iterator)
342+
list(result_set) # consume all partials
343+
344+
return result_set.stats.row_count_lower_bound
345+
275346
def session(self, labels=None):
276347
"""Factory to create a session for this database.
277348

spanner/google/cloud/spanner_v1/transaction.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,8 @@ def execute_update(self, dml, params=None, param_types=None,
149149
:param query_mode: Mode governing return of results / query plan. See
150150
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
151151
152-
:rtype:
153-
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.ResultSetStats`
154-
:returns:
155-
stats object, including count of rows affected by the DML
156-
statement.
152+
:rtype: int
153+
:returns: Count of rows affected by the DML statement.
157154
"""
158155
if params is not None:
159156
if param_types is None:

spanner/tests/system/test_system.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,62 @@ def test_transaction_execute_update_then_insert_commit(self):
730730
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
731731
self._check_rows_data(rows)
732732

733+
def test_execute_partitioned_dml(self):
734+
retry = RetryInstanceState(_has_all_ddl)
735+
retry(self._db.reload)()
736+
737+
delete_statement = 'DELETE FROM {} WHERE true'.format(self.TABLE)
738+
739+
def _setup_table(txn):
740+
txn.execute_update(delete_statement)
741+
for insert_statement in self._generate_insert_statements():
742+
txn.execute_update(insert_statement)
743+
744+
committed = self._db.run_in_transaction(_setup_table)
745+
746+
with self._db.snapshot(read_timestamp=committed) as snapshot:
747+
before_pdml = list(snapshot.read(
748+
self.TABLE, self.COLUMNS, self.ALL))
749+
750+
self._check_rows_data(before_pdml)
751+
752+
nonesuch = 'nonesuch@example.com'
753+
target = 'phred@example.com'
754+
update_statement = (
755+
'UPDATE {table} SET {table}.email = @email '
756+
'WHERE {table}.email = @target').format(
757+
table=self.TABLE)
758+
759+
row_count = self._db.execute_partitioned_dml(
760+
update_statement,
761+
params={
762+
'email': nonesuch,
763+
'target': target,
764+
},
765+
param_types={
766+
'email': Type(code=STRING),
767+
'target': Type(code=STRING),
768+
},
769+
)
770+
self.assertEqual(row_count, 1)
771+
772+
row = self.ROW_DATA[0]
773+
updated = [row[:3] + (nonesuch,)] + list(self.ROW_DATA[1:])
774+
775+
with self._db.snapshot(read_timestamp=committed) as snapshot:
776+
after_update = list(snapshot.read(
777+
self.TABLE, self.COLUMNS, self.ALL))
778+
self._check_rows_data(after_update, updated)
779+
780+
row_count = self._db.execute_partitioned_dml(delete_statement)
781+
self.assertEqual(row_count, len(self.ROW_DATA))
782+
783+
with self._db.snapshot(read_timestamp=committed) as snapshot:
784+
after_delete = list(snapshot.read(
785+
self.TABLE, self.COLUMNS, self.ALL))
786+
787+
self._check_rows_data(after_delete, [])
788+
733789
def _transaction_concurrency_helper(self, unit_of_work, pkey):
734790
INITIAL_VALUE = 123
735791
NUM_THREADS = 3 # conforms to equivalent Java systest.

0 commit comments

Comments
 (0)