Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions jupiterone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
UPDATE_RELATIONSHIPV2,
DELETE_RELATIONSHIP,
CURSOR_QUERY_V1,
DEFERRED_RESPONSE_QUERY,
CREATE_INSTANCE,
INTEGRATION_JOB_VALUES,
INTEGRATION_INSTANCE_EVENT_VALUES,
Expand Down Expand Up @@ -265,6 +266,86 @@ def _limit_and_skip_query(
page += 1

return {"data": results}

def query_with_deferred_response(self, query, cursor=None):
"""
Execute a J1QL query that returns a deferred response for handling large result sets.

Args:
query (str): The J1QL query to execute
cursor (str, optional): Pagination cursor for subsequent requests

Returns:
list: Combined results from all paginated responses
"""
all_query_results = []
current_cursor = cursor

while True:
variables = {
"query": query,
"deferredResponse": "FORCE",
"cursor": current_cursor,
"flags": {"variableResultSize": True}
}

payload = {
"query": DEFERRED_RESPONSE_QUERY,
"variables": variables
}

# Use session with retries for reliability
max_retries = 5
backoff_factor = 2

for attempt in range(1, max_retries + 1):

session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 429])
session.mount('https://', HTTPAdapter(max_retries=retries))

# Get the download URL
url_response = session.post(
self.graphql_url,
headers=self.headers,
json=payload,
timeout=60
)

if url_response.status_code == 429:
retry_after = int(url_response.headers.get("Retry-After", backoff_factor ** attempt))
print(f"Rate limited. Retrying in {retry_after} seconds...")
time.sleep(retry_after)
else:
break # Exit on success or other non-retryable error

if url_response.ok:

download_url = url_response.json()['data']['queryV1']['url']

# Poll the download URL until results are ready
while True:
download_response = session.get(download_url, timeout=60).json()
status = download_response['status']

if status != 'IN_PROGRESS':
break

time.sleep(0.2) # Sleep 200 milliseconds between checks

# Add results to the collection
all_query_results.extend(download_response['data'])

# Check for more pages
if 'cursor' in download_response:
current_cursor = download_response['cursor']
else:
break

else:
print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}")

return all_query_results

def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict:
"""Executes POST request to SyncAPI endpoints"""
Expand Down
37 changes: 27 additions & 10 deletions jupiterone/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
entityRawDataLegacy(entityId: $entityId, , source: $source) {
entityId
payload {

... on RawDataJSONEntityLegacy {
contentType
name
Expand Down Expand Up @@ -360,7 +359,7 @@
...IntegrationDefinitionConfigFragment @include(if: $includeConfig)
__typename
}

fragment IntegrationDefinitionConfigFragment on IntegrationDefinition {
configFields {
...ConfigFieldsRecursive
Expand All @@ -387,7 +386,7 @@
}
__typename
}

fragment ConfigFieldsRecursive on ConfigField {
...ConfigFieldValues
configFields {
Expand All @@ -400,7 +399,7 @@
}
__typename
}

fragment ConfigFieldValues on ConfigField {
key
displayName
Expand Down Expand Up @@ -450,7 +449,7 @@
}
__typename
}

query IntegrationInstances($definitionId: String, $cursor: String, $limit: Int, $filter: ListIntegrationInstancesSearchFilter) {
integrationInstancesV2(
definitionId: $definitionId
Expand Down Expand Up @@ -507,7 +506,7 @@
collectorPoolId
__typename
}

fragment IntegrationInstanceJobValues on IntegrationJob {
id
status
Expand All @@ -517,7 +516,7 @@
hasSkippedSteps
__typename
}

query IntegrationInstance($integrationInstanceId: String!) {
integrationInstance(id: $integrationInstanceId) {
...IntegrationInstanceValues
Expand Down Expand Up @@ -577,6 +576,24 @@
}
}
"""
DEFERRED_RESPONSE_QUERY = """
query J1QL(
$query: String!
$variables: JSON
$cursor: String
$deferredResponse: DeferredResponseOption
) {
queryV1(
query: $query
variables: $variables
deferredResponse: $deferredResponse
cursor: $cursor
) {
type
url
}
}
"""
J1QL_FROM_NATURAL_LANGUAGE = """
query j1qlFromNaturalLanguage($input: J1qlFromNaturalLanguageInput!) {
j1qlFromNaturalLanguage(input: $input) {
Expand Down Expand Up @@ -660,7 +677,7 @@
__typename
}
}

fragment RuleInstanceFields on QuestionRuleInstance {
id
accountId
Expand Down Expand Up @@ -726,7 +743,7 @@
__typename
}
}

fragment RuleInstanceFields on QuestionRuleInstance {
id
accountId
Expand Down Expand Up @@ -869,7 +886,7 @@
__typename
}
}

fragment QuestionFields on Question {
id
sourceId
Expand Down
Loading