refactor retry logic on mutate row #4665
Conversation
Can you please add a description for the pull request?
theacodes left a comment:
This PR also needs tests.
```python
    bigtable_pb2 as messages_v2_pb2)
from google.api_core import retry
from google.cloud import exceptions
from functools import partial, update_wrapper
```
| """The maximum number of mutations that a row can accumulate.""" | ||
|
|
||
|
|
||
| def wrapped_partial(func, *args, **kwargs): |
```python
self._table,
request_pb)
retry_ = retry.Retry(
    predicate=retry.if_exception_type(exceptions.GrpcRendezvous),
```
```python
mutations_list.extend(to_append)

def call_mutate_row(table, request_pb):
```
@aneepct this PR has open comments and little activity, do you still intend to merge this?
I am actively resolving all the review questions now and will advance to the state needed for merging. Please also see the question from @zakons above.
All questions and requests have been responded to. If it is not possible in the current release to map the _Rendezvous exception to exceptions.ServiceUnavailable (see the comment above), this could be done later when that mapping is available. Can we move forward with the merge?
theacodes left a comment:
This still needs tests.
```python
return partial_func

def call_mutate_row(table, request_pb):
```
core/google/cloud/exceptions.py (outdated):
```python
BadGateway = exceptions.BadGateway
ServiceUnavailable = exceptions.ServiceUnavailable
GatewayTimeout = exceptions.GatewayTimeout
RetryError = exceptions.RetryError
```
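Since these aliases just re-export the google.api_core exception classes under the google.cloud.exceptions names, code that catches the google.cloud name also catches the api_core class. A minimal illustration of that, assuming both packages are installed and the aliases land as shown in this outdated hunk:

```python
from google.api_core import exceptions as api_exceptions
from google.cloud import exceptions as cloud_exceptions

# The alias means both names refer to the same class object, so an error
# raised by api_core code is caught under the google.cloud name as well.
assert cloud_exceptions.ServiceUnavailable is api_exceptions.ServiceUnavailable

try:
    raise api_exceptions.ServiceUnavailable('channel disconnected')
except cloud_exceptions.ServiceUnavailable as exc:
    print('caught:', exc)
```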
Thanks. Almost there. I'm just not sure how to do the check for ServiceUnavailable (the exception that we want). Here is the retry logic with the predicate you are recommending: where does the check for exceptions.ServiceUnavailable go?
@zakons you'll need to basically write a custom predicate function:

```python
def _should_retry_commit_exception(exc):
    if isinstance(exc, grpc.RpcError):
        exc = exceptions.from_grpc_error(exc)
    return isinstance(exc, exceptions.ServiceUnavailable)

...

retry_commit = functools.partial(
    _call_mutate_row,
    table=self._table,
    request_pb=request_pb)
retry_ = retry.Retry(
    predicate=_should_retry_commit_exception,
    deadline=30)
retry_(retry_commit)()
```
@tseaver @jonparrott @sduskis The CircleCI coverage is now at 100% and our endurance test looks good; can we please have this PR merged? Many thanks for your help.
LGTM if @jonparrott says go.
```python
import struct

import functools
```
```python
import functools
import six
import grpc
```
```python
    data_pb2 as data_v2_pb2)
from google.cloud.bigtable._generated import (
    bigtable_pb2 as messages_v2_pb2)
from google.api_core import retry
```
```python
return isinstance(exc, exceptions.ServiceUnavailable)

def _call_mutate_row(table, request_pb):
```
```python
client = self._table._instance._client
client._data_stub.MutateRow(request_pb)

retry_commit = functools.partial(
```
bigtable/tests/unit/test_row.py (outdated):
```python
expected_result = True
result = _check_rendezvous_exception(
    grpc._channel._Rendezvous(State, None, None, 0))
```
@jonparrott How do I use RpcError and set the code to UNAVAILABLE?
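One common way to do this in a unit test is to define a small stub that subclasses both grpc.RpcError and grpc.Call and reports UNAVAILABLE from code(). This is only a sketch of that approach, not code from this PR; the _UnavailableRpcError name is illustrative:

```python
import grpc
from google.api_core import exceptions


class _UnavailableRpcError(grpc.RpcError, grpc.Call):
    """Stub gRPC error that reports the UNAVAILABLE status code."""

    def code(self):
        return grpc.StatusCode.UNAVAILABLE

    def details(self):
        return 'channel disconnected'


# from_grpc_error maps the UNAVAILABLE status to ServiceUnavailable, which is
# exactly what the retry predicate discussed above checks for.
mapped = exceptions.from_grpc_error(_UnavailableRpcError())
assert isinstance(mapped, exceptions.ServiceUnavailable)
```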
@jonparrott @zakons OK, we are at 100% coverage and all of your review comments have been incorporated. Please review, and if everything looks good, please merge.
@jonparrott @tseaver Can we please merge this PR now that all requested changes have been applied? Our endurance test results are all fine as well, meaning that the UNAVAILABLE exception is caught and the commit is retried until it succeeds.
Thanks everyone! Sorry for the protracted review process here, I'm glad we ended up with something short and sweet. :)
@jonparrott Thank you! We were impressed with your reviews. In fact, I have been looking at your Python testing style guide and am finding it very clear and helpful.
Awesome, I'm glad to be helpful. :)
Added retry logic for commit on a DirectRow. The retry uses the retry.py module and retries when a Rendezvous exception is raised due to a disconnect on the channel. The retry calls MutateRow on the original protobuf request, using the backoff-with-deadline algorithm provided by retry.py. This has been tested with an endurance test framework that runs for 23 hours; previously it hit 8+ exceptions, and now all of the retries succeed.
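Putting the pieces from the review together, the commit path described above looks roughly like the sketch below. The helper names follow the discussion in this thread, but the wrapper function is abbreviated for illustration and is not the exact merged code:

```python
import functools

import grpc
from google.api_core import exceptions, retry


def _should_retry_commit_exception(exc):
    """Retry only when the channel reports UNAVAILABLE (e.g. a disconnect)."""
    if isinstance(exc, grpc.RpcError):
        exc = exceptions.from_grpc_error(exc)
    return isinstance(exc, exceptions.ServiceUnavailable)


def _call_mutate_row(table, request_pb):
    """Send the MutateRow request over the table's data stub."""
    client = table._instance._client
    client._data_stub.MutateRow(request_pb)


def commit_with_retry(table, request_pb):
    """Commit a row's accumulated mutations, retrying UNAVAILABLE for up to 30s."""
    retry_commit = functools.partial(
        _call_mutate_row, table=table, request_pb=request_pb)
    retry_ = retry.Retry(
        predicate=_should_retry_commit_exception,
        deadline=30)
    # Retry wraps the callable and re-invokes it with exponential backoff
    # until the predicate stops matching or the 30-second deadline passes.
    return retry_(retry_commit)()
```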