Skip to content

Commit 6cfc436

Browse files
CM-61550: Show upload progress and detect slow connections during scan (#419)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent cf6379c commit 6cfc436

File tree

5 files changed

+216
-18
lines changed

5 files changed

+216
-18
lines changed

cycode/cli/apps/scan/code_scanner.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,36 @@
4747
logger = get_logger('Code Scanner')
4848

4949

50+
class _UploadProgressAggregator:
51+
"""Aggregates upload progress across parallel batch uploads for display in the progress bar."""
52+
53+
def __init__(self, progress_bar: 'BaseProgressBar') -> None:
54+
self._progress_bar = progress_bar
55+
self._slots: list[list[int]] = []
56+
57+
def create_callback(self) -> Callable[[int, int], None]:
58+
"""Create a progress callback for one batch upload. Each batch gets its own slot."""
59+
slot = [0, 0]
60+
self._slots.append(slot)
61+
62+
def on_upload_progress(bytes_read: int, total_bytes: int) -> None:
63+
slot[0] = bytes_read
64+
slot[1] = total_bytes
65+
66+
# Sum across all batch slots to show combined progress
67+
total_read = sum(s[0] for s in self._slots)
68+
total_size = sum(s[1] for s in self._slots)
69+
70+
if total_read >= total_size:
71+
self._progress_bar.update_right_side_label(None)
72+
else:
73+
mb_read = total_read / (1024 * 1024)
74+
mb_total = total_size / (1024 * 1024)
75+
self._progress_bar.update_right_side_label(f'Uploading {mb_read:.1f} / {mb_total:.1f} MB')
76+
77+
return on_upload_progress
78+
79+
5080
def scan_disk_files(ctx: typer.Context, paths: tuple[str, ...]) -> None:
5181
scan_type = ctx.obj['scan_type']
5282
progress_bar = ctx.obj['progress_bar']
@@ -121,6 +151,9 @@ def _get_scan_documents_thread_func(
121151
severity_threshold = ctx.obj['severity_threshold']
122152
sync_option = ctx.obj['sync']
123153
command_scan_type = ctx.info_name
154+
progress_bar = ctx.obj['progress_bar']
155+
156+
aggregator = _UploadProgressAggregator(progress_bar)
124157

125158
def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, LocalScanResult]:
126159
local_scan_result = error = error_message = None
@@ -143,6 +176,7 @@ def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, Local
143176
is_commit_range,
144177
scan_parameters,
145178
should_use_sync_flow,
179+
on_upload_progress=aggregator.create_callback(),
146180
)
147181

148182
enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_result)
@@ -268,11 +302,14 @@ def _perform_scan_v4_async(
268302
scan_parameters: dict,
269303
is_git_diff: bool,
270304
is_commit_range: bool,
305+
on_upload_progress: Optional[Callable] = None,
271306
) -> ZippedFileScanResult:
272307
upload_link = cycode_client.get_upload_link(scan_type)
273308
logger.debug('Got upload link, %s', {'upload_id': upload_link.upload_id})
274309

275-
cycode_client.upload_to_presigned_post(upload_link.url, upload_link.presigned_post_fields, zipped_documents)
310+
cycode_client.upload_to_presigned_post(
311+
upload_link.url, upload_link.presigned_post_fields, zipped_documents, on_upload_progress
312+
)
276313
logger.debug('Uploaded zip to presigned URL')
277314

278315
scan_async_result = cycode_client.scan_repository_from_upload_id(
@@ -292,9 +329,14 @@ def _perform_scan_async(
292329
scan_type: str,
293330
scan_parameters: dict,
294331
is_commit_range: bool,
332+
on_upload_progress: Optional[Callable] = None,
295333
) -> ZippedFileScanResult:
296334
scan_async_result = cycode_client.zipped_file_scan_async(
297-
zipped_documents, scan_type, scan_parameters, is_commit_range=is_commit_range
335+
zipped_documents,
336+
scan_type,
337+
scan_parameters,
338+
is_commit_range=is_commit_range,
339+
on_upload_progress=on_upload_progress,
298340
)
299341
logger.debug('Async scan request has been triggered successfully, %s', {'scan_id': scan_async_result.scan_id})
300342

@@ -326,6 +368,7 @@ def _perform_scan(
326368
is_commit_range: bool,
327369
scan_parameters: dict,
328370
should_use_sync_flow: bool = False,
371+
on_upload_progress: Optional[Callable] = None,
329372
) -> ZippedFileScanResult:
330373
if should_use_sync_flow:
331374
# it does not support commit range scans; should_use_sync_flow handles it
@@ -334,12 +377,20 @@ def _perform_scan(
334377
if should_use_presigned_upload(scan_type):
335378
try:
336379
return _perform_scan_v4_async(
337-
cycode_client, zipped_documents, scan_type, scan_parameters, is_git_diff, is_commit_range
380+
cycode_client,
381+
zipped_documents,
382+
scan_type,
383+
scan_parameters,
384+
is_git_diff,
385+
is_commit_range,
386+
on_upload_progress,
338387
)
339388
except requests.exceptions.RequestException:
340389
logger.warning('Direct upload to object storage failed. Falling back to upload via Cycode API. ')
341390

342-
return _perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range)
391+
return _perform_scan_async(
392+
cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range, on_upload_progress
393+
)
343394

344395

345396
def poll_scan_results(

cycode/cli/exceptions/custom_exceptions.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ def __str__(self) -> str:
5555
return f'HTTP unauthorized error occurred during the request. Message: {self.error_message}'
5656

5757

58+
class SlowUploadConnectionError(CycodeError):
    """Signals that an upload's connection failed after only part of the body was sent."""

    def __str__(self) -> str:
        description = 'Upload was interrupted mid-transfer, indicating a slow or unstable network connection.'
        return description
61+
62+
5863
class ZipTooLargeError(CycodeError):
5964
def __init__(self, size_limit: int) -> None:
6065
self.size_limit = size_limit
@@ -102,6 +107,12 @@ def __str__(self) -> str:
102107
code='timeout_error',
103108
message='The request timed out. Please try again by executing the `cycode scan` command',
104109
),
110+
SlowUploadConnectionError: CliError(
111+
soft_fail=True,
112+
code='slow_upload_error',
113+
message='The scan upload was interrupted. This is likely due to a slow or unstable network connection. '
114+
'Please try again by executing the `cycode scan` command',
115+
),
105116
HttpUnauthorizedError: CliError(
106117
soft_fail=True,
107118
code='auth_error',

cycode/cyclient/cycode_client_base.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import platform
33
import ssl
4+
from io import BytesIO
45
from typing import TYPE_CHECKING, Callable, ClassVar, Optional
56

67
import requests
@@ -15,6 +16,7 @@
1516
RequestHttpError,
1617
RequestSslError,
1718
RequestTimeoutError,
19+
SlowUploadConnectionError,
1820
)
1921
from cycode.cyclient import config
2022
from cycode.cyclient.headers import get_cli_user_agent, get_correlation_id
@@ -90,6 +92,23 @@ def _should_retry_exception(exception: BaseException) -> bool:
9092
return is_request_error or is_server_error
9193

9294

95+
class UploadProgressTracker:
    """Read-only file-like view over a bytes payload that reports how much has been read.

    Attributes:
        bytes_read: running count of bytes handed out by ``read`` so far.
        len: total size of the wrapped payload in bytes.
    """

    def __init__(self, data: bytes, callback: Optional[Callable[[int, int], None]]) -> None:
        self.len = len(data)
        self.bytes_read = 0
        self._callback = callback
        self._io = BytesIO(data)

    def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` bytes and notify the callback with ``(bytes_read, len)``.

        The callback is skipped when no callback was given or when the read returned
        no data (end of payload).
        """
        chunk = self._io.read(size)
        if not chunk:
            # Nothing was consumed, so there is no progress to report.
            return chunk

        self.bytes_read += len(chunk)
        if self._callback:
            self._callback(self.bytes_read, self.len)
        return chunk
110+
111+
93112
class CycodeClientBase:
94113
MANDATORY_HEADERS: ClassVar[dict[str, str]] = {
95114
'User-Agent': get_cli_user_agent(),
@@ -117,6 +136,72 @@ def put(self, url_path: str, body: Optional[dict] = None, headers: Optional[dict
117136
def get(self, url_path: str, headers: Optional[dict] = None, **kwargs) -> Response:
118137
return self._execute(method='get', endpoint=url_path, headers=headers, **kwargs)
119138

139+
def post_multipart(
    self,
    url_path: str,
    form_fields: dict,
    files: dict,
    on_upload_progress: Optional[Callable[[int, int], None]] = None,
    hide_response_content_log: bool = False,
) -> Response:
    """POST ``form_fields`` and ``files`` as one multipart body, with optional progress reporting.

    The multipart body is encoded a single time here and handed to
    ``_send_multipart``, which performs the request (and its retries) using
    those same bytes.
    """
    target_url = self.build_full_url(self.api_url, url_path)
    logger.debug('Executing request, %s', {'method': 'POST', 'url': target_url})

    # requests.Request insists on a URL, so a placeholder is supplied; only the encoded
    # body and the Content-Type header (which carries the boundary) are taken from it.
    encoded = requests.Request('POST', 'https://dummy', data=form_fields, files=files).prepare()
    multipart_body = encoded.body
    multipart_content_type = encoded.headers['Content-Type']

    return self._send_multipart(
        url=target_url,
        body=multipart_body,
        content_type=multipart_content_type,
        on_upload_progress=on_upload_progress,
        hide_response_content_log=hide_response_content_log,
    )
162+
163+
@retry(
    retry=retry_if_exception(_should_retry_exception),
    stop=_RETRY_STOP_STRATEGY,
    wait=_RETRY_WAIT_STRATEGY,
    reraise=True,
    before_sleep=_retry_before_sleep,
)
def _send_multipart(
    self,
    url: str,
    body: bytes,
    content_type: str,
    on_upload_progress: Optional[Callable[[int, int], None]],
    hide_response_content_log: bool,
) -> Response:
    """Send a pre-encoded multipart body with a POST request, tracking upload progress.

    Args:
        url: full URL to POST to.
        body: the already-encoded multipart payload.
        content_type: Content-Type header value matching ``body`` (includes the boundary).
        on_upload_progress: optional callback invoked with ``(bytes_read, total_bytes)``.
        hide_response_content_log: when true, the response body is logged as ``'HIDDEN'``.

    Returns:
        The response, after ``raise_for_status()`` succeeded.

    Raises:
        SlowUploadConnectionError: the connection failed after some, but not all,
            of the body had been read for sending.
        RequestConnectionError: a connection error occurred before any byte was
            read or after the whole body was read.
    """
    # Wrap the body in a fresh tracker each attempt so bytes_read starts from zero.
    tracker = UploadProgressTracker(body, on_upload_progress)
    headers = self.get_request_headers({'Content-Type': content_type})
    try:
        response = _get_request_function()(
            method='post', url=url, data=tracker, headers=headers, timeout=self.timeout
        )

        content = 'HIDDEN' if hide_response_content_log else response.text
        logger.debug(
            'Receiving response, %s',
            {'status_code': response.status_code, 'url': url, 'content': content},
        )

        response.raise_for_status()
        return response
    except (exceptions.ChunkedEncodingError, exceptions.ConnectionError) as e:
        # Only a drop that happens *mid-transfer* points at a slow/unstable network.
        # When nothing was read at all the connection was never usable (refused, DNS
        # failure, connect timeout), which is an ordinary connection error and must
        # not be reported as a slow upload.
        if 0 < tracker.bytes_read < tracker.len:
            raise SlowUploadConnectionError from e
        # Nothing or everything was sent — map to our types so _should_retry_exception
        # handles retry logic.
        if isinstance(e, exceptions.ConnectionError):
            raise RequestConnectionError from e
        raise
    except Exception as e:
        # NOTE(review): _handle_exception is defined outside this view; presumably it
        # raises the mapped error — confirm, otherwise this path returns None.
        self._handle_exception(e)
204+
120205
@retry(
121206
retry=retry_if_exception(_should_retry_exception),
122207
stop=_RETRY_STOP_STRATEGY,

cycode/cyclient/scan_client.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
import json
22
from copy import deepcopy
3-
from typing import TYPE_CHECKING, Optional, Union
3+
from typing import TYPE_CHECKING, Callable, Optional, Union
44
from uuid import UUID
55

66
import requests
77
from requests import Response
88

99
from cycode.cli import consts
1010
from cycode.cli.config import configuration_manager
11-
from cycode.cli.exceptions.custom_exceptions import CycodeError, RequestHttpError
11+
from cycode.cli.exceptions.custom_exceptions import (
12+
CycodeError,
13+
RequestHttpError,
14+
SlowUploadConnectionError,
15+
)
1216
from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip
1317
from cycode.cyclient import models
14-
from cycode.cyclient.cycode_client_base import CycodeClientBase
18+
from cycode.cyclient.cycode_client_base import CycodeClientBase, UploadProgressTracker
1519
from cycode.cyclient.logger import logger
1620

1721
if TYPE_CHECKING:
@@ -114,18 +118,18 @@ def zipped_file_scan_async(
114118
scan_parameters: dict,
115119
is_git_diff: bool = False,
116120
is_commit_range: bool = False,
121+
on_upload_progress: Optional[Callable[[int, int], None]] = None,
117122
) -> models.ScanInitializationResponse:
118-
files = {'file': ('multiple_files_scan.zip', zip_file.read())}
119-
120-
response = self.scan_cycode_client.post(
123+
response = self.scan_cycode_client.post_multipart(
121124
url_path=self.get_zipped_file_scan_async_url_path(scan_type),
122-
data={
125+
form_fields={
123126
'is_git_diff': is_git_diff,
124127
'scan_parameters': json.dumps(scan_parameters),
125128
'is_commit_range': is_commit_range,
126129
'compression_manifest': self._create_compression_manifest_string(zip_file),
127130
},
128-
files=files,
131+
files={'file': ('multiple_files_scan.zip', zip_file.read(), 'application/octet-stream')},
132+
on_upload_progress=on_upload_progress,
129133
)
130134
return models.ScanInitializationResponseSchema().load(response.json())
131135

@@ -135,12 +139,32 @@ def get_upload_link(self, scan_type: str) -> models.UploadLinkResponse:
135139
response = self.scan_cycode_client.get(url_path=url_path, hide_response_content_log=self._hide_response_log)
136140
return models.UploadLinkResponseSchema().load(response.json())
137141

138-
def upload_to_presigned_post(self, url: str, fields: dict[str, str], zip_file: 'InMemoryZip') -> None:
139-
multipart = {key: (None, value) for key, value in fields.items()}
140-
multipart['file'] = (None, zip_file.read())
141-
# We are not using Cycode client, as we are calling aws S3.
142-
response = requests.post(url, files=multipart, timeout=self.scan_cycode_client.timeout)
143-
response.raise_for_status()
142+
def upload_to_presigned_post(
    self,
    url: str,
    fields: dict[str, str],
    zip_file: 'InMemoryZip',
    on_upload_progress: Optional[Callable[[int, int], None]] = None,
) -> None:
    """Upload the zip to a presigned POST URL, optionally reporting upload progress.

    Args:
        url: presigned URL to POST to.
        fields: presigned form fields; each is sent as a plain multipart field.
        zip_file: archive whose bytes are sent as the ``file`` part.
        on_upload_progress: optional callback invoked with ``(bytes_read, total_bytes)``.

    Raises:
        SlowUploadConnectionError: the connection failed after some, but not all,
            of the body had been read for sending.
        requests.exceptions.RequestException: any other request failure, including a
            connection that failed before a single byte was sent, and HTTP error
            statuses from ``raise_for_status()``.
    """
    all_files = {key: (None, value) for key, value in fields.items()}
    all_files['file'] = ('multiple_files_scan.zip', zip_file.read(), 'application/octet-stream')

    # Encode the multipart body up front (placeholder URL; only body and Content-Type
    # are used) so it can be streamed through the progress tracker.
    prepared = requests.Request('POST', 'https://dummy', files=all_files).prepare()
    tracker = UploadProgressTracker(prepared.body, on_upload_progress)

    try:
        # We are not using Cycode client, as we are calling aws S3.
        response = requests.post(
            url,
            data=tracker,
            headers={'Content-Type': prepared.headers['Content-Type']},
            timeout=self.scan_cycode_client.timeout,
        )
        response.raise_for_status()
    except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as e:
        # Only a drop that happens *mid-transfer* indicates a slow/unstable network.
        # If nothing was read the endpoint was never reached; re-raise the original
        # requests exception so callers that catch RequestException still see it.
        if 0 < tracker.bytes_read < tracker.len:
            raise SlowUploadConnectionError from e
        raise
144168

145169
def scan_repository_from_upload_id(
146170
self,

tests/cyclient/test_scan_client.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55
import requests
66
import responses
7+
from pytest_mock import MockerFixture
78
from requests.exceptions import ConnectionError as RequestsConnectionError
89

910
from cycode.cli.cli_types import ScanTypeOption
@@ -12,6 +13,7 @@
1213
HttpUnauthorizedError,
1314
RequestConnectionError,
1415
RequestTimeoutError,
16+
SlowUploadConnectionError,
1517
)
1618
from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip
1719
from cycode.cli.models import Document
@@ -168,3 +170,28 @@ def test_get_scan_details(
168170
scan_details_response = scan_client.get_scan_details(scan_type, str(scan_id))
169171
assert scan_details_response.id == str(scan_id)
170172
assert scan_details_response.scan_status == 'Completed'
173+
174+
175+
@pytest.mark.parametrize('scan_type', list(ScanTypeOption))
def test_zipped_file_scan_async_slow_upload_error(
    scan_type: ScanTypeOption, scan_client: ScanClient, mocker: MockerFixture
) -> None:
    """A connection failure after a partial body read must surface as SlowUploadConnectionError."""

    def _partial_upload_then_fail(**kwargs) -> None:
        # Consume just the first few bytes of the body, then break the connection.
        body = kwargs.get('data')
        if body is not None:
            body.read(10)
        raise requests.exceptions.ChunkedEncodingError('Connection broken mid-transfer')

    zip_file = get_test_zip_file(scan_type)

    # Replace the HTTP call with the failing stub and avoid real header construction.
    mocker.patch('cycode.cyclient.cycode_client_base._get_request_function', return_value=_partial_upload_then_fail)
    mocker.patch.object(
        scan_client.scan_cycode_client,
        'get_request_headers',
        return_value={'Authorization': 'Bearer test'},
    )

    with pytest.raises(SlowUploadConnectionError):
        scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={})

0 commit comments

Comments
 (0)