feat: Added read_time as a parameter to various calls (synchronous/base classes) #1050

Merged: 16 commits, Jun 4, 2025
27 changes: 26 additions & 1 deletion google/cloud/firestore_v1/aggregation.py
@@ -39,6 +39,8 @@
from google.cloud.firestore_v1.query_profile import ExplainMetrics
from google.cloud.firestore_v1.query_profile import ExplainOptions

import datetime


class AggregationQuery(BaseAggregationQuery):
"""Represents an aggregation query to the Firestore API."""
@@ -56,6 +58,7 @@ def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> QueryResultsList[AggregationResult]:
"""Runs the aggregation query.

@@ -78,6 +81,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
QueryResultsList[AggregationResult]: The aggregation query results.
@@ -90,6 +97,7 @@ def get(
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
result_list = list(result)

@@ -100,13 +108,16 @@

return QueryResultsList(result_list, explain_options, explain_metrics)

def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
def _get_stream_iterator(
self, transaction, retry, timeout, explain_options=None, read_time=None
):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
explain_options,
read_time,
)

return self._client._firestore_api.run_aggregation_query(
@@ -132,6 +143,7 @@ def _make_stream(
retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
"""Internal method for stream(). Runs the aggregation query.

@@ -155,6 +167,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Yields:
List[AggregationResult]:
@@ -172,6 +188,7 @@
retry,
timeout,
explain_options,
read_time,
)
while True:
try:
@@ -182,6 +199,8 @@
transaction,
retry,
timeout,
explain_options,
read_time,
)
continue
else:
@@ -206,6 +225,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> StreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.

@@ -229,6 +249,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
`StreamGenerator[List[AggregationResult]]`:
@@ -239,5 +263,6 @@
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
return StreamGenerator(inner_generator, explain_options)
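
With this change, the synchronous aggregation surface accepts `read_time` as a keyword-only argument on both `get()` and `stream()`. A minimal usage sketch, assuming a configured `firestore.Client` and a hypothetical `cities` collection (not part of this PR):

```python
import datetime

from google.cloud import firestore

client = firestore.Client()

# A stale read must be within the past hour, or a whole-minute timestamp
# within the past 7 days when Point-in-Time Recovery is enabled.
read_time = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=10)

# Run a count() aggregation against the database state at `read_time`.
results = client.collection("cities").count().get(read_time=read_time)
for result in results:
    print(result)

# The streaming form takes the same keyword-only argument.
for result in client.collection("cities").count().stream(read_time=read_time):
    print(result)
```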
19 changes: 19 additions & 0 deletions google/cloud/firestore_v1/async_aggregation.py
@@ -37,6 +37,7 @@
from google.cloud.firestore_v1.base_aggregation import AggregationResult
from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
import google.cloud.firestore_v1.types.query_profile as query_profile_pb
import datetime


class AsyncAggregationQuery(BaseAggregationQuery):
@@ -55,6 +56,7 @@ async def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> QueryResultsList[List[AggregationResult]]:
"""Runs the aggregation query.

@@ -75,6 +77,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
QueryResultsList[List[AggregationResult]]: The aggregation query results.
@@ -87,6 +93,7 @@
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
try:
result = [aggregation async for aggregation in stream_result]
@@ -106,6 +113,7 @@ async def _make_stream(
retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]:
"""Internal method for stream(). Runs the aggregation query.

@@ -130,6 +138,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Yields:
List[AggregationResult] | query_profile_pb.ExplainMetrics:
@@ -143,6 +155,7 @@
retry,
timeout,
explain_options,
read_time,
)

response_iterator = await self._client._firestore_api.run_aggregation_query(
@@ -167,6 +180,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> AsyncStreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.

@@ -190,6 +204,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
`AsyncStreamGenerator[List[AggregationResult]]`:
@@ -201,5 +219,6 @@
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
return AsyncStreamGenerator(inner_generator, explain_options)
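
The async aggregation query mirrors the synchronous signature. A corresponding sketch, assuming an `AsyncClient` and the same hypothetical collection:

```python
import asyncio
import datetime

from google.cloud import firestore


async def count_at_read_time() -> None:
    client = firestore.AsyncClient()
    # Same constraint as the sync API: within the past hour, or a
    # whole-minute timestamp within 7 days under Point-in-Time Recovery.
    read_time = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=10)

    results = await client.collection("cities").count().get(read_time=read_time)
    for result in results:
        print(result)


asyncio.run(count_at_read_time())
```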
22 changes: 18 additions & 4 deletions google/cloud/firestore_v1/async_client.py
@@ -48,8 +48,10 @@
grpc_asyncio as firestore_grpc_transport,
)

if TYPE_CHECKING:
from google.cloud.firestore_v1.bulk_writer import BulkWriter # pragma: NO COVER
if TYPE_CHECKING: # pragma: NO COVER
import datetime

from google.cloud.firestore_v1.bulk_writer import BulkWriter


class AsyncClient(BaseClient):
@@ -227,6 +229,8 @@ async def get_all(
transaction: AsyncTransaction | None = None,
retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
timeout: float | None = None,
*,
read_time: datetime.datetime | None = None,
) -> AsyncGenerator[DocumentSnapshot, Any]:
"""Retrieve a batch of documents.

@@ -261,13 +265,17 @@
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Yields:
.DocumentSnapshot: The next document snapshot that fulfills the
query, or :data:`None` if the document does not exist.
"""
request, reference_map, kwargs = self._prep_get_all(
references, field_paths, transaction, retry, timeout
references, field_paths, transaction, retry, timeout, read_time
)

response_iterator = await self._firestore_api.batch_get_documents(
@@ -283,6 +291,8 @@ async def collections(
self,
retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
timeout: float | None = None,
*,
read_time: datetime.datetime | None = None,
) -> AsyncGenerator[AsyncCollectionReference, Any]:
"""List top-level collections of the client's database.

@@ -291,12 +301,16 @@
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
Sequence[:class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`]:
iterator of subcollections of the current document.
"""
request, kwargs = self._prep_collections(retry, timeout)
request, kwargs = self._prep_collections(retry, timeout, read_time)
iterator = await self._firestore_api.list_collection_ids(
request=request,
metadata=self._rpc_metadata,
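
On the client itself, `read_time` is keyword-only for `get_all()` and `collections()`. A hedged sketch of how a caller might use it (the document path is illustrative):

```python
import asyncio
import datetime

from google.cloud import firestore


async def stale_reads() -> None:
    client = firestore.AsyncClient()
    read_time = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=5)

    # Fetch a batch of documents as they existed at `read_time`.
    doc_ref = client.document("cities/SF")
    async for snapshot in client.get_all([doc_ref], read_time=read_time):
        print(snapshot.id, snapshot.exists)

    # List top-level collections as of the same point in time.
    async for collection in client.collections(read_time=read_time):
        print(collection.id)


asyncio.run(stale_reads())
```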
26 changes: 25 additions & 1 deletion google/cloud/firestore_v1/async_collection.py
@@ -34,6 +34,8 @@
from google.cloud.firestore_v1.document import DocumentReference

if TYPE_CHECKING: # pragma: NO COVER
import datetime

from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from google.cloud.firestore_v1.query_profile import ExplainOptions
@@ -162,6 +164,8 @@ async def list_documents(
page_size: int | None = None,
retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
timeout: float | None = None,
*,
read_time: datetime.datetime | None = None,
) -> AsyncGenerator[DocumentReference, None]:
"""List all subdocuments of the current collection.

@@ -173,14 +177,20 @@
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
Sequence[:class:`~google.cloud.firestore_v1.collection.DocumentReference`]:
iterator of subdocuments of the current collection. If the
collection does not exist at the time of `snapshot`, the
iterator will be empty
"""
request, kwargs = self._prep_list_documents(page_size, retry, timeout)
request, kwargs = self._prep_list_documents(
page_size, retry, timeout, read_time
)

iterator = await self._client._firestore_api.list_documents(
request=request,
@@ -197,6 +207,7 @@ async def get(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> QueryResultsList[DocumentSnapshot]:
"""Read the documents in this collection.

@@ -216,6 +227,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

If a ``transaction`` is used and it already has write operations added,
this method cannot be used (i.e. read-after-write is not allowed).
Expand All @@ -227,6 +242,8 @@ async def get(
query, kwargs = self._prep_get_or_stream(retry, timeout)
if explain_options is not None:
kwargs["explain_options"] = explain_options
if read_time is not None:
kwargs["read_time"] = read_time

return await query.get(transaction=transaction, **kwargs)

@@ -237,6 +254,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> AsyncStreamGenerator[DocumentSnapshot]:
"""Read the documents in this collection.

@@ -268,6 +286,10 @@
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
`AsyncStreamGenerator[DocumentSnapshot]`: A generator of the query
@@ -276,5 +298,7 @@
query, kwargs = self._prep_get_or_stream(retry, timeout)
if explain_options:
kwargs["explain_options"] = explain_options
if read_time is not None:
kwargs["read_time"] = read_time

return query.stream(transaction=transaction, **kwargs)
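
At the collection level, `read_time` flows through `_prep_list_documents` and the underlying query. A brief sketch (collection name is hypothetical):

```python
import asyncio
import datetime

from google.cloud import firestore


async def collection_at_read_time() -> None:
    client = firestore.AsyncClient()
    collection = client.collection("cities")
    read_time = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=5)

    # List document references as of `read_time`.
    async for doc_ref in collection.list_documents(read_time=read_time):
        print(doc_ref.id)

    # Eager read of the whole collection at the same point in time...
    snapshots = await collection.get(read_time=read_time)
    print(len(snapshots))

    # ...or stream the snapshots lazily.
    async for snapshot in collection.stream(read_time=read_time):
        print(snapshot.id, snapshot.to_dict())


asyncio.run(collection_at_read_time())
```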