-
Notifications
You must be signed in to change notification settings - Fork 1.7k
[DRAFT] feat: added client side metric instrumentation to read_rows and mutate_rows #16758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: bigtable_csm_1_basic_instrumentation
Are you sure you want to change the base?
Changes from all commits
a78cb52
0ab1b05
83cbcf0
7ca6e11
b94e9d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,10 +22,8 @@ | |
| import google.cloud.bigtable.data.exceptions as bt_exceptions | ||
| import google.cloud.bigtable_v2.types.bigtable as types_pb | ||
| from google.cloud.bigtable.data._cross_sync import CrossSync | ||
| from google.cloud.bigtable.data._helpers import ( | ||
| _attempt_timeout_generator, | ||
| _retry_exception_factory, | ||
| ) | ||
| from google.cloud.bigtable.data._helpers import _attempt_timeout_generator | ||
| from google.cloud.bigtable.data._metrics import tracked_retry | ||
|
|
||
| # mutate_rows requests are limited to this number of mutations | ||
| from google.cloud.bigtable.data.mutations import ( | ||
|
|
@@ -34,6 +32,7 @@ | |
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from google.cloud.bigtable.data._metrics import ActiveOperationMetric | ||
| from google.cloud.bigtable.data.mutations import RowMutationEntry | ||
|
|
||
| if CrossSync.is_async: | ||
|
|
@@ -72,6 +71,8 @@ class _MutateRowsOperationAsync: | |
| operation_timeout: the timeout to use for the entire operation, in seconds. | ||
| attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds. | ||
| If not specified, the request will run until operation_timeout is reached. | ||
| metric: the metric object representing the active operation | ||
| retryable_exceptions: a list of exceptions that should be retried | ||
| """ | ||
|
|
||
| @CrossSync.convert | ||
|
|
@@ -82,6 +83,7 @@ def __init__( | |
| mutation_entries: list["RowMutationEntry"], | ||
| operation_timeout: float, | ||
| attempt_timeout: float | None, | ||
| metric: ActiveOperationMetric, | ||
| retryable_exceptions: Sequence[type[Exception]] = (), | ||
| ): | ||
| # check that mutations are within limits | ||
|
|
@@ -101,13 +103,12 @@ def __init__( | |
| # Entry level errors | ||
| bt_exceptions._MutateRowsIncomplete, | ||
| ) | ||
| sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) | ||
| self._operation = lambda: CrossSync.retry_target( | ||
| self._run_attempt, | ||
| self.is_retryable, | ||
| sleep_generator, | ||
| operation_timeout, | ||
| exception_factory=_retry_exception_factory, | ||
| self._operation = lambda: tracked_retry( | ||
| retry_fn=CrossSync.retry_target, | ||
| operation=metric, | ||
| target=self._run_attempt, | ||
| predicate=self.is_retryable, | ||
| timeout=operation_timeout, | ||
| ) | ||
| # initialize state | ||
| self.timeout_generator = _attempt_timeout_generator( | ||
|
|
@@ -116,6 +117,8 @@ def __init__( | |
| self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries] | ||
| self.remaining_indices = list(range(len(self.mutations))) | ||
| self.errors: dict[int, list[Exception]] = {} | ||
| # set up metrics | ||
| self._operation_metric = metric | ||
|
|
||
| @CrossSync.convert | ||
| async def start(self): | ||
|
|
@@ -125,34 +128,35 @@ async def start(self): | |
| Raises: | ||
| MutationsExceptionGroup: if any mutations failed | ||
| """ | ||
| try: | ||
| # trigger mutate_rows | ||
| await self._operation() | ||
| except Exception as exc: | ||
| # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations | ||
| incomplete_indices = self.remaining_indices.copy() | ||
| for idx in incomplete_indices: | ||
| self._handle_entry_error(idx, exc) | ||
| finally: | ||
| # raise exception detailing incomplete mutations | ||
| all_errors: list[Exception] = [] | ||
| for idx, exc_list in self.errors.items(): | ||
| if len(exc_list) == 0: | ||
| raise core_exceptions.ClientError( | ||
| f"Mutation {idx} failed with no associated errors" | ||
| with self._operation_metric: | ||
| try: | ||
| # trigger mutate_rows | ||
| await self._operation() | ||
| except Exception as exc: | ||
| # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations | ||
| incomplete_indices = self.remaining_indices.copy() | ||
| for idx in incomplete_indices: | ||
| self._handle_entry_error(idx, exc) | ||
| finally: | ||
| # raise exception detailing incomplete mutations | ||
| all_errors: list[Exception] = [] | ||
| for idx, exc_list in self.errors.items(): | ||
| if len(exc_list) == 0: | ||
| raise core_exceptions.ClientError( | ||
| f"Mutation {idx} failed with no associated errors" | ||
| ) | ||
| elif len(exc_list) == 1: | ||
| cause_exc = exc_list[0] | ||
| else: | ||
| cause_exc = bt_exceptions.RetryExceptionGroup(exc_list) | ||
| entry = self.mutations[idx].entry | ||
| all_errors.append( | ||
| bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc) | ||
| ) | ||
| if all_errors: | ||
| raise bt_exceptions.MutationsExceptionGroup( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will this affect extracting the gRPC status code for the metric, since we wrap it in a different exception?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We have logic for converting exceptions to statuses here. When we encounter an exception group, we use the last failure as the status. |
||
| all_errors, len(self.mutations) | ||
| ) | ||
| elif len(exc_list) == 1: | ||
| cause_exc = exc_list[0] | ||
| else: | ||
| cause_exc = bt_exceptions.RetryExceptionGroup(exc_list) | ||
| entry = self.mutations[idx].entry | ||
| all_errors.append( | ||
| bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc) | ||
| ) | ||
| if all_errors: | ||
| raise bt_exceptions.MutationsExceptionGroup( | ||
| all_errors, len(self.mutations) | ||
| ) | ||
|
|
||
| @CrossSync.convert | ||
| async def _run_attempt(self): | ||
|
|
@@ -164,6 +168,8 @@ async def _run_attempt(self): | |
| retry after the attempt is complete | ||
| GoogleAPICallError: if the gapic rpc fails | ||
| """ | ||
| # register attempt start | ||
| self._operation_metric.start_attempt() | ||
| request_entries = [self.mutations[idx].proto for idx in self.remaining_indices] | ||
| # track mutations in this request that have not been finalized yet | ||
| active_request_indices = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did sleep_generator and exception_factory go? Will this still wrap timeout exceptions correctly into a DeadlineExceeded exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are encapsulated in tracked_retry: https://github.com/googleapis/google-cloud-python/blob/bigtable_csm_2_instrumentation_advanced/packages/google-cloud-bigtable/google/cloud/bigtable/data/_metrics/tracked_retry.py