diff --git a/jupiterone/client.py b/jupiterone/client.py index 9fa9577..e7af0ec 100644 --- a/jupiterone/client.py +++ b/jupiterone/client.py @@ -31,6 +31,7 @@ UPDATE_RELATIONSHIPV2, DELETE_RELATIONSHIP, CURSOR_QUERY_V1, + DEFERRED_RESPONSE_QUERY, CREATE_INSTANCE, INTEGRATION_JOB_VALUES, INTEGRATION_INSTANCE_EVENT_VALUES, @@ -265,6 +266,86 @@ def _limit_and_skip_query( page += 1 return {"data": results} + + def query_with_deferred_response(self, query, cursor=None): + """ + Execute a J1QL query that returns a deferred response for handling large result sets. + + Args: + query (str): The J1QL query to execute + cursor (str, optional): Pagination cursor for subsequent requests + + Returns: + list: Combined results from all paginated responses + """ + all_query_results = [] + current_cursor = cursor + + while True: + variables = { + "query": query, + "deferredResponse": "FORCE", + "cursor": current_cursor, + "flags": {"variableResultSize": True} + } + + payload = { + "query": DEFERRED_RESPONSE_QUERY, + "variables": variables + } + + # Use session with retries for reliability + max_retries = 5 + backoff_factor = 2 + + for attempt in range(1, max_retries + 1): + + session = requests.Session() + retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 429]) + session.mount('https://', HTTPAdapter(max_retries=retries)) + + # Get the download URL + url_response = session.post( + self.graphql_url, + headers=self.headers, + json=payload, + timeout=60 + ) + + if url_response.status_code == 429: + retry_after = int(url_response.headers.get("Retry-After", backoff_factor ** attempt)) + print(f"Rate limited. Retrying in {retry_after} seconds...") + time.sleep(retry_after) + else: + break # Exit on success or other non-retryable error + + if url_response.ok: + + download_url = url_response.json()['data']['queryV1']['url'] + + # Poll the download URL until results are ready + while True: + download_response = session.get(download_url, timeout=60).json() + status = download_response['status'] + + if status != 'IN_PROGRESS': + break + + time.sleep(0.2) # Sleep 200 milliseconds between checks + + # Add results to the collection + all_query_results.extend(download_response['data']) + + # Check for more pages + if 'cursor' in download_response: + current_cursor = download_response['cursor'] + else: + break + + else: + print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}") + + return all_query_results def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict: """Executes POST request to SyncAPI endpoints""" diff --git a/jupiterone/constants.py b/jupiterone/constants.py index 26ac77a..f2feec0 100644 --- a/jupiterone/constants.py +++ b/jupiterone/constants.py @@ -131,7 +131,6 @@ entityRawDataLegacy(entityId: $entityId, , source: $source) { entityId payload { - ... on RawDataJSONEntityLegacy { contentType name @@ -360,7 +359,7 @@ ...IntegrationDefinitionConfigFragment @include(if: $includeConfig) __typename } - + fragment IntegrationDefinitionConfigFragment on IntegrationDefinition { configFields { ...ConfigFieldsRecursive @@ -387,7 +386,7 @@ } __typename } - + fragment ConfigFieldsRecursive on ConfigField { ...ConfigFieldValues configFields { @@ -400,7 +399,7 @@ } __typename } - + fragment ConfigFieldValues on ConfigField { key displayName @@ -450,7 +449,7 @@ } __typename } - + query IntegrationInstances($definitionId: String, $cursor: String, $limit: Int, $filter: ListIntegrationInstancesSearchFilter) { integrationInstancesV2( definitionId: $definitionId @@ -507,7 +506,7 @@ collectorPoolId __typename } - + fragment IntegrationInstanceJobValues on IntegrationJob { id status @@ -517,7 +516,7 @@ hasSkippedSteps __typename } - + query IntegrationInstance($integrationInstanceId: String!) { integrationInstance(id: $integrationInstanceId) { ...IntegrationInstanceValues @@ -577,6 +576,24 @@ } } """ +DEFERRED_RESPONSE_QUERY = """ +query J1QL( + $query: String! + $variables: JSON + $cursor: String + $deferredResponse: DeferredResponseOption +) { + queryV1( + query: $query + variables: $variables + deferredResponse: $deferredResponse + cursor: $cursor + ) { + type + url + } +} +""" J1QL_FROM_NATURAL_LANGUAGE = """ query j1qlFromNaturalLanguage($input: J1qlFromNaturalLanguageInput!) { j1qlFromNaturalLanguage(input: $input) { @@ -660,7 +677,7 @@ __typename } } - + fragment RuleInstanceFields on QuestionRuleInstance { id accountId @@ -726,7 +743,7 @@ __typename } } - + fragment RuleInstanceFields on QuestionRuleInstance { id accountId @@ -869,7 +886,7 @@ __typename } } - + fragment QuestionFields on Question { id sourceId