-
Notifications
You must be signed in to change notification settings - Fork 89
feat: initial support for Extended Operations #344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
software-dov
merged 5 commits into
googleapis:main
from
software-dov:diregapic-operations
Mar 2, 2022
Merged
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
97da2d9
feat: initial support for Extended Operations
software-dov 3b3da9d
Error codes are http error codes
software-dov f2caa5c
Integrate reviews
software-dov 9f55dc0
Update extended_operation.py
software-dov b6c9582
Merge branch 'main' into diregapic-operations
software-dov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# Copyright 2022 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Futures for extended long-running operations returned from Google Cloud APIs. | ||
|
||
These futures can be used to synchronously wait for the result of a | ||
lon-running operations using :meth:`ExtendedOperation.result`: | ||
|
||
.. code-black:: python | ||
|
||
extended_operation = my_api_client.long_running_method() | ||
result = | ||
|
||
""" | ||
|
||
import threading | ||
|
||
from google.api_core import exceptions | ||
from google.api_core.future import polling | ||
|
||
|
||
class ExtendedOperation(polling.PollingFuture): | ||
"""An ExtendedOperation future for interacting with a Google API Long-Running Operation. | ||
|
||
Args: | ||
extended_operation (proto.Message): The initial operation. | ||
refresh (Callable[[], type(extended_operation)]): A callable that returns | ||
the latest state of the operation. | ||
cancel (Callable[[], None]): A callable that tries to cancel the operation. | ||
retry: Optional(google.api_core.retry.Retry): The retry configuration used | ||
when polling. This can be used to control how often :meth:`done` | ||
is polled. Regardless of the retry's ``deadline``, it will be | ||
overridden by the ``timeout`` argument to :meth:`result`. | ||
|
||
Note: Most long-running API methods use google.api_core.operation.Operation | ||
This class is a wrapper for a subset of methods that use alternative | ||
Long-Running Operation (LRO) semantics. | ||
""" | ||
|
||
def __init__( | ||
self, extended_operation, refresh, cancel, retry=polling.DEFAULT_RETRY | ||
): | ||
super().__init__(retry=retry) | ||
# Note: there is not a concrete type the extended operation must be. | ||
# It MUST have fields that correspond to the following, POSSIBLY WITH DIFFERENT NAMES: | ||
# * name: str | ||
# * status: Union[str, bool, enum.Enum] | ||
# * error_code: int | ||
# * error_message: str | ||
software-dov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._extended_operation = extended_operation | ||
self._refresh = refresh | ||
self._cancel = cancel | ||
# Note: the extended operation does not give a good way to indicate cancellation. | ||
# We make do with manually tracking cancellation and checking for doneness. | ||
self._cancelled = False | ||
self._completion_lock = threading.Lock() | ||
# Invoke in case the operation came back already complete. | ||
self._handle_refreshed_operation() | ||
|
||
# Note: the following four properties MUST be overridden in a subclass | ||
# if, and only if, the fields in the corresponding extended operation message | ||
# have different names. | ||
# | ||
# E.g. we have an extended operation class that looks like | ||
# | ||
# class MyOperation(proto.Message): | ||
# moniker = proto.Field(proto.STRING, number=1) | ||
# status_msg = proto.Field(proto.STRING, number=2) | ||
# optional http_error_code = proto.Field(proto.INT32, number=3) | ||
# optional http_error_msg = proto.Field(proto.STRING, number=4) | ||
# | ||
# the ExtendedOperation subclass would provide property overrrides that map | ||
# to these (poorly named) fields. | ||
@property | ||
def name(self): | ||
return self._extended_operation.name | ||
|
||
@property | ||
def status(self): | ||
return self._extended_operation.status | ||
|
||
@property | ||
def error_code(self): | ||
return self._extended_operation.error_code | ||
|
||
@property | ||
def error_message(self): | ||
return self._extended_operation.error_message | ||
|
||
def done(self, retry=polling.DEFAULT_RETRY): | ||
self._refresh_and_update(retry) | ||
return self._extended_operation.done | ||
|
||
def cancel(self): | ||
if self.done(): | ||
return False | ||
|
||
self._cancel() | ||
self._cancelled = True | ||
return True | ||
|
||
def cancelled(self): | ||
# TODO(dovs): there is not currently a good way to determine whether the | ||
# operation has been cancelled. | ||
# The best we can do is manually keep track of cancellation | ||
# and check for doneness. | ||
if not self._cancelled: | ||
return False | ||
|
||
self._refresh_and_update() | ||
return self._extended_operation.done | ||
|
||
def _refresh_and_update(self, retry=polling.DEFAULT_RETRY): | ||
if not self._extended_operation.done: | ||
self._extended_operation = self._refresh(retry=retry) | ||
self._handle_refreshed_operation() | ||
|
||
def _handle_refreshed_operation(self): | ||
with self._completion_lock: | ||
if not self._extended_operation.done: | ||
return | ||
|
||
if self.error_code and self.error_message: | ||
exception = exceptions.from_http_status( | ||
status_code=self.error_code, | ||
message=self.error_message, | ||
response=self._extended_operation, | ||
) | ||
self.set_exception(exception) | ||
elif self.error_code or self.error_message: | ||
exception = exceptions.GoogleAPICallError( | ||
f"Unexpected error {self.error_code}: {self.error_message}" | ||
) | ||
self.set_exception(exception) | ||
else: | ||
# Extended operations have no payload. | ||
self.set_result(None) | ||
|
||
@classmethod | ||
def make(cls, refresh, cancel, extended_operation, **kwargs): | ||
# Note: it is the caller's responsibility to set up refresh and cancel | ||
# with their correct request argument. | ||
# The reason for this is that the services that use Extended Operations | ||
# have rpcs that look something like the following: | ||
# // service.proto | ||
# service MyLongService { | ||
# rpc StartLongTask(StartLongTaskRequest) returns (ExtendedOperation) { | ||
# option (google.cloud.operation_service) = "CustomOperationService"; | ||
# } | ||
# } | ||
# | ||
# service CustomOperationService { | ||
# rpc Get(GetOperationRequest) returns (ExtendedOperation) { | ||
# option (google.cloud.operation_polling_method) = true; | ||
# } | ||
# } | ||
# | ||
# Any info needed for the poll, e.g. a name, path params, etc. | ||
# is held in the request, which the initial client method is in a much | ||
# better position to make made because the caller made the initial request. | ||
# | ||
# TL;DR: the caller sets up closures for refresh and cancel that carry | ||
# the properly configured requests. | ||
software-dov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return cls(extended_operation, refresh, cancel, **kwargs) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.