diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e679ea5..ef00fd2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,8 +127,8 @@ jobs: exit 1 fi - - name: Run E2E upload test + - name: Run E2E tests and examples env: TEST_NODE_PARITY: 0 run: | - poetry run pytest tests/test_e2e_upload.py -q --maxfail=1 --no-cov + poetry run pytest tests/test_e2e_upload.py tests/test_examples.py -q --maxfail=1 --no-cov diff --git a/CHANGELOG.md b/CHANGELOG.md index 631fabd..043f196 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ### 2.0.0 / 2026-05-20 ### +* @TODO before merging: expand the SDK surface so this release supports all Transloadit API endpoints, and update these release notes with the final endpoint coverage. * **Breaking Change**: Raised the supported Python runtime floor from 3.9+ to 3.12+ so the SDK no longer has to retain vulnerable locked dependency versions for EOL Python 3.9 or depend on tooling lines that are already dropping older runtime support. +* Added explicit asyncio support with `AsyncTransloadit`, async request/assembly/template helpers, and `asyncio.sleep`-based polling. Resumable uploads stay on the existing TUS client, but run through `asyncio.to_thread()` so the event loop remains responsive instead of pretending the sync uploader is natively async. +* Hardened upload and response edge cases: invalid service URLs and empty template IDs now fail fast, external absolute API URLs are no longer signed, sync TUS uploads now handle nameless streams and submit rate limits before uploading, async form fields match sync boolean serialization, async TUS cancellation waits for worker cleanup, async polling rate-limit retries reset after successful polls, async rate-limit backoff honors server `retryIn`, Smart CDN signing rejects invalid workspace slugs/reserved query keys, and sync non-JSON responses fall back to response text. +* Hardened sync and async request handling by preserving custom `auth` constraints, quoting path IDs, and keeping explicit/custom service URLs compatible with local, CI, and [Transloadit Gateway](https://github.com/transloadit/gateway) deployments. +* Fixed sync and async template creation to send the current API `template` payload shape. * Raised the runtime HTTP stack to patched versions by requiring `requests` 2.33+ and adding an explicit `urllib3` 2.7+ floor. * Updated development and documentation tooling, including `pytest` 9.0.3, `Sphinx` 9.1, `sphinx-autobuild` 2025.8, `coverage` 7.14, `tox` 4.54, and `requests-mock` 1.12. * Updated CI and local Docker test coverage to a representative Python 3.12, 3.13, and 3.14 matrix. diff --git a/README.md b/README.md index 51d6f4e..74e6b4a 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,33 @@ print(assembly_response.data.get('assembly_id')) print(assembly_response.data['assembly_id']) ``` -## Example +## Async usage -For fully working examples, take a look at [`examples/`](https://github.com/transloadit/python-sdk/tree/HEAD/examples). +```python +from transloadit.async_client import AsyncTransloadit + +async with AsyncTransloadit("TRANSLOADIT_KEY", "TRANSLOADIT_SECRET") as tl: + response = await tl.get_assembly(assembly_id="abc") + print(response.data["ok"]) + + assembly = tl.new_assembly() + assembly.add_step("resize", "/image/resize", {"width": 70, "height": 70}) + with open("PATH/TO/FILE.jpg", "rb") as upload: + assembly.add_file(upload) + response = await assembly.create(wait=True, resumable=False) +``` + +The async client keeps polling on `asyncio.sleep`. Resumable uploads still use the existing TUS client, but are offloaded with `asyncio.to_thread()` so the event loop stays responsive. + +If you do not use `async with`, call `await tl.aclose()` when you are done with the session. + +## Examples + +For copy/paste runnable examples, take a look at +[`examples/`](https://github.com/transloadit/python-sdk/tree/HEAD/examples). + +The examples cover sync uploads, async uploads, resumable uploads, Template usage, +sync and async Template lifecycle management, and Smart CDN URL signing. ## Documentation @@ -60,17 +84,17 @@ This script will: - install Poetry, Node.js 24, and the Transloadit CLI - pass credentials from `.env` (if present) so end-to-end tests can run against real Transloadit accounts -Signature parity tests use `npx transloadit smart_sig` under the hood, matching the reference implementation used by our other SDKs. Our GitHub Actions workflow also runs the E2E upload against Python 3.14 on every push/PR using a dedicated Transloadit test account (wired through the `TRANSLOADIT_KEY` and `TRANSLOADIT_SECRET` secrets). +Signature parity tests use `npx transloadit smart_sig` under the hood, matching the reference implementation used by our other SDKs. Our GitHub Actions workflow also runs the E2E upload and quickstart examples against Python 3.14 on every push/PR using a dedicated Transloadit test account (wired through the `TRANSLOADIT_KEY` and `TRANSLOADIT_SECRET` secrets). Pass `--python 3.14` (or set `PYTHON_VERSIONS`) to restrict the matrix, or append a custom command after `--`, for example `scripts/test-in-docker.sh -- pytest -k smartcdn`. To exercise the optional end-to-end upload against a real Transloadit account, provide `TRANSLOADIT_KEY` and `TRANSLOADIT_SECRET` (via environment variables or `.env`) and set `PYTHON_SDK_E2E=1`: ```bash -PYTHON_SDK_E2E=1 scripts/test-in-docker.sh --python 3.14 -- pytest tests/test_e2e_upload.py +PYTHON_SDK_E2E=1 scripts/test-in-docker.sh --python 3.14 -- pytest tests/test_e2e_upload.py tests/test_examples.py ``` -The test uploads `chameleon.jpg`, resizes it, and asserts on the live assembly results. +The tests upload `chameleon.jpg`, run the copy/paste quickstart examples, and assert on the live assembly results. If you have a global installation of `poetry`, you can run the tests with: diff --git a/docs/source/index.rst b/docs/source/index.rst index 67c428f..9d987b4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -66,9 +66,31 @@ Usage # or print(assembly_response.data['assembly_id']) -Example -------- +Async usage +----------- + +.. code:: python + + from transloadit.async_client import AsyncTransloadit + + async with AsyncTransloadit('TRANSLOADIT_KEY', 'TRANSLOADIT_SECRET') as tl: + response = await tl.get_assembly(assembly_id='abc') + print(response.data['ok']) + + assembly = tl.new_assembly() + assembly.add_step('resize', '/image/resize', {'width': 70, 'height': 70}) + with open('PATH/TO/FILE.jpg', 'rb') as upload: + assembly.add_file(upload) + response = await assembly.create(wait=True, resumable=False) + +If you do not use ``async with``, call ``await tl.aclose()`` when you are done with the session. + +Examples +-------- + +For copy/paste runnable examples, take a look at `examples/`_. -For fully working examples, take a look at `examples/`_. +The examples cover sync uploads, async uploads, resumable uploads, Template usage, +sync and async Template lifecycle management, and Smart CDN URL signing. .. _examples/: https://github.com/transloadit/python-sdk/tree/HEAD/examples diff --git a/docs/source/transloadit.rst b/docs/source/transloadit.rst index b8e1a71..aba5aab 100644 --- a/docs/source/transloadit.rst +++ b/docs/source/transloadit.rst @@ -49,6 +49,38 @@ transloadit.request module :undoc-members: :show-inheritance: +transloadit.async_client module +------------------------------- + +.. automodule:: transloadit.async_client + :members: + :undoc-members: + :show-inheritance: + +transloadit.async_assembly module +---------------------------------- + +.. automodule:: transloadit.async_assembly + :members: + :undoc-members: + :show-inheritance: + +transloadit.async_template module +---------------------------------- + +.. automodule:: transloadit.async_template + :members: + :undoc-members: + :show-inheritance: + +transloadit.async_request module +-------------------------------- + +.. automodule:: transloadit.async_request + :members: + :undoc-members: + :show-inheritance: + transloadit.response module --------------------------- @@ -57,4 +89,3 @@ transloadit.response module :undoc-members: :show-inheritance: - diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..4253a14 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,41 @@ +# Transloadit Python SDK Examples + +Run the examples from the repository root after installing the project: + +```bash +poetry install +export TRANSLOADIT_KEY="YOUR_TRANSLOADIT_KEY" +export TRANSLOADIT_SECRET="YOUR_TRANSLOADIT_SECRET" +``` + +## Quickstart Examples + +```bash +poetry run python examples/image_resize.py +poetry run python examples/async_image_resize.py +poetry run python examples/resumable_upload.py +poetry run python examples/assembly_with_template.py +poetry run python examples/template_lifecycle.py +poetry run python examples/async_template_lifecycle.py +poetry run python examples/smart_cdn_url.py +``` + +`smart_cdn_url.py` only signs a URL locally. The other quickstart examples contact +Transloadit and may create temporary Assemblies or Templates in your account. + +These quickstart examples run in CI against a dedicated Transloadit test account, so they +are kept in sync with the SDK and API. + +## Advanced Examples + +These examples require pre-created Templates and, depending on your Template, third-party +provider configuration: + +```bash +export TRANSLOADIT_TTS_TEMPLATE_ID="YOUR_TEMPLATE_ID" +poetry run python examples/file_to_tts.py + +export TRANSLOADIT_TRANSCRIBE_TEMPLATE_ID="YOUR_TRANSCRIBE_TEMPLATE_ID" +export TRANSLOADIT_TRANSLATE_TEMPLATE_ID="YOUR_TRANSLATE_TEMPLATE_ID" +poetry run python examples/video_translator.py +``` diff --git a/examples/assembly_with_template.py b/examples/assembly_with_template.py new file mode 100644 index 0000000..f76f59f --- /dev/null +++ b/examples/assembly_with_template.py @@ -0,0 +1,78 @@ +"""Create a temporary Template and use it to process an uploaded image. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/assembly_with_template.py +""" + +import os +from pathlib import Path +from uuid import uuid4 + +from transloadit.client import Transloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def get_example_image_path(): + return Path(__file__).resolve().parent / "fixtures" / "lol_cat.jpg" + + +def extract_template_id(response_data): + template_id = response_data.get("id") or response_data.get("template_id") + if not template_id: + raise RuntimeError(f"Template response did not contain an id: {response_data}") + return template_id + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +def create_resize_template(client): + template = client.new_template(f"python-sdk-template-example-{uuid4().hex[:12]}") + template.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + return extract_template_id(template.create().data) + + +def main(): + key, secret = get_credentials() + client = Transloadit(key, secret) + template_id = create_resize_template(client) + + try: + assembly = client.new_assembly({"template_id": template_id}) + with get_example_image_path().open("rb") as upload: + assembly.add_file(upload, "image") + response = assembly.create(wait=True, resumable=False) + + print("Assembly:", response.data.get("assembly_ssl_url") or response.data.get("assembly_url")) + print("Template result:", first_result_url(response.data, "resize")) + finally: + client.delete_template(template_id) + + +if __name__ == "__main__": + main() diff --git a/examples/async_image_resize.py b/examples/async_image_resize.py new file mode 100644 index 0000000..5b240e9 --- /dev/null +++ b/examples/async_image_resize.py @@ -0,0 +1,62 @@ +"""Upload and resize an image with the async client. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/async_image_resize.py +""" + +import asyncio +import os +from pathlib import Path + +from transloadit.async_client import AsyncTransloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def get_example_image_path(): + return Path(__file__).resolve().parent / "fixtures" / "lol_cat.jpg" + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +async def main(): + key, secret = get_credentials() + + async with AsyncTransloadit(key, secret) as client: + assembly = client.new_assembly() + with get_example_image_path().open("rb") as upload: + assembly.add_file(upload, "image") + assembly.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + response = await assembly.create(wait=True, resumable=False) + + print("Assembly:", response.data.get("assembly_ssl_url") or response.data.get("assembly_url")) + print("Resized image:", first_result_url(response.data, "resize")) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/async_template_lifecycle.py b/examples/async_template_lifecycle.py new file mode 100644 index 0000000..db4f4e8 --- /dev/null +++ b/examples/async_template_lifecycle.py @@ -0,0 +1,86 @@ +"""Create, fetch, update, and delete a Template with the async client. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/async_template_lifecycle.py +""" + +import asyncio +import os +from uuid import uuid4 + +from transloadit.async_client import AsyncTransloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def extract_template_id(response_data): + template_id = response_data.get("id") or response_data.get("template_id") + if not template_id: + raise RuntimeError(f"Template response did not contain an id: {response_data}") + return template_id + + +async def main(): + key, secret = get_credentials() + template_id = None + deleted = False + template_name = f"python-sdk-async-example-{uuid4().hex[:12]}" + + async with AsyncTransloadit(key, secret) as client: + try: + template = client.new_template(template_name) + template.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + created = await template.create() + template_id = extract_template_id(created.data) + print("Created template:", template_id) + + fetched = await client.get_template(template_id) + print("Fetched template:", fetched.data.get("name") or fetched.data.get("template_name")) + + updated = await client.update_template( + template_id, + { + "name": f"{template_name}-updated", + "template": { + "steps": { + "resize": { + "use": ":original", + "robot": "/image/resize", + "width": 96, + "height": 96, + "resize_strategy": "fit", + "format": "jpg", + }, + }, + }, + }, + ) + print("Updated template:", updated.data.get("ok")) + + deleted_response = await client.delete_template(template_id) + print("Deleted template:", deleted_response.data.get("ok")) + deleted = True + finally: + if template_id and not deleted: + await client.delete_template(template_id) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/file_to_tts.py b/examples/file_to_tts.py index a3b0127..bfec770 100644 --- a/examples/file_to_tts.py +++ b/examples/file_to_tts.py @@ -1,18 +1,11 @@ -### A simple Transloadit Assembly that converts a .doc file to a .txt file. It then generates a .mp3 based on the .txt file using a text-to-speech Robot. +"""Advanced example: process a document with a pre-created text-to-speech template. -''' -Template: +This requires a Template in your Transloadit account with steps similar to: { "steps": { - ":original": { - "robot": "/upload/handle" - }, - "convert": { - "use": ":original", - "robot": "/document/convert", - "format": "txt" - }, + ":original": {"robot": "/upload/handle"}, + "convert": {"use": ":original", "robot": "/document/convert", "format": "txt"}, "speech": { "use": "convert", "robot": "/text/speak", @@ -23,22 +16,51 @@ } } } -''' -from transloadit import client +Run from the repository root: -tl = client.Transloadit('TRANSLOADIT_KEY', 'TRANSLOADIT_SECRET') + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy TRANSLOADIT_TTS_TEMPLATE_ID=xxx \ + poetry run python examples/file_to_tts.py +""" -def useTemplate(templateID, file_path='', result_name='', get_url=True, fields=''): - assembly = tl.new_assembly({'template_id': templateID, 'fields': fields}) +import os +from pathlib import Path - if file_path != '': - assembly.add_file(open(file_path, 'rb')) +from transloadit.client import Transloadit - assembly_response = assembly.create(retries=5, wait=True) - if get_url: - assembly_url = assembly_response.data.get('results').get(result_name)[0].get('ssl_url') - print(assembly_url) - return assembly_url - -useTemplate ('TEMPLATE_ID', file_path='fixtures/document.doc', result_name='speech', get_url=True) \ No newline at end of file + +def get_required_env(name): + value = os.getenv(name) + if not value: + raise RuntimeError(f"Please set {name}.") + return value + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +def main(): + client = Transloadit( + get_required_env("TRANSLOADIT_KEY"), + get_required_env("TRANSLOADIT_SECRET"), + ) + template_id = get_required_env("TRANSLOADIT_TTS_TEMPLATE_ID") + document_path = Path(__file__).resolve().parent / "fixtures" / "document.doc" + + assembly = client.new_assembly({"template_id": template_id}) + with document_path.open("rb") as upload: + assembly.add_file(upload, "document") + response = assembly.create(retries=5, wait=True) + + print("Generated speech:", first_result_url(response.data, "speech")) + + +if __name__ == "__main__": + main() diff --git a/examples/image_resize.py b/examples/image_resize.py index 8fc9bcf..0bb91b0 100644 --- a/examples/image_resize.py +++ b/examples/image_resize.py @@ -1,12 +1,61 @@ -### A simple Transloadit assembly that changes the size of an image. +"""Upload an image and resize it. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/image_resize.py +""" + +import os +from pathlib import Path from transloadit.client import Transloadit -client = Transloadit("TRANSLOADIT_KEY", "TRANSLOADIT_SECRET") -assembly = client.new_assembly() -assembly.add_file(open("fixtures/lol_cat.jpg", "rb")) -assembly.add_step("resize", "/image/resize", {"width": 70, "height": 70}) -response = assembly.create(wait=True) -result_url = response.data.get("results").get("resize")[0].get("ssl_url") -print("Your result:", result_url) +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def get_example_image_path(): + return Path(__file__).resolve().parent / "fixtures" / "lol_cat.jpg" + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +def main(): + key, secret = get_credentials() + client = Transloadit(key, secret) + assembly = client.new_assembly() + + with get_example_image_path().open("rb") as upload: + assembly.add_file(upload, "image") + assembly.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + response = assembly.create(wait=True, resumable=False) + + print("Assembly:", response.data.get("assembly_ssl_url") or response.data.get("assembly_url")) + print("Resized image:", first_result_url(response.data, "resize")) + + +if __name__ == "__main__": + main() diff --git a/examples/resumable_upload.py b/examples/resumable_upload.py new file mode 100644 index 0000000..3a3e2f5 --- /dev/null +++ b/examples/resumable_upload.py @@ -0,0 +1,61 @@ +"""Upload an image with resumable TUS upload enabled. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/resumable_upload.py +""" + +import os +from pathlib import Path + +from transloadit.client import Transloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def get_example_image_path(): + return Path(__file__).resolve().parent / "fixtures" / "lol_cat.jpg" + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +def main(): + key, secret = get_credentials() + client = Transloadit(key, secret) + assembly = client.new_assembly({"fields": {"example": "python-sdk-resumable-upload"}}) + + with get_example_image_path().open("rb") as upload: + assembly.add_file(upload, "image") + assembly.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + response = assembly.create(wait=True, resumable=True) + + print("Assembly:", response.data.get("assembly_ssl_url") or response.data.get("assembly_url")) + print("Resumable upload result:", first_result_url(response.data, "resize")) + + +if __name__ == "__main__": + main() diff --git a/examples/smart_cdn_url.py b/examples/smart_cdn_url.py new file mode 100644 index 0000000..de2be36 --- /dev/null +++ b/examples/smart_cdn_url.py @@ -0,0 +1,36 @@ +"""Generate a signed Smart CDN URL. + +This example does not contact Transloadit. It only signs a URL locally. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/smart_cdn_url.py +""" + +import os + +from transloadit.client import Transloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def main(): + key, secret = get_credentials() + client = Transloadit(key, secret) + url = client.get_signed_smart_cdn_url( + workspace=os.getenv("TRANSLOADIT_WORKSPACE", "example-workspace"), + template=os.getenv("TRANSLOADIT_TEMPLATE", "example-template"), + input=os.getenv("TRANSLOADIT_INPUT", "image.jpg"), + url_params={"width": 320, "height": 240, "fit": "crop"}, + ) + print(url) + + +if __name__ == "__main__": + main() diff --git a/examples/template_lifecycle.py b/examples/template_lifecycle.py new file mode 100644 index 0000000..4b3b333 --- /dev/null +++ b/examples/template_lifecycle.py @@ -0,0 +1,85 @@ +"""Create, fetch, update, and delete a Template. + +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy poetry run python examples/template_lifecycle.py +""" + +import os +from uuid import uuid4 + +from transloadit.client import Transloadit + + +def get_credentials(): + key = os.getenv("TRANSLOADIT_KEY") + secret = os.getenv("TRANSLOADIT_SECRET") + if not key or not secret: + raise RuntimeError("Please set TRANSLOADIT_KEY and TRANSLOADIT_SECRET.") + return key, secret + + +def extract_template_id(response_data): + template_id = response_data.get("id") or response_data.get("template_id") + if not template_id: + raise RuntimeError(f"Template response did not contain an id: {response_data}") + return template_id + + +def main(): + key, secret = get_credentials() + client = Transloadit(key, secret) + template_id = None + deleted = False + template_name = f"python-sdk-example-{uuid4().hex[:12]}" + + try: + template = client.new_template(template_name) + template.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": 120, + "height": 120, + "resize_strategy": "fit", + "format": "png", + }, + ) + created = template.create() + template_id = extract_template_id(created.data) + print("Created template:", template_id) + + fetched = client.get_template(template_id) + print("Fetched template:", fetched.data.get("name") or fetched.data.get("template_name")) + + updated = client.update_template( + template_id, + { + "name": f"{template_name}-updated", + "template": { + "steps": { + "resize": { + "use": ":original", + "robot": "/image/resize", + "width": 96, + "height": 96, + "resize_strategy": "fit", + "format": "jpg", + }, + }, + }, + }, + ) + print("Updated template:", updated.data.get("ok")) + + deleted_response = client.delete_template(template_id) + print("Deleted template:", deleted_response.data.get("ok")) + deleted = True + finally: + if template_id and not deleted: + client.delete_template(template_id) + + +if __name__ == "__main__": + main() diff --git a/examples/video_translator.py b/examples/video_translator.py index 02e9ff3..a016b3e 100644 --- a/examples/video_translator.py +++ b/examples/video_translator.py @@ -1,168 +1,135 @@ -### A Transloadit Assembly that adds a translated TTS-voice (english to dutch) to an input video of hermit crabs. - -''' -Template 1: - -{ - "steps": { - "transcribe_json": { - "use": ":original", - "robot": "/speech/transcribe", - "provider": "aws", - "source_language": "${fields.language}", - "format": "json", - "result": true - } - } -} -''' - -''' -Template 2: - -{ - "steps": { - ":original": { - "robot": "/upload/handle" - }, - "import_video": { - "robot": "/http/import", - "url": "${fields.video}" - }, - "translate": { - "use": ":original", - "robot": "/text/translate", - "provider": "gcp", - "target_language": "${fields.target_language}", - "source_language": "${fields.source_language}", - "result": true - }, - "speech": { - "use": "translate", - "robot": "/text/speak", - "provider": "gcp", - "target_language": "${fields.target_language}", - "voice": "female-1", - "ssml": true, - "result": true - }, - "extract_audio": { - "use": "import_video", - "robot": "/video/encode", - "result": true, - "preset": "mp3", - "ffmpeg": { - "af": "${fields.ffmpeg}" - }, - "ffmpeg_stack": "v3.3.3" - }, - "merged_audio": { - "robot": "/audio/merge", - "preset": "mp3", - "result": "true", - "ffmpeg_stack": "v4.3.1", - "use": { - "steps": [ - { - "name": "extract_audio", - "as": "audio" - }, - { - "name": "speech", - "as": "audio" - } - ], - "volume": "sum", - "bundle_steps": true - } - }, - "merged_video": { - "robot": "/video/merge", - "preset": "hls-720p", - "ffmpeg_stack": "v4.3.1", - "use": { - "steps": [ - { - "name": "merged_audio", - "as": "audio" - }, - { - "name": "import_video", - "as": "video" - } - ], - "bundle_steps": true - } - } - } -} -''' - -from transloadit import client -import urllib.request -import json +"""Advanced example: translate speech and merge translated audio into a video. + +This requires two pre-created Templates in your Transloadit account: + +1. A transcription Template that produces a `transcribe_json` result. +2. A video merge Template that accepts `video`, `target_language`, `source_language`, and + `ffmpeg` fields and produces a `merged_video` result. -tl = client.Transloadit('TRANSLOADIT_KEY', 'TRANSLOADIT_SECRET') - -source_language = 'en-GB' -target_language = 'nl-NL' - -def useTemplate(templateID, file_path='', result_name='', get_url=True, fields=''): - assembly = tl.new_assembly({'template_id': templateID, 'fields': fields}) - - if file_path != '': - assembly.add_file(open(file_path, 'rb')) - - assembly_response = assembly.create(retries=5, wait=True) - if get_url: - result_url = assembly_response.data.get('results').get(result_name)[0].get('ssl_url') - print(result_url) - return result_url - else: - return assembly_response - -response = useTemplate ('TEMPLATE_1_ID', file_path='medium_crab.mp4', get_url=False, fields={"language":source_language}) -transcription_result_url = response.data.get('results').get('transcribe_json')[0].get('ssl_url') -video_url = response.data.get('uploads')[0].get('ssl_url') - -urllib.request.urlretrieve(transcription_result_url, 'transcribe_json') - -with open('transcribe_json') as f: - data = json.load(f) - -ffmpeg = "volume=enable:volume=1" - -startTimes = [] -endTimes = [] -sentences = [] -currentSentence = '' - -startTimes.append(data['words'][0]['startTime']) - -for x in range(len(data['words'])): - if (data['words'][x]['text'] == '.') and (x != len(data['words']) - 1): - time = data['words'][x+1]['startTime'] - startTimes.append(time) - if (data['words'][x]['text'] != '.'): - currentSentence = currentSentence + ' ' + data['words'][x]['text'] - else: - sentences.append(currentSentence + '.') - time = data['words'][x-1]['endTime'] - endTimes.append(time) - currentSentence = '' - -print('startTimes: ' + str(startTimes)) -print('endTimes: ' + str(startTimes)) -print(sentences) - -f = open("words/text.txt", "w") -f.write("") - -for x in range(len(sentences)): - f.write('{text}'.format(start=startTimes[x], text=sentences[x])) - ffmpeg += ", volume=enable='between(t,{start},{end})':volume=0.2".format(start=startTimes[x], end=endTimes[x]) - -f.write("") -f.close() - -final_result_url = useTemplate ('TEMPLATE_2_ID', file_path='words/text.txt', result_name='merged_video', get_url=True, fields={"target_language":target_language, "source_language":source_language, "video":video_url, "ffmpeg":ffmpeg}) \ No newline at end of file +Run from the repository root: + + TRANSLOADIT_KEY=xxx TRANSLOADIT_SECRET=yyy \ + TRANSLOADIT_TRANSCRIBE_TEMPLATE_ID=xxx TRANSLOADIT_TRANSLATE_TEMPLATE_ID=yyy \ + poetry run python examples/video_translator.py +""" + +import json +import os +import tempfile +import urllib.request +from pathlib import Path + +from transloadit.client import Transloadit + + +def get_required_env(name): + value = os.getenv(name) + if not value: + raise RuntimeError(f"Please set {name}.") + return value + + +def first_result_url(response_data, step_name): + results = (response_data.get("results") or {}).get(step_name) or [] + if not results: + raise RuntimeError(f"No results found for step {step_name!r}: {response_data}") + url = results[0].get("ssl_url") or results[0].get("url") + if not url: + raise RuntimeError(f"No result URL found for step {step_name!r}: {response_data}") + return url + + +def create_assembly_with_template(client, template_id, file_path=None, fields=None): + assembly = client.new_assembly({"template_id": template_id, "fields": fields or {}}) + + if file_path is None: + return assembly.create(retries=5, wait=True) + + with Path(file_path).open("rb") as upload: + assembly.add_file(upload, Path(file_path).name) + return assembly.create(retries=5, wait=True) + + +def download_url(url, path, timeout=60): + with urllib.request.urlopen(url, timeout=timeout) as response: + Path(path).write_bytes(response.read()) + + +def build_ssml_and_ffmpeg(words): + if not words: + raise RuntimeError("Transcription result did not contain any words.") + + sentences = [] + start_times = [] + end_times = [] + current_sentence = [] + + start_times.append(words[0]["startTime"]) + for index, word in enumerate(words): + if word["text"] == "." and index != len(words) - 1: + start_times.append(words[index + 1]["startTime"]) + if word["text"] != ".": + current_sentence.append(word["text"]) + continue + sentences.append(" ".join(current_sentence) + ".") + end_times.append(words[index - 1]["endTime"]) + current_sentence = [] + + ffmpeg = "volume=enable:volume=1" + ssml_parts = [""] + for index, sentence in enumerate(sentences): + ssml_parts.append(f'{sentence}') + ffmpeg += ( + f", volume=enable='between(t,{start_times[index]},{end_times[index]})':volume=0.2" + ) + ssml_parts.append("") + return "".join(ssml_parts), ffmpeg + + +def main(): + client = Transloadit( + get_required_env("TRANSLOADIT_KEY"), + get_required_env("TRANSLOADIT_SECRET"), + ) + transcribe_template_id = get_required_env("TRANSLOADIT_TRANSCRIBE_TEMPLATE_ID") + translate_template_id = get_required_env("TRANSLOADIT_TRANSLATE_TEMPLATE_ID") + source_language = os.getenv("TRANSLOADIT_SOURCE_LANGUAGE", "en-GB") + target_language = os.getenv("TRANSLOADIT_TARGET_LANGUAGE", "nl-NL") + example_dir = Path(__file__).resolve().parent + + transcribe_response = create_assembly_with_template( + client, + transcribe_template_id, + file_path=example_dir / "fixtures" / "crab.mp4", + fields={"language": source_language}, + ) + transcription_url = first_result_url(transcribe_response.data, "transcribe_json") + video_url = transcribe_response.data["uploads"][0]["ssl_url"] + + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir_path = Path(tmpdir) + transcript_path = tmpdir_path / "transcribe_json.json" + download_url(transcription_url, transcript_path) + with transcript_path.open() as transcript: + transcript_data = json.load(transcript) + + ssml, ffmpeg = build_ssml_and_ffmpeg(transcript_data["words"]) + text_path = tmpdir_path / "text.txt" + text_path.write_text(ssml) + + translated_response = create_assembly_with_template( + client, + translate_template_id, + file_path=text_path, + fields={ + "target_language": target_language, + "source_language": source_language, + "video": video_url, + "ffmpeg": ffmpeg, + }, + ) + print("Translated video:", first_result_url(translated_response.data, "merged_video")) + + +if __name__ == "__main__": + main() diff --git a/examples/words/text.txt b/examples/words/text.txt deleted file mode 100644 index e69de29..0000000 diff --git a/poetry.lock b/poetry.lock index 3d496e1..087caf0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2012,4 +2012,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">=3.12" -content-hash = "722f633916f1df64f0091de35bb80488044c1d9ee3d20f840ba188756c9c7ada" +content-hash = "d929dd1019e9b2bdb1be768433f47c693177ed886d6c1975e76cb328a9a725c2" diff --git a/pyproject.toml b/pyproject.toml index cca89e8..53fbc49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ "Topic :: Multimedia :: Sound/Audio :: Conversion", ] dependencies = [ + "aiohttp>=3.13.5,<4", "requests>=2.33,<3", "tuspy>=1.0.0,<2.0.0", "urllib3>=2.7,<3", diff --git a/tests/test_assembly.py b/tests/test_assembly.py index 6dc3fe4..0a034ae 100644 --- a/tests/test_assembly.py +++ b/tests/test_assembly.py @@ -1,9 +1,12 @@ +import io import unittest +from unittest import mock import requests_mock from . import request_body_matcher from transloadit.client import Transloadit +from transloadit.response import Response class AssemblyTest(unittest.TestCase): @@ -63,3 +66,67 @@ def test_save_resumable(self, mock): assembly = self.assembly.create() self.assertEqual(assembly.data["ok"], "ASSEMBLY_COMPLETED") self.assertEqual(assembly.data["assembly_id"], "abcdef45673") + + @requests_mock.Mocker() + def test_save_resumable_uses_field_name_for_nameless_stream(self, mock_requests): + url = f"{self.transloadit.service}/assemblies" + mock_requests.post( + url, + text=( + '{"ok":"ASSEMBLY_UPLOADING",' + '"assembly_ssl_url":"https://api2.example/assemblies/abc",' + '"tus_url":"https://api2.example/uploads"}' + ), + ) + upload = io.BytesIO(b"payload") + self.assembly.add_file(upload, "payload_field") + + with mock.patch("transloadit.assembly.tus.TusClient") as tus_client: + uploader = tus_client.return_value.uploader.return_value + assembly = self.assembly.create(resumable=True) + + self.assertEqual(assembly.data["ok"], "ASSEMBLY_UPLOADING") + tus_client.return_value.uploader.assert_called_once() + self.assertEqual( + tus_client.return_value.uploader.call_args.kwargs["metadata"]["filename"], + "payload_field", + ) + uploader.upload.assert_called_once() + + def test_save_resumable_retries_rate_limit_before_tus_upload(self): + rate_limited = Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + success = Response( + data={ + "ok": "ASSEMBLY_UPLOADING", + "assembly_ssl_url": "https://api2.example/assemblies/abc", + "tus_url": "https://api2.example/uploads", + }, + status_code=200, + headers={}, + ) + self.assembly.add_file(io.BytesIO(b"payload"), "payload_field") + + with mock.patch.object( + self.transloadit.request, + "post", + side_effect=[rate_limited, success], + ) as post_mock: + with mock.patch("transloadit.assembly.tus.TusClient") as tus_client: + uploader = tus_client.return_value.uploader.return_value + assembly = self.assembly.create(resumable=True, retries=1) + + self.assertEqual(assembly.data["ok"], "ASSEMBLY_UPLOADING") + self.assertEqual(post_mock.call_count, 2) + tus_client.assert_called_once_with("https://api2.example/uploads") + self.assertEqual( + tus_client.return_value.uploader.call_args.kwargs["metadata"]["assembly_url"], + "https://api2.example/assemblies/abc", + ) + uploader.upload.assert_called_once() diff --git a/tests/test_async_client.py b/tests/test_async_client.py new file mode 100644 index 0000000..bc12310 --- /dev/null +++ b/tests/test_async_client.py @@ -0,0 +1,1720 @@ +import asyncio +import io +import json +import threading +from pathlib import Path +from unittest import IsolatedAsyncioTestCase, mock + +from aiohttp import payload, web + +from transloadit.async_client import AsyncTransloadit +from transloadit.async_request import _NonClosingUploadStream +from transloadit.client import Transloadit +from transloadit.response import Response + + +class _AsyncApiServer: + def __init__(self): + self.requests = [] + self.app = web.Application() + self.app.router.add_get("/assemblies/{assembly_id}", self.handle_get_assembly) + self.app.router.add_get("/assemblies-text/{assembly_id}", self.handle_get_assembly_text) + self.app.router.add_get("/assemblies-plain/{assembly_id}", self.handle_get_assembly_plain) + self.app.router.add_get("/assemblies", self.handle_list_assemblies) + self.app.router.add_delete("/assemblies/{assembly_id}", self.handle_cancel_assembly) + self.app.router.add_get("/templates/{template_id}", self.handle_get_template) + self.app.router.add_delete("/templates/{template_id}", self.handle_delete_template) + self.app.router.add_get("/templates", self.handle_list_templates) + self.app.router.add_put("/templates/{template_id}", self.handle_update_template) + self.app.router.add_post("/templates", self.handle_create_template) + self.app.router.add_get("/bill/{year}-{month}", self.handle_get_bill) + self.app.router.add_post("/assemblies", self.handle_create_assembly) + self.runner = None + self.site = None + self.base_url = None + + async def start(self): + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, "127.0.0.1", 0) + await self.site.start() + + sock = self.site._server.sockets[0] + host, port = sock.getsockname()[:2] + self.base_url = f"http://{host}:{port}" + return self + + async def close(self): + if self.runner is not None: + await self.runner.cleanup() + + def _record(self, request, body=None): + entry = { + "method": request.method, + "path": request.path, + "query": dict(request.query), + "headers": dict(request.headers), + } + if body is not None: + entry["body"] = body + self.requests.append(entry) + return entry + + async def _parse_body(self, request): + post = await request.post() + body = {} + for key, value in post.items(): + if hasattr(value, "filename") and hasattr(value, "file"): + value.file.seek(0) + body[key] = { + "filename": value.filename, + "content": value.file.read(), + "content_type": value.content_type, + } + else: + body[key] = value + return body + + async def handle_get_assembly(self, request): + self._record(request) + return web.json_response( + { + "ok": "ASSEMBLY_COMPLETED", + "assembly_id": request.match_info["assembly_id"], + }, + headers={"X-Async-Route": "get_assembly"}, + ) + + async def handle_get_assembly_text(self, request): + self._record(request) + payload = { + "ok": "ASSEMBLY_COMPLETED", + "assembly_id": request.match_info["assembly_id"], + } + return web.Response( + text=json.dumps(payload), + content_type="text/plain", + headers={"X-Async-Route": "get_assembly_text"}, + ) + + async def handle_get_assembly_plain(self, request): + self._record(request) + return web.Response( + text="plain assembly response", + content_type="text/plain", + headers={"X-Async-Route": "get_assembly_plain"}, + ) + + async def handle_list_assemblies(self, request): + self._record(request) + return web.json_response( + {"items": [], "count": 0}, + headers={"X-Async-Route": "list_assemblies"}, + ) + + async def handle_cancel_assembly(self, request): + self._record(request) + return web.json_response( + { + "ok": "ASSEMBLY_CANCELED", + "assembly_id": request.match_info["assembly_id"], + }, + headers={"X-Async-Route": "cancel_assembly"}, + ) + + async def handle_get_template(self, request): + self._record(request) + return web.json_response( + { + "ok": "TEMPLATE_FOUND", + "template_id": request.match_info["template_id"], + }, + headers={"X-Async-Route": "get_template"}, + ) + + async def handle_delete_template(self, request): + self._record(request) + return web.json_response( + { + "ok": "TEMPLATE_DELETED", + "template_id": request.match_info["template_id"], + }, + headers={"X-Async-Route": "delete_template"}, + ) + + async def handle_list_templates(self, request): + self._record(request) + return web.json_response( + {"items": [{"template_id": "tpl-1"}], "count": 1}, + headers={"X-Async-Route": "list_templates"}, + ) + + async def handle_update_template(self, request): + body = await self._parse_body(request) + self._record(request, body) + return web.json_response( + { + "ok": "TEMPLATE_UPDATED", + "template_id": request.match_info["template_id"], + }, + headers={"X-Async-Route": "update_template"}, + ) + + async def handle_create_template(self, request): + body = await self._parse_body(request) + self._record(request, body) + params = json.loads(body["params"]) + return web.json_response( + { + "ok": "TEMPLATE_CREATED", + "template_name": params["name"], + }, + headers={"X-Async-Route": "create_template"}, + ) + + async def handle_get_bill(self, request): + self._record(request) + return web.json_response( + { + "ok": "BILL_FOUND", + "period": f"{request.match_info['year']}-{request.match_info['month']}", + }, + headers={"X-Async-Route": "get_bill"}, + ) + + async def handle_create_assembly(self, request): + body = await self._parse_body(request) + self._record(request, body) + return web.json_response( + { + "ok": "ASSEMBLY_COMPLETED", + "assembly_id": "assembly-123", + "assembly_ssl_url": f"{self.base_url}/assemblies/assembly-123", + "tus_url": f"{self.base_url}/uploads", + }, + headers={"X-Async-Route": "create_assembly"}, + ) + + +class _FakeResponseContext: + def __init__(self, payload): + self.payload = payload + self.status = 200 + self.headers = {"X-Async-Route": "fake"} + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def json(self, **kwargs): + return self.payload + + async def text(self): + if isinstance(self.payload, str): + return self.payload + return json.dumps(self.payload) + + +class _UndecodableResponse: + async def json(self, **kwargs): + raise UnicodeDecodeError("utf-8", b"\xff", 0, 1, "invalid start byte") + + async def text(self): + raise UnicodeDecodeError("utf-8", b"\xff", 0, 1, "invalid start byte") + + async def read(self): + return b"\xff" + + +class _RecordingSession: + def __init__(self, payload): + self.calls = [] + self.closed = False + self.payload = payload + + def delete(self, url, **kwargs): + self.calls.append((url, kwargs)) + return _FakeResponseContext(self.payload) + + def get(self, url, **kwargs): + self.calls.append((url, kwargs)) + return _FakeResponseContext(self.payload) + + def post(self, url, **kwargs): + self.calls.append((url, kwargs)) + return _FakeResponseContext(self.payload) + + def put(self, url, **kwargs): + self.calls.append((url, kwargs)) + return _FakeResponseContext(self.payload) + + async def close(self): + self.closed = True + + +class _NeverOwnedSession: + def __init__(self): + self.closed = False + self.close_calls = 0 + + async def close(self): + self.close_calls += 1 + self.closed = True + + +class _BrokenStream: + def __init__(self, name="broken.bin"): + self.name = name + + def tell(self): + raise OSError("tell failed") + + def seek(self, position): + raise OSError("seek failed") + + +class AsyncClientTest(IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.server = await _AsyncApiServer().start() + + async def asyncTearDown(self): + await self.server.close() + + async def test_async_client_methods_and_context_manager(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + response = await client.get_assembly(assembly_id="abc123") + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(response.data["assembly_id"], "abc123") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.headers["X-Async-Route"], "get_assembly") + self.assertEqual(response.headers["x-async-route"], "get_assembly") + + response = await client.list_assemblies() + self.assertEqual(response.data["items"], []) + self.assertEqual(response.data["count"], 0) + self.assertEqual(response.headers["X-Async-Route"], "list_assemblies") + + response = await client.cancel_assembly(assembly_id="abc123") + self.assertEqual(response.data["ok"], "ASSEMBLY_CANCELED") + self.assertEqual(response.data["assembly_id"], "abc123") + + response = await client.get_template("tpl-1") + self.assertEqual(response.data["ok"], "TEMPLATE_FOUND") + self.assertEqual(response.data["template_id"], "tpl-1") + + response = await client.list_templates() + self.assertEqual(response.data["items"], [{"template_id": "tpl-1"}]) + self.assertEqual(response.data["count"], 1) + + response = await client.update_template("tpl-1", {"name": "foo_bar"}) + self.assertEqual(response.data["ok"], "TEMPLATE_UPDATED") + self.assertEqual(response.data["template_id"], "tpl-1") + + template = client.new_template("foo") + template.add_step("resize", "/image/resize", {"width": 70, "height": 70}) + response = await template.create() + self.assertEqual(response.data["ok"], "TEMPLATE_CREATED") + self.assertEqual(response.data["template_name"], "foo") + + self.assertIsNone(client.request.session) + + self.assertGreaterEqual(len(self.server.requests), 7) + first_request = self.server.requests[0] + self.assertEqual(first_request["method"], "GET") + self.assertEqual(first_request["path"], "/assemblies/abc123") + + update_request = next( + entry for entry in self.server.requests if entry["path"] == "/templates/tpl-1" and entry["method"] == "PUT" + ) + update_params = json.loads(update_request["body"]["params"]) + self.assertEqual(update_params["name"], "foo_bar") + + create_request = next( + entry for entry in self.server.requests if entry["path"] == "/templates" and entry["method"] == "POST" + ) + create_params = json.loads(create_request["body"]["params"]) + self.assertEqual(create_params["name"], "foo") + self.assertEqual(create_params["template"]["steps"]["resize"]["robot"], "/image/resize") + + async def test_async_client_accepts_json_with_text_content_type(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + response = await client.get_assembly( + assembly_url=f"{self.server.base_url}/assemblies-text/abc123" + ) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(response.data["assembly_id"], "abc123") + self.assertEqual(response.headers["X-Async-Route"], "get_assembly_text") + + async def test_async_client_normalizes_service_and_rejects_missing_ids(self): + session = _NeverOwnedSession() + client = AsyncTransloadit( + "key", + "secret", + service="api2.transloadit.com", + session=session, + ) + + self.assertEqual(client.service, "https://api2.transloadit.com") + + for service in ("", " ", "https://", "ftp://api2.transloadit.com"): + with self.assertRaises(ValueError): + AsyncTransloadit("key", "secret", service=service, session=session) + + with self.assertRaises(ValueError): + await client.get_assembly() + + with self.assertRaises(ValueError): + await client.cancel_assembly() + + external_session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + external_client = AsyncTransloadit( + "key", + "secret", + service="https://api2.transloadit.com", + session=external_session, + ) + await external_client.get_assembly(assembly_url="https://example.com/assemblies/abc123") + await external_client.cancel_assembly(assembly_url="https://example.com/assemblies/abc123") + self.assertEqual( + [call[0] for call in external_session.calls], + [ + "https://example.com/assemblies/abc123", + "https://example.com/assemblies/abc123", + ], + ) + self.assertIsNone(external_session.calls[0][1]["params"]) + self.assertEqual(external_session.calls[1][1]["data"], []) + + transloadit_session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + transloadit_client = AsyncTransloadit( + "key", + "secret", + service="https://api2.transloadit.com", + session=transloadit_session, + ) + await transloadit_client.get_assembly( + assembly_url="https://api2-region.transloadit.com/assemblies/abc123" + ) + self.assertEqual( + transloadit_session.calls[0][0], + "https://api2-region.transloadit.com/assemblies/abc123", + ) + self.assertIn("signature", transloadit_session.calls[0][1]["params"]) + + await client.close() + + self.assertFalse(session.closed) + self.assertEqual(session.close_calls, 0) + + closed_session = _NeverOwnedSession() + closed_session.closed = True + closed_client = AsyncTransloadit( + "key", + "secret", + service=self.server.base_url, + session=closed_session, + ) + + with self.assertRaises(RuntimeError): + await closed_client.get_assembly(assembly_id="abc123") + + async def test_async_client_quotes_path_ids(self): + session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + client = AsyncTransloadit("key", "secret", service=self.server.base_url, session=session) + + await client.get_assembly(assembly_id="assembly/with?chars") + await client.cancel_assembly(assembly_id="cancel/with?chars") + await client.get_template("template/with?chars") + await client.update_template("update/with?chars", {"name": "foo"}) + await client.delete_template("delete/with?chars") + + urls = [call[0] for call in session.calls] + self.assertEqual( + urls, + [ + f"{self.server.base_url}/assemblies/assembly%2Fwith%3Fchars", + f"{self.server.base_url}/assemblies/cancel%2Fwith%3Fchars", + f"{self.server.base_url}/templates/template%2Fwith%3Fchars", + f"{self.server.base_url}/templates/update%2Fwith%3Fchars", + f"{self.server.base_url}/templates/delete%2Fwith%3Fchars", + ], + ) + + async def test_async_client_rejects_empty_template_ids(self): + session = _RecordingSession({"ok": "TEMPLATE_FOUND"}) + client = AsyncTransloadit("key", "secret", service=self.server.base_url, session=session) + + for template_id in ("", None): + with self.assertRaises(ValueError): + await client.get_template(template_id) + with self.assertRaises(ValueError): + await client.update_template(template_id, {"name": "foo"}) + with self.assertRaises(ValueError): + await client.delete_template(template_id) + + self.assertEqual(session.calls, []) + + async def test_async_client_close_reopens_owned_session(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + + first_session = await client.request._ensure_session() + self.assertFalse(first_session.closed) + + await client.close() + self.assertTrue(first_session.closed) + self.assertIsNone(client.request.session) + + second_session = await client.request._ensure_session() + self.assertIsNot(first_session, second_session) + self.assertFalse(second_session.closed) + + await client.close() + + async def test_async_request_owned_sessions_trust_environment(self): + session = _NeverOwnedSession() + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + + with mock.patch("aiohttp.ClientSession", return_value=session) as session_mock: + ensured_session = await client.request._ensure_session() + + self.assertIs(ensured_session, session) + session_mock.assert_called_once_with(trust_env=True) + + await client.close() + + async def test_async_client_reopens_owned_session_when_session_is_closed(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + + first_session = await client.request._ensure_session() + self.assertFalse(first_session.closed) + + await first_session.close() + reopened_session = await client.request._ensure_session() + + self.assertIsNot(first_session, reopened_session) + self.assertFalse(reopened_session.closed) + + await client.close() + + async def test_async_client_delete_template_get_bill_and_plain_text_fallback(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + response = await client.delete_template("tpl-1") + self.assertEqual(response.data["ok"], "TEMPLATE_DELETED") + self.assertEqual(response.data["template_id"], "tpl-1") + self.assertEqual(response.headers["X-Async-Route"], "delete_template") + + response = await client.get_bill(9, 2017) + self.assertEqual(response.data["ok"], "BILL_FOUND") + self.assertEqual(response.data["period"], "2017-09") + self.assertEqual(response.headers["X-Async-Route"], "get_bill") + + response = await client.get_assembly( + assembly_url=f"{self.server.base_url}/assemblies-plain/abc123" + ) + + self.assertEqual(response.data, "plain assembly response") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.headers["X-Async-Route"], "get_assembly_plain") + + async def test_async_request_falls_back_to_bytes_when_text_decode_fails(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + + data = await client.request._read_response_data(_UndecodableResponse()) + + self.assertEqual(data, b"\xff") + + async def test_async_assembly_create_raises_on_plain_text_error_response(self): + plain_response = Response( + data="plain assembly response", + status_code=502, + headers={"X-Async-Route": "plain"}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=plain_response)) as post_mock: + with self.assertRaises(RuntimeError): + await assembly.create(wait=True, resumable=False) + + post_mock.assert_awaited_once() + + async def test_async_assembly_create_returns_plain_text_success_response(self): + plain_response = Response( + data="plain assembly response", + status_code=200, + headers={"X-Async-Route": "plain"}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=plain_response)) as post_mock: + response = await assembly.create(wait=False, resumable=False) + + self.assertIs(response, plain_response) + post_mock.assert_awaited_once() + + async def test_async_assembly_resumable_plain_text_success_response_raises_before_tus_upload(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + raise AssertionError("TUS upload should not start without upload URLs") + + plain_response = Response( + data="plain assembly response", + status_code=200, + headers={"X-Async-Route": "plain"}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(io.BytesIO(b"payload")) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=plain_response)) as post_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + with self.assertRaises(RuntimeError): + await assembly.create(resumable=True) + + post_mock.assert_awaited_once() + self.assertEqual(calls, []) + + async def test_async_assembly_wait_raises_on_plain_text_poll_response(self): + initial_response = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + plain_response = Response( + data="plain assembly response", + status_code=502, + headers={"X-Async-Route": "plain"}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial_response)) as post_mock: + with mock.patch.object(client, "get_assembly", new=mock.AsyncMock(return_value=plain_response)) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + with self.assertRaises(RuntimeError): + await assembly.create(wait=True, resumable=False) + + post_mock.assert_awaited_once() + get_mock.assert_awaited_once_with( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ) + sleep_mock.assert_awaited_once_with(0) + + async def test_async_assembly_wait_raises_on_plain_text_success_poll_response(self): + initial_response = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + plain_response = Response( + data="plain assembly response", + status_code=200, + headers={"X-Async-Route": "plain"}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial_response)) as post_mock: + with mock.patch.object(client, "get_assembly", new=mock.AsyncMock(return_value=plain_response)) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + with self.assertRaises(RuntimeError): + await assembly.create(wait=True, resumable=False) + + post_mock.assert_awaited_once() + get_mock.assert_awaited_once_with( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ) + sleep_mock.assert_awaited_once_with(0) + + def test_async_signed_smart_cdn_url_matches_sync_and_rejects_bad_types(self): + async_client = AsyncTransloadit("test-key", "test-secret") + sync_client = Transloadit("test-key", "test-secret") + params = {"width": 100, "tags": ["a", "b"], "enabled": True, "flags": [True, False], "skip": None} + + with mock.patch("time.time", return_value=1732550672.867): + async_url = async_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + params, + ) + explicit_async_url = async_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + params, + expires_at_ms=1732550672867, + ) + sync_url = sync_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + params, + ) + explicit_sync_url = sync_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + params, + expires_at_ms=1732550672867, + ) + bare_async_url = async_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + ) + bare_sync_url = sync_client.get_signed_smart_cdn_url( + "acme-workspace", + "My Template", + "folder/file name.jpg", + ) + + self.assertEqual(async_url, sync_url) + self.assertEqual(explicit_async_url, explicit_sync_url) + self.assertEqual(bare_async_url, bare_sync_url) + self.assertIn("auth_key=test-key", async_url) + self.assertIn("exp=1732554272867", async_url) + self.assertIn("width=100", async_url) + self.assertIn("tags=a", async_url) + self.assertIn("tags=b", async_url) + self.assertIn("enabled=true", async_url) + self.assertIn("flags=true", async_url) + self.assertIn("flags=false", async_url) + self.assertIn("exp=1732550672867", explicit_async_url) + self.assertNotIn("width=", bare_async_url) + self.assertNotIn("skip=", async_url) + + with self.assertRaises(ValueError): + async_client.get_signed_smart_cdn_url("workspace", "template", "input", {"bad": object()}) + with self.assertRaises(ValueError): + async_client.get_signed_smart_cdn_url("Acme Workspace", "template", "input") + with self.assertRaises(ValueError): + sync_client.get_signed_smart_cdn_url("bad.workspace", "template", "input") + for reserved_key in ("auth_key", "exp", "sig"): + with self.assertRaises(ValueError): + async_client.get_signed_smart_cdn_url( + "workspace", + "template", + "input", + {reserved_key: "override"}, + ) + + async def test_async_assembly_create_non_resumable_upload(self): + fixture_path = Path(__file__).resolve().parents[1] / "LICENSE" + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_step("resize", "/image/resize", {"use": ":original", "width": 128}) + + with fixture_path.open("rb") as upload: + assembly.add_file(upload) + response = await assembly.create(resumable=False) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(response.data["assembly_id"], "assembly-123") + + create_request = next( + entry for entry in self.server.requests if entry["path"] == "/assemblies" and entry["method"] == "POST" + ) + create_params = json.loads(create_request["body"]["params"]) + self.assertEqual(create_params["steps"]["resize"]["robot"], "/image/resize") + self.assertIn("signature", create_request["body"]) + + uploaded_file = create_request["body"]["file"] + self.assertEqual(uploaded_file["filename"], "LICENSE") + self.assertEqual(uploaded_file["content"], fixture_path.read_bytes()) + + async def test_async_assembly_wait_polls_with_async_sleep(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0.25}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + rate_limited = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "error": "ASSEMBLY_STATUS_FETCHING_RATE_LIMIT_REACHED", + "info": {"retryIn": 0.25}, + }, + status_code=200, + headers={"X-Async-Route": "rate_limited"}, + ) + completed = Response( + data={"ok": "ASSEMBLY_COMPLETED", "assembly_id": "assembly-123"}, + status_code=200, + headers={"X-Async-Route": "completed"}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial)) as post_mock: + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock(side_effect=[rate_limited, completed]), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + response = await assembly.create(wait=True, resumable=False) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + post_mock.assert_awaited_once() + self.assertEqual( + get_mock.await_args_list, + [ + mock.call( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ), + mock.call( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ), + ], + ) + self.assertEqual(sleep_mock.await_args_list, [mock.call(0.25), mock.call(0.25)]) + + async def test_async_assembly_wait_polls_zero_file_resumable_assembly_without_tus(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0.25}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + completed = Response( + data={"ok": "ASSEMBLY_COMPLETED", "assembly_id": "assembly-123"}, + status_code=200, + headers={"X-Async-Route": "completed"}, + ) + + class _TusClient: + def __init__(self, tus_url): + raise AssertionError("TUS upload should not start for zero-file resumable assemblies") + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial)) as post_mock: + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock(return_value=completed), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + response = await assembly.create(wait=True, resumable=True) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + post_mock.assert_awaited_once() + self.assertEqual( + post_mock.await_args.kwargs["extra_data"], + {"tus_num_expected_upload_files": 0}, + ) + get_mock.assert_awaited_once_with( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ) + self.assertEqual(sleep_mock.await_args_list, [mock.call(0.25)]) + + async def test_async_assembly_resumable_rate_limit_retries_before_tus_upload(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + calls.append(("uploader", kwargs["metadata"], kwargs["retries"])) + + class _Uploader: + def upload(self_inner): + calls.append(("upload", kwargs["metadata"])) + + return _Uploader() + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + upload = io.BytesIO(b"payload") + upload.name = b"payload.bin" + assembly.add_file(upload) + + rate_limited = Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + success = Response( + data={ + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + "tus_url": f"{self.server.base_url}/uploads", + }, + status_code=200, + headers={}, + ) + + with mock.patch.object( + client.request, + "post", + new=mock.AsyncMock(side_effect=[rate_limited, success]), + ) as post_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + with mock.patch("asyncio.to_thread", new=mock.AsyncMock(side_effect=lambda func, *args: func(*args))) as to_thread_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + response = await assembly.create(resumable=True, retries=2) + + self.assertEqual(response.data["assembly_ssl_url"], f"{self.server.base_url}/assemblies/assembly-123") + self.assertEqual(post_mock.await_count, 2) + self.assertEqual(to_thread_mock.await_count, 1) + self.assertEqual(calls[0], ("client", f"{self.server.base_url}/uploads")) + self.assertEqual(calls[1], ("uploader", {"assembly_url": f"{self.server.base_url}/assemblies/assembly-123", "fieldname": "file", "filename": "payload.bin"}, 2)) + + async def test_async_assembly_resumable_rate_limit_skips_rewind_before_retrying(self): + calls = [] + + class _BrokenRewindStream(io.BytesIO): + def seek(self, position, *args, **kwargs): + raise OSError("seek failed") + + class _Uploader: + def __init__(self, metadata): + self.metadata = metadata + + def upload(self): + calls.append(("upload", dict(self.metadata))) + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + calls.append(("uploader", dict(kwargs["metadata"]))) + return _Uploader(kwargs["metadata"]) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + upload = _BrokenRewindStream(b"payload") + upload.name = "payload.bin" + assembly.add_file(upload) + + rate_limited = Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + success = Response( + data={ + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + "tus_url": f"{self.server.base_url}/uploads", + }, + status_code=200, + headers={}, + ) + + with mock.patch.object( + client.request, + "post", + new=mock.AsyncMock(side_effect=[rate_limited, success]), + ) as post_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + with mock.patch("asyncio.to_thread", new=mock.AsyncMock(side_effect=lambda func, *args: func(*args))) as to_thread_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + response = await assembly.create(resumable=True, retries=2) + + self.assertEqual(response.data["assembly_ssl_url"], f"{self.server.base_url}/assemblies/assembly-123") + self.assertEqual(post_mock.await_count, 2) + self.assertEqual(to_thread_mock.await_count, 1) + self.assertEqual(calls[0], ("client", f"{self.server.base_url}/uploads")) + self.assertEqual(calls[1][0], "uploader") + + async def test_async_assembly_non_resumable_rate_limit_raises_when_stream_cannot_be_snapshotted(self): + class _NonSeekableStream(io.BytesIO): + def tell(self): + raise OSError("tell failed") + + reads = [] + + async def fake_post(path, data=None, extra_data=None, files=None): + file_stream = files["file"] + reads.append(file_stream.read()) + return Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(_NonSeekableStream(b"payload")) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(side_effect=fake_post)) as post_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + with self.assertRaises(RuntimeError): + await assembly.create(resumable=False, retries=1) + + self.assertEqual(reads, [b"payload"]) + post_mock.assert_awaited_once() + sleep_mock.assert_not_awaited() + + async def test_async_assembly_resumable_rate_limit_returns_response_without_upload_when_retries_exhausted(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + raise AssertionError("TUS upload should not start when retries are exhausted") + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(io.BytesIO(b"payload")) + + rate_limited = Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=rate_limited)) as post_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + response = await assembly.create(resumable=True, retries=0) + + self.assertEqual(response.data["error"], "RATE_LIMIT_REACHED") + post_mock.assert_awaited_once() + sleep_mock.assert_not_awaited() + self.assertEqual(calls, []) + + async def test_async_assembly_resumable_error_response_skips_tus_upload(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + raise AssertionError("TUS upload should not start for error responses") + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(io.BytesIO(b"payload")) + + error_response = Response( + data={ + "error": "ASSEMBLY_NOT_AUTHORIZED", + }, + status_code=401, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=error_response)) as post_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + response = await assembly.create(resumable=True) + + self.assertIs(response, error_response) + post_mock.assert_awaited_once() + self.assertEqual(calls, []) + + async def test_async_assembly_resumable_response_without_upload_urls_raises_before_tus_upload(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + raise AssertionError("TUS upload should not start when upload URLs are missing") + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(io.BytesIO(b"payload")) + + incomplete_response = Response( + data={"ok": "ASSEMBLY_PROCESSING"}, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=incomplete_response)) as post_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + with self.assertRaisesRegex(RuntimeError, "ASSEMBLY_PROCESSING"): + await assembly.create(resumable=True) + + post_mock.assert_awaited_once() + self.assertEqual(calls, []) + + async def test_async_assembly_resumable_response_allows_configured_service_tus_url(self): + calls = [] + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + calls.append(("upload", kwargs["metadata"])) + return self + + def upload(self): + calls.append(("uploaded",)) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(io.BytesIO(b"payload")) + + response = Response( + data={ + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + "tus_url": "https://example.com/uploads", + }, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=response)) as post_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + await assembly.create(resumable=True) + + post_mock.assert_awaited_once() + self.assertEqual(calls[0], ("client", "https://example.com/uploads")) + self.assertEqual(calls[1][0], "upload") + self.assertEqual(calls[2], ("uploaded",)) + + async def test_async_assembly_wait_returns_response_without_assembly_url(self): + incomplete_response = Response( + data={"ok": "ASSEMBLY_PROCESSING"}, + status_code=200, + headers={}, + ) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=incomplete_response)) as post_mock: + with mock.patch.object(client, "get_assembly", new=mock.AsyncMock()) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + response = await assembly.create(wait=True, resumable=False) + + self.assertIs(response, incomplete_response) + post_mock.assert_awaited_once() + get_mock.assert_not_awaited() + sleep_mock.assert_not_awaited() + + async def test_async_resumable_upload_posts_extra_data_and_uses_tus_metadata(self): + calls = [] + + class _Uploader: + def __init__(self, metadata): + self.metadata = metadata + + def upload(self): + calls.append(("upload", dict(self.metadata))) + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + + def uploader(self, **kwargs): + calls.append(("uploader", dict(kwargs["metadata"]))) + return _Uploader(kwargs["metadata"]) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + upload = io.BytesIO(b"payload") + upload.name = 123 + assembly.add_file(upload, "explicit_field") + + with mock.patch( + "asyncio.to_thread", + new=mock.AsyncMock(side_effect=lambda func, *args: func(*args)), + ) as to_thread_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + response = await assembly.create(resumable=True) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + tus_upload_calls = [ + call + for call in to_thread_mock.await_args_list + if getattr(call.args[0], "__name__", "") == "_do_tus_upload" + ] + self.assertEqual(len(tus_upload_calls), 1) + + create_request = next( + entry for entry in self.server.requests if entry["path"] == "/assemblies" and entry["method"] == "POST" + ) + self.assertEqual(create_request["body"]["tus_num_expected_upload_files"], "1") + create_params = json.loads(create_request["body"]["params"]) + self.assertEqual(create_params["auth"]["key"], "key") + + self.assertEqual(calls[0], ("client", f"{self.server.base_url}/uploads")) + self.assertEqual(calls[1][0], "uploader") + metadata = calls[1][1] + self.assertEqual(metadata["assembly_url"], f"{self.server.base_url}/assemblies/assembly-123") + self.assertEqual(metadata["fieldname"], "explicit_field") + self.assertEqual(metadata["filename"], "explicit_field") + self.assertEqual(calls[2][0], "upload") + + async def test_async_assembly_wait_retries_after_polling_rate_limit(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + rate_limited = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "rate_limited"}, + ) + completed = Response( + data={ + "ok": "ASSEMBLY_COMPLETED", + "assembly_id": "assembly-123", + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "completed"}, + ) + + with mock.patch.object( + client.request, + "post", + new=mock.AsyncMock(side_effect=[initial, initial]), + ) as post_mock: + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock(side_effect=[rate_limited, completed]), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + response = await assembly.create(wait=True, resumable=False, retries=2) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(post_mock.await_count, 1) + self.assertEqual( + get_mock.await_args_list, + [ + mock.call(assembly_url=f"{self.server.base_url}/assemblies/assembly-123"), + mock.call(assembly_url=f"{self.server.base_url}/assemblies/assembly-123"), + ], + ) + self.assertEqual(sleep_mock.await_args_list, [mock.call(0), mock.call(0)]) + + async def test_async_assembly_wait_resets_poll_rate_limit_retry_budget(self): + assembly_url = f"{self.server.base_url}/assemblies/assembly-123" + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": assembly_url, + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + rate_limited = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "error": "ASSEMBLY_STATUS_FETCHING_RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + "assembly_ssl_url": assembly_url, + }, + status_code=200, + headers={"X-Async-Route": "rate_limited"}, + ) + processing = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": assembly_url, + }, + status_code=200, + headers={"X-Async-Route": "processing"}, + ) + completed = Response( + data={ + "ok": "ASSEMBLY_COMPLETED", + "assembly_id": "assembly-123", + "assembly_ssl_url": assembly_url, + }, + status_code=200, + headers={"X-Async-Route": "completed"}, + ) + + with mock.patch.object( + client.request, + "post", + new=mock.AsyncMock(return_value=initial), + ) as post_mock: + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock( + side_effect=[rate_limited, processing, rate_limited, completed] + ), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + response = await assembly.create(wait=True, resumable=False, retries=1) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(post_mock.await_count, 1) + self.assertEqual(get_mock.await_count, 4) + + async def test_async_assembly_wait_does_not_follow_poll_response_assembly_url(self): + initial_url = f"{self.server.base_url}/assemblies/assembly-123" + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": initial_url, + }, + status_code=200, + headers={}, + ) + malicious_poll = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "error": "ASSEMBLY_STATUS_FETCHING_RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + "assembly_ssl_url": "https://example.invalid/assemblies/evil", + }, + status_code=200, + headers={}, + ) + completed = Response( + data={"ok": "ASSEMBLY_COMPLETED", "assembly_id": "assembly-123"}, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial)): + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock(side_effect=[malicious_poll, completed]), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + response = await assembly.create(wait=True, resumable=False, retries=2) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual( + get_mock.await_args_list, + [ + mock.call(assembly_url=initial_url), + mock.call(assembly_url=initial_url), + ], + ) + + async def test_async_assembly_wait_returns_last_poll_response_when_budget_exhausted(self): + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "initial"}, + ) + rate_limited = Response( + data={ + "ok": "ASSEMBLY_PROCESSING", + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + }, + status_code=200, + headers={"X-Async-Route": "rate_limited"}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial)) as post_mock: + with mock.patch.object( + client, + "get_assembly", + new=mock.AsyncMock(return_value=rate_limited), + ) as get_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) as sleep_mock: + response = await assembly.create(wait=True, resumable=False, retries=1) + + self.assertEqual(response.data["error"], "RATE_LIMIT_REACHED") + post_mock.assert_awaited_once() + self.assertEqual( + get_mock.await_args_list, + [ + mock.call( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ), + mock.call( + assembly_url=f"{self.server.base_url}/assemblies/assembly-123" + ), + ], + ) + self.assertEqual(sleep_mock.await_args_list, [mock.call(0), mock.call(0)]) + + async def test_async_assembly_non_resumable_rate_limit_rewinds_files_for_retry(self): + reads = [] + upload = io.BytesIO(b"payload") + + async def fake_post(path, data=None, extra_data=None, files=None): + file_stream = files["file"] + reads.append(file_stream.read()) + if len(reads) == 1: + return Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + return Response( + data={"ok": "ASSEMBLY_COMPLETED", "assembly_id": "assembly-123"}, + status_code=200, + headers={}, + ) + + async def fake_sleep(delay): + self.assertEqual(upload.tell(), 0) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(upload) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(side_effect=fake_post)): + with mock.patch("asyncio.sleep", new=mock.AsyncMock(side_effect=fake_sleep)) as sleep_mock: + response = await assembly.create(resumable=False, retries=2) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(reads, [b"payload", b"payload"]) + sleep_mock.assert_awaited_once_with(0) + + async def test_async_assembly_non_resumable_rate_limit_raises_when_rewind_fails(self): + class _BrokenRewindStream(io.BytesIO): + def seek(self, position, *args, **kwargs): + raise OSError("seek failed") + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + assembly.add_file(_BrokenRewindStream(b"payload")) + + rate_limited = Response( + data={ + "error": "RATE_LIMIT_REACHED", + "info": {"retryIn": 0}, + }, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=rate_limited)) as post_mock: + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + with self.assertRaises(RuntimeError): + await assembly.create(resumable=False, retries=1) + + post_mock.assert_awaited_once() + + async def test_async_assembly_rate_limit_ignores_malformed_error_values(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + assembly = client.new_assembly() + + self.assertFalse(assembly._rate_limit_reached({"error": ["RATE_LIMIT_REACHED"]})) + self.assertFalse(assembly._rate_limit_reached({"error": {"code": "RATE_LIMIT_REACHED"}})) + + async def test_async_assembly_retry_delay_sanitizes_response_info(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + assembly = client.new_assembly() + + self.assertEqual(assembly._retry_delay({}), 1) + self.assertEqual(assembly._retry_delay({"info": None}), 1) + self.assertEqual(assembly._retry_delay({"info": {"retryIn": "bad"}}), 1) + self.assertEqual(assembly._retry_delay({"info": {"retryIn": float("nan")}}), 1) + self.assertEqual(assembly._retry_delay({"info": {"retryIn": -2}}), 0) + self.assertEqual(assembly._retry_delay({"info": {"retryIn": 0.25}}), 0.25) + self.assertEqual(assembly._retry_delay({"info": {"retryIn": 9999}}), 9999) + + async def test_async_tus_upload_cancellation_waits_for_thread_to_finish(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + assembly = client.new_assembly() + started = threading.Event() + release = threading.Event() + finished = threading.Event() + + def blocking_upload(assembly_url, tus_url, retries): + started.set() + release.wait(timeout=5) + finished.set() + + assembly._do_tus_upload = blocking_upload + upload_task = asyncio.create_task( + assembly._do_tus_upload_async( + f"{self.server.base_url}/assemblies/assembly-123", + f"{self.server.base_url}/uploads", + retries=1, + ) + ) + + await asyncio.to_thread(started.wait, 5) + upload_task.cancel() + await asyncio.sleep(0.05) + + self.assertFalse(upload_task.done()) + self.assertFalse(finished.is_set()) + + release.set() + with self.assertRaises(asyncio.CancelledError): + await upload_task + + await asyncio.to_thread(finished.wait, 5) + self.assertTrue(finished.is_set()) + + async def test_async_request_uses_connect_and_read_timeouts_for_uploads(self): + session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + client = AsyncTransloadit("key", "secret", service=self.server.base_url, session=session) + upload = io.BytesIO(b"payload") + upload.name = "clip.jpg" + + response = await client.request.post("/assemblies", data={"foo": "bar"}, files={"file": upload}) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + timeout = session.calls[0][1]["timeout"] + self.assertIsNone(timeout.total) + self.assertEqual(timeout.sock_connect, 60) + self.assertEqual(timeout.sock_read, 60) + self.assertEqual(session.calls[0][1]["data"]._fields[2][1]["Content-Type"], "image/jpeg") + + async def test_async_request_upload_does_not_close_caller_stream(self): + fixture_path = Path(__file__).resolve().parents[1] / "LICENSE" + upload = fixture_path.open("rb") + + try: + upload_payload = payload.get_payload(_NonClosingUploadStream(upload)) + await upload_payload.close() + await asyncio.sleep(0.05) + + self.assertFalse(upload.closed) + upload.seek(0) + self.assertEqual(upload.read(5), fixture_path.read_bytes()[:5]) + finally: + if not upload.closed: + upload.close() + + async def test_async_request_payload_preserves_custom_auth_constraints(self): + client = AsyncTransloadit("key", "secret", service=self.server.base_url) + + payload = client.request._to_payload( + { + "auth": { + "max_size": 1024, + "referer": "https://example.com", + }, + "foo": "bar", + } + ) + + params = json.loads(payload["params"]) + self.assertEqual(params["auth"]["key"], "key") + self.assertIn("expires", params["auth"]) + self.assertEqual(params["auth"]["max_size"], 1024) + self.assertEqual(params["auth"]["referer"], "https://example.com") + + with self.assertRaises(ValueError): + client.request._to_payload({"auth": "not-a-dict"}) + + async def test_async_request_filters_none_and_matches_sync_booleans_in_extra_data(self): + session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + client = AsyncTransloadit("key", "secret", service=self.server.base_url, session=session) + upload = io.BytesIO(b"payload") + upload.name = "clip.jpg" + + response = await client.request.post( + "/assemblies", + data={"foo": "bar"}, + extra_data={"enabled": True, "skip": None, "tags": ["a", "b"]}, + files={"file": upload}, + ) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + fields = {field[0]["name"]: field for field in session.calls[0][1]["data"]._fields} + self.assertIn("enabled", fields) + self.assertNotIn("skip", fields) + self.assertEqual(fields["enabled"][2], "True") + tag_values = [field[2] for field in session.calls[0][1]["data"]._fields if field[0]["name"] == "tags"] + self.assertEqual(tag_values, ["a", "b"]) + + def test_non_closing_upload_stream_reflects_seekability(self): + class _NonSeekableUpload(io.BytesIO): + def seekable(self): + return False + + class _BrokenSeekableUpload(io.BytesIO): + def seekable(self): + raise OSError("seekable failed") + + class _WriteOnlyUpload: + def readable(self): + return False + + self.assertTrue(_NonClosingUploadStream(io.BytesIO(b"payload")).seekable()) + self.assertFalse(_NonClosingUploadStream(_NonSeekableUpload(b"payload")).seekable()) + self.assertFalse(_NonClosingUploadStream(_BrokenSeekableUpload(b"payload")).seekable()) + self.assertFalse(_NonClosingUploadStream(_WriteOnlyUpload()).readable()) + + async def test_async_request_uses_filename_fallback_for_trailing_slash_stream_name(self): + session = _RecordingSession({"ok": "ASSEMBLY_COMPLETED"}) + client = AsyncTransloadit("key", "secret", service=self.server.base_url, session=session) + upload = io.BytesIO(b"payload") + upload.name = "/tmp/" + + response = await client.request.post("/assemblies", data={"foo": "bar"}, files={"file": upload}) + + self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") + self.assertEqual(session.calls[0][1]["data"]._fields[2][0]["filename"], "file") + + async def test_async_resumable_upload_uses_to_thread(self): + calls = [] + + class _Uploader: + def __init__(self, metadata): + self.metadata = metadata + + def upload(self): + calls.append(("upload", dict(self.metadata))) + + class _TusClient: + def __init__(self, tus_url): + calls.append(("client", tus_url)) + self.tus_url = tus_url + + def uploader(self, **kwargs): + calls.append(("uploader", kwargs["metadata"], kwargs["chunk_size"], kwargs["retries"])) + return _Uploader(kwargs["metadata"]) + + async with AsyncTransloadit("key", "secret", service=self.server.base_url) as client: + assembly = client.new_assembly() + + initial = Response( + data={ + "assembly_ssl_url": f"{self.server.base_url}/assemblies/assembly-123", + "tus_url": f"{self.server.base_url}/uploads", + }, + status_code=200, + headers={}, + ) + + with mock.patch.object(client.request, "post", new=mock.AsyncMock(return_value=initial)): + with mock.patch("asyncio.to_thread", new=mock.AsyncMock(side_effect=lambda func, *args: func(*args))) as to_thread_mock: + with mock.patch("transloadit.async_assembly.tus.TusClient", new=_TusClient): + assembly.add_file(io.BytesIO(b"payload")) + response = await assembly.create(resumable=True, retries=5) + + self.assertEqual(response.data["assembly_ssl_url"], f"{self.server.base_url}/assemblies/assembly-123") + to_thread_mock.assert_awaited_once() + self.assertEqual(calls[0], ("client", f"{self.server.base_url}/uploads")) + self.assertEqual(calls[1][0], "uploader") + metadata = calls[1][1] + self.assertEqual(metadata["assembly_url"], f"{self.server.base_url}/assemblies/assembly-123") + self.assertEqual(metadata["fieldname"], "file") + self.assertEqual(metadata["filename"], "file") + self.assertEqual(calls[2][0], "upload") + + def test_async_assembly_helpers_cover_duplicate_names_and_rewind_edges(self): + client = AsyncTransloadit("key", "secret") + assembly = client.new_assembly() + + first = io.BytesIO(b"abc") + second = io.BytesIO(b"xyz") + third = io.BytesIO(b"456") + explicit = io.BytesIO(b"123") + + assembly.add_file(first) + assembly.add_file(second) + assembly.add_file(third) + assembly.add_file(explicit, "explicit") + + self.assertIs(assembly.files["file"], first) + self.assertIs(assembly.files["file_1"], second) + self.assertIs(assembly.files["file_2"], third) + self.assertIs(assembly.files["explicit"], explicit) + + assembly.remove_file("explicit") + self.assertIsNone(assembly.files.get("explicit")) + + first.read(1) + second.read(2) + positions, missing = assembly._snapshot_file_positions() + self.assertEqual(positions["file"], 1) + self.assertEqual(positions["file_1"], 2) + self.assertEqual(missing, []) + + first.read(1) + second.read(1) + assembly._rewind_files(positions) + self.assertEqual(first.tell(), 1) + self.assertEqual(second.tell(), 2) + + broken = _BrokenStream() + assembly.files["broken"] = broken + positions, missing = assembly._snapshot_file_positions() + self.assertNotIn("broken", positions) + self.assertEqual(missing, ["broken"]) + + assembly._rewind_files({"missing": 4}) + with self.assertRaises(RuntimeError): + assembly._rewind_files({"broken": 7}) diff --git a/tests/test_client.py b/tests/test_client.py index 584109a..df6ebc1 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -62,6 +62,11 @@ def assert_parity_with_node(self, url, params, message=''): if expected_url is not None: self.assertEqual(expected_url, url, message or 'URL should match Node.js reference implementation') + def test_rejects_invalid_service_url(self): + for service in ("", " ", "https://", "ftp://api2.transloadit.com"): + with self.assertRaises(ValueError): + Transloadit("key", "secret", service=service) + @requests_mock.Mocker() def test_get_assembly(self, mock): id_ = "abcdef12345" @@ -72,6 +77,36 @@ def test_get_assembly(self, mock): self.assertEqual(response.data["ok"], "ASSEMBLY_COMPLETED") self.assertEqual(response.data["assembly_id"], "abcdef12345") + def test_quotes_path_ids(self): + with mock.patch.object(self.transloadit.request, 'get') as get_mock: + self.transloadit.get_assembly(assembly_id='assembly/with?chars') + self.transloadit.get_template('template/with?chars') + + self.assertEqual( + get_mock.call_args_list, + [ + mock.call('/assemblies/assembly%2Fwith%3Fchars'), + mock.call('/templates/template%2Fwith%3Fchars'), + ], + ) + + def test_rejects_empty_template_ids(self): + invalid_ids = ("", None) + with mock.patch.object(self.transloadit.request, "get"): + for template_id in invalid_ids: + with self.assertRaises(ValueError): + self.transloadit.get_template(template_id) + + with mock.patch.object(self.transloadit.request, "put"): + for template_id in invalid_ids: + with self.assertRaises(ValueError): + self.transloadit.update_template(template_id, {"name": "foo"}) + + with mock.patch.object(self.transloadit.request, "delete"): + for template_id in invalid_ids: + with self.assertRaises(ValueError): + self.transloadit.delete_template(template_id) + @requests_mock.Mocker() def test_list_assemblies(self, mock): url = f"{self.transloadit.service}/assemblies" @@ -274,3 +309,19 @@ def test_get_signed_smart_cdn_url(self): # For parity test, set the exact expiry time to match Node.js params['expire_at_ms'] = expiry self.assert_parity_with_node(url, params) + + def test_get_signed_smart_cdn_url_rejects_invalid_workspace_and_reserved_params(self): + client = Transloadit("test-key", "test-secret") + + for workspace in ("", "-workspace", "workspace-", "Acme Workspace", "bad.workspace"): + with self.assertRaises(ValueError): + client.get_signed_smart_cdn_url(workspace, "template", "file.jpg") + + for reserved_key in ("auth_key", "exp", "sig"): + with self.assertRaises(ValueError): + client.get_signed_smart_cdn_url( + "workspace", + "template", + "file.jpg", + {reserved_key: "override"}, + ) diff --git a/tests/test_e2e_upload.py b/tests/test_e2e_upload.py index 77042f0..f3c6ec7 100644 --- a/tests/test_e2e_upload.py +++ b/tests/test_e2e_upload.py @@ -1,8 +1,11 @@ import os from pathlib import Path +from uuid import uuid4 +from unittest import IsolatedAsyncioTestCase import pytest +from transloadit.async_client import AsyncTransloadit from transloadit.client import Transloadit @@ -17,38 +20,39 @@ def _is_enabled(): ] -def test_e2e_image_resize(): +def _get_e2e_credentials(): key = os.getenv("TRANSLOADIT_KEY") secret = os.getenv("TRANSLOADIT_SECRET") if not key or not secret: pytest.skip("TRANSLOADIT_KEY and TRANSLOADIT_SECRET must be set to run E2E tests") + return key, secret + + +def _get_fixture_path(): fixture_path = Path(__file__).resolve().parents[1] / "chameleon.jpg" if not fixture_path.exists(): pytest.skip("chameleon.jpg fixture missing; run from repository root") - client = Transloadit(key, secret) + return fixture_path - assembly = client.new_assembly() - with fixture_path.open("rb") as upload: - assembly.add_file(upload) - assembly.add_step( - "resize", - "/image/resize", - { - "use": ":original", - "width": 128, - "height": 128, - "resize_strategy": "fit", - "format": "png", - }, - ) +def _add_resize_step(assembly, width=128, height=128): + assembly.add_step( + "resize", + "/image/resize", + { + "use": ":original", + "width": width, + "height": height, + "resize_strategy": "fit", + "format": "png", + }, + ) - response = assembly.create(wait=True, resumable=False) - data = response.data +def _assert_e2e_image_resize(data, fixture_path, expected_field=None, expected_fields=None): assembly_ssl_url = data.get("assembly_ssl_url") or data.get("assembly_url") assembly_id = data.get("assembly_id") print(f"[python-sdk][e2e] Assembly URL: {assembly_ssl_url} (id={assembly_id})") @@ -63,6 +67,8 @@ def test_e2e_image_resize(): filename = upload_info.get("name") if filename: assert filename == fixture_path.name + if expected_field: + assert upload_info.get("field") == expected_field results = (data.get("results") or {}).get("resize") or [] assert results, f"Expected resize results in assembly response: {data}" @@ -80,8 +86,155 @@ def test_e2e_image_resize(): height = int(height) assert width and height, f"Missing dimensions in result metadata: {meta}" assert 0 < width <= 128 and 0 < height <= 128 + if expected_fields: + fields = data.get("fields") or {} + for key, value in expected_fields.items(): + assert fields.get(key) == value, f"Expected field {key}={value!r}, got {fields!r}" print( "[python-sdk][e2e] Result dimensions: " f"{width}x{height}, ssl_url={ssl_url}, basename={upload_info.get('basename')}, " f"filename={upload_info.get('name')}" ) + + +def _extract_template_id(data): + template_id = data.get("id") or data.get("template_id") + assert template_id, f"Template response did not contain an id: {data}" + return template_id + + +def _extract_template_name(data): + return data.get("name") or data.get("template_name") + + +def _extract_template_content(data): + return data.get("content") or data.get("template_content") or data.get("template") + + +def _assert_template(data, expected_name, expected_width): + assert _extract_template_name(data) == expected_name, data + content = _extract_template_content(data) + assert isinstance(content, dict), data + steps = content.get("steps") or {} + resize = steps.get("resize") or {} + assert resize.get("robot") == "/image/resize", data + assert int(resize.get("width")) == expected_width, data + + +def test_e2e_image_resize(): + key, secret = _get_e2e_credentials() + fixture_path = _get_fixture_path() + client = Transloadit(key, secret) + + assembly = client.new_assembly() + + with fixture_path.open("rb") as upload: + assembly.add_file(upload) + _add_resize_step(assembly) + response = assembly.create(wait=True, resumable=False) + + _assert_e2e_image_resize(response.data, fixture_path) + + +def test_e2e_resumable_image_resize(): + key, secret = _get_e2e_credentials() + fixture_path = _get_fixture_path() + expected_fields = {"python_sdk_e2e": "sync-resumable"} + client = Transloadit(key, secret) + + assembly = client.new_assembly(params={"fields": expected_fields}) + + with fixture_path.open("rb") as upload: + assembly.add_file(upload, "resumable_file") + _add_resize_step(assembly) + response = assembly.create(wait=True, resumable=True) + + _assert_e2e_image_resize( + response.data, + fixture_path, + expected_field="resumable_file", + expected_fields=expected_fields, + ) + + +class TestAsyncE2EUpload(IsolatedAsyncioTestCase): + async def test_e2e_image_resize(self): + key, secret = _get_e2e_credentials() + fixture_path = _get_fixture_path() + + async with AsyncTransloadit(key, secret) as client: + assembly = client.new_assembly() + + with fixture_path.open("rb") as upload: + assembly.add_file(upload) + _add_resize_step(assembly) + response = await assembly.create(wait=True, resumable=False) + + _assert_e2e_image_resize(response.data, fixture_path) + + async def test_e2e_resumable_image_resize(self): + key, secret = _get_e2e_credentials() + fixture_path = _get_fixture_path() + expected_fields = {"python_sdk_e2e": "async-resumable"} + + async with AsyncTransloadit(key, secret) as client: + assembly = client.new_assembly(params={"fields": expected_fields}) + + with fixture_path.open("rb") as upload: + assembly.add_file(upload, "async_resumable_file") + _add_resize_step(assembly) + response = await assembly.create(wait=True, resumable=True) + + _assert_e2e_image_resize( + response.data, + fixture_path, + expected_field="async_resumable_file", + expected_fields=expected_fields, + ) + + async def test_e2e_template_lifecycle(self): + key, secret = _get_e2e_credentials() + template_name = f"python-sdk-e2e-{uuid4().hex[:12]}" + updated_name = f"{template_name}-updated" + template_id = None + deleted = False + + async with AsyncTransloadit(key, secret) as client: + try: + template = client.new_template(template_name) + _add_resize_step(template, width=64, height=64) + created = await template.create() + + template_id = _extract_template_id(created.data) + fetched = await client.get_template(template_id) + _assert_template(fetched.data, template_name, 64) + + updated = await client.update_template( + template_id, + { + "name": updated_name, + "template": { + "steps": { + "resize": { + "robot": "/image/resize", + "use": ":original", + "width": 96, + "height": 96, + "resize_strategy": "fit", + "format": "jpg", + }, + }, + }, + }, + ) + assert updated.data.get("ok") == "TEMPLATE_UPDATED", updated.data + + refetched = await client.get_template(template_id) + _assert_template(refetched.data, updated_name, 96) + + deleted_response = await client.delete_template(template_id) + assert deleted_response.data.get("ok") == "TEMPLATE_DELETED", deleted_response.data + deleted = True + finally: + if template_id and not deleted: + await client.delete_template(template_id) diff --git a/tests/test_examples.py b/tests/test_examples.py new file mode 100644 index 0000000..25a090b --- /dev/null +++ b/tests/test_examples.py @@ -0,0 +1,117 @@ +import os +import runpy +import subprocess +import sys +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[1] +EXAMPLES_ROOT = REPO_ROOT / "examples" +QUICKSTART_EXAMPLES = [ + "image_resize.py", + "async_image_resize.py", + "resumable_upload.py", + "assembly_with_template.py", + "template_lifecycle.py", + "async_template_lifecycle.py", + "smart_cdn_url.py", +] + + +def _is_e2e_enabled(): + flag = os.getenv("PYTHON_SDK_E2E", "") + return flag.lower() in {"1", "true", "yes", "on"} + + +def _has_credentials(): + return bool(os.getenv("TRANSLOADIT_KEY") and os.getenv("TRANSLOADIT_SECRET")) + + +def test_examples_import_without_side_effects(): + for example_path in sorted(EXAMPLES_ROOT.glob("*.py")): + if example_path.name == "__init__.py": + continue + runpy.run_path(str(example_path), run_name="__example_import__") + + +def test_smart_cdn_example_runs_without_network(): + env = { + **os.environ, + "PYTHONPATH": str(REPO_ROOT), + "TRANSLOADIT_KEY": "test-key", + "TRANSLOADIT_SECRET": "test-secret", + "TRANSLOADIT_WORKSPACE": "workspace", + "TRANSLOADIT_TEMPLATE": "template", + "TRANSLOADIT_INPUT": "image.jpg", + } + result = subprocess.run( + [sys.executable, "examples/smart_cdn_url.py"], + cwd=REPO_ROOT, + env=env, + text=True, + capture_output=True, + check=True, + ) + + assert result.stdout.startswith("https://workspace.tlcdn.com/template/image.jpg?") + assert "sig=sha256%3A" in result.stdout + + +def test_video_translator_download_uses_timeout(tmp_path, monkeypatch): + namespace = runpy.run_path( + str(EXAMPLES_ROOT / "video_translator.py"), + run_name="__example_import__", + ) + calls = [] + + class _Response: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def read(self): + return b'{"words":[]}' + + def urlopen(url, timeout): + calls.append((url, timeout)) + return _Response() + + monkeypatch.setattr(namespace["urllib"].request, "urlopen", urlopen) + output_path = tmp_path / "transcribe_json.json" + + namespace["download_url"]("https://example.com/transcribe_json.json", output_path, timeout=12) + + assert calls == [("https://example.com/transcribe_json.json", 12)] + assert output_path.read_text() == '{"words":[]}' + + +@pytest.mark.e2e +@pytest.mark.skipif(not _is_e2e_enabled(), reason="Set PYTHON_SDK_E2E=1 to run live examples") +@pytest.mark.skipif( + not _has_credentials(), + reason="TRANSLOADIT_KEY and TRANSLOADIT_SECRET must be set to run live examples", +) +@pytest.mark.parametrize("example_name", QUICKSTART_EXAMPLES) +def test_quickstart_example_runs_against_transloadit(example_name): + env = { + **os.environ, + "PYTHONPATH": str(REPO_ROOT), + } + result = subprocess.run( + [sys.executable, f"examples/{example_name}"], + cwd=REPO_ROOT, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, ( + f"{example_name} failed with exit code {result.returncode}\n" + f"STDOUT:\n{result.stdout}\n" + f"STDERR:\n{result.stderr}" + ) + assert result.stdout.strip(), f"{example_name} completed without printing output" diff --git a/tests/test_request.py b/tests/test_request.py index 5debdb4..e3daaaa 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,3 +1,4 @@ +import json import unittest import urllib.parse @@ -38,6 +39,52 @@ def test_post(self, mock): response = self.request.post("/foo", data={"foo": "bar"}) self.assertEqual(response.data["ok"], "it works") + def test_payload_preserves_custom_auth_constraints(self): + payload = self.request._to_payload( + { + "auth": { + "max_size": 1024, + "referer": "https://example.com", + }, + "foo": "bar", + } + ) + + params = json.loads(payload["params"]) + self.assertEqual(params["auth"]["key"], "key") + self.assertIn("expires", params["auth"]) + self.assertEqual(params["auth"]["max_size"], 1024) + self.assertEqual(params["auth"]["referer"], "https://example.com") + + def test_payload_rejects_malformed_auth_constraints(self): + with self.assertRaises(ValueError): + self.request._to_payload({"auth": "not-a-dict"}) + + def test_full_url_allows_explicit_absolute_urls(self): + self.assertEqual( + self.request._get_full_url(f"{self.transloadit.service}/foo"), + f"{self.transloadit.service}/foo", + ) + self.assertEqual( + self.request._get_full_url("https://api2-region.transloadit.com/foo"), + "https://api2-region.transloadit.com/foo", + ) + self.assertEqual( + self.request._get_full_url("https://example.com/foo"), + "https://example.com/foo", + ) + + @requests_mock.Mocker() + def test_external_absolute_url_does_not_receive_signed_payload(self, mock): + url = "https://example.com/foo" + mock.get(url, text='{"ok": true}') + + response = self.request.get(url) + + self.assertTrue(response.data["ok"]) + self.assertNotIn("params", mock.last_request.qs) + self.assertNotIn("signature", mock.last_request.qs) + @requests_mock.Mocker() def test_put(self, mock): url = f"{self.transloadit.service}/foo" diff --git a/tests/test_response.py b/tests/test_response.py new file mode 100644 index 0000000..c5b28d3 --- /dev/null +++ b/tests/test_response.py @@ -0,0 +1,63 @@ +import unittest +from unittest import mock + +from transloadit.response import Response, _MISSING + + +class ResponseTest(unittest.TestCase): + def test_response_data_is_assignable_and_eager_for_sync_responses(self): + raw = mock.Mock() + raw.json.return_value = {"ok": "original"} + raw.status_code = 200 + raw.headers = {"X-Test": "1"} + + response = Response(raw) + + raw.json.assert_called_once() + + response.data = {"ok": "changed"} + + self.assertEqual(response.data, {"ok": "changed"}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.headers, {"X-Test": "1"}) + + def test_response_uses_text_fallback_for_sync_non_json_responses(self): + raw = mock.Mock() + raw.json.side_effect = ValueError("not json") + raw.text = "bad gateway" + raw.status_code = 502 + raw.headers = {"Content-Type": "text/html"} + + response = Response(raw) + + self.assertEqual(response.data, "bad gateway") + self.assertEqual(response.status_code, 502) + + def test_response_lazily_rehydrates_data_when_missing(self): + raw = mock.Mock() + raw.json.return_value = {"ok": "lazy"} + raw.status_code = 204 + raw.headers = {"X-Test": "1"} + + response = Response() + response._response = raw + response._data = _MISSING + + self.assertEqual(response.data, {"ok": "lazy"}) + raw.json.assert_called_once() + + def test_response_supports_async_preloaded_values_and_empty_default(self): + empty = Response() + self.assertIsNone(empty.data) + self.assertIsNone(empty.status_code) + self.assertIsNone(empty.headers) + + response = Response( + data={"ok": "async"}, + status_code=202, + headers={"X-Test": "1"}, + ) + + self.assertEqual(response.data, {"ok": "async"}) + self.assertEqual(response.status_code, 202) + self.assertEqual(response.headers, {"X-Test": "1"}) diff --git a/tests/test_template.py b/tests/test_template.py index 87712a5..3db4b62 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -15,7 +15,10 @@ def setUp(self): @requests_mock.Mocker() def test_save(self, mock): url = f"{self.transloadit.service}/templates" - sub_body = '"robot": "/image/resize"' + sub_body = ( + '"template": {"steps": {"resize": ' + '{"width": 70, "height": 70, "robot": "/image/resize"}}}' + ) mock.post( url, text='{"ok":"TEMPLATE_CREATED","template_name":"foo"}', diff --git a/transloadit/api_url.py b/transloadit/api_url.py new file mode 100644 index 0000000..192062d --- /dev/null +++ b/transloadit/api_url.py @@ -0,0 +1,47 @@ +from urllib.parse import urlparse + + +def normalize_service_url(service): + if not isinstance(service, str): + raise ValueError("service must be a URL string.") + + normalized_service = service.strip() + if not normalized_service: + raise ValueError("service must be a non-empty URL.") + if "://" in normalized_service and not normalized_service.startswith( + ("http://", "https://") + ): + raise ValueError("service must use http or https.") + if not normalized_service.startswith(("http://", "https://")): + normalized_service = "https://" + normalized_service + + parsed_service = urlparse(normalized_service) + if parsed_service.scheme not in {"http", "https"} or not parsed_service.netloc: + raise ValueError("service must include a valid host.") + return normalized_service + + +def require_path_id(value, name): + if value is None or not str(value).strip(): + raise ValueError(f"{name} cannot be empty.") + return str(value) + + +def should_sign_api_url(url, service): + if not url.startswith(("http://", "https://")): + return True + + parsed_url = urlparse(url) + parsed_service = urlparse(service) + # Only same-service URLs and Transloadit API regional hosts may receive auth params. + if ( + parsed_url.scheme == parsed_service.scheme + and parsed_url.netloc == parsed_service.netloc + ): + return True + + hostname = parsed_url.hostname or "" + return parsed_url.scheme == "https" and ( + hostname == "api2.transloadit.com" + or (hostname.startswith("api2-") and hostname.endswith(".transloadit.com")) + ) diff --git a/transloadit/assembly.py b/transloadit/assembly.py index 7b7d674..3aecb72 100644 --- a/transloadit/assembly.py +++ b/transloadit/assembly.py @@ -1,9 +1,9 @@ -import os from time import sleep from tusclient import client as tus from . import optionbuilder +from .upload import get_upload_filename class Assembly(optionbuilder.OptionBuilder): @@ -69,7 +69,7 @@ def _do_tus_upload(self, assembly_url, tus_url, retries): metadata = {"assembly_url": assembly_url} for key in self.files: metadata["fieldname"] = key - metadata["filename"] = os.path.basename(self.files[key].name) + metadata["filename"] = get_upload_filename(self.files[key], key) tus_client.uploader( file_stream=self.files[key], chunk_size=5 * 1024 * 1024, @@ -96,16 +96,28 @@ def create(self, wait=False, resumable=True, retries=3): response = self.transloadit.request.post( "/assemblies", extra_data=extra_data, data=data ) - self._do_tus_upload( - response.data.get("assembly_ssl_url"), - response.data.get("tus_url"), - retries, - ) else: response = self.transloadit.request.post( "/assemblies", data=data, files=self.files ) + if self._rate_limit_reached(response) and retries: + # wait till rate limit is expired + sleep(response.data.get("info", {}).get("retryIn", 1)) + return self.create(wait, resumable, retries - 1) + + if resumable and isinstance(response.data, dict): + if response.data.get("error") is not None: + return response + if self.files: + assembly_url = response.data.get("assembly_ssl_url") + tus_url = response.data.get("tus_url") + if not assembly_url or not tus_url: + raise RuntimeError( + f"Resumable assembly response is missing upload URLs: {response.data!r}" + ) + self._do_tus_upload(assembly_url, tus_url, retries) + if wait: while not self._assembly_finished(response): # if a wait period is provided by the API due to polling @@ -117,11 +129,6 @@ def create(self, wait=False, resumable=True, retries=3): assembly_url=response.data.get("assembly_ssl_url") ) - if self._rate_limit_reached(response) and retries: - # wait till rate limit is expired - sleep(response.data.get("info", {}).get("retryIn", 1)) - return self.create(wait, resumable, retries - 1) - return response def _assembly_finished(self, response): @@ -135,4 +142,7 @@ def _assembly_finished(self, response): return is_aborted or is_canceled or is_completed or (is_failed and not is_fetch_rate_limit) def _rate_limit_reached(self, response): - return response.data.get("error") == "RATE_LIMIT_REACHED" + return ( + isinstance(response.data, dict) + and response.data.get("error") == "RATE_LIMIT_REACHED" + ) diff --git a/transloadit/async_assembly.py b/transloadit/async_assembly.py new file mode 100644 index 0000000..18ac00a --- /dev/null +++ b/transloadit/async_assembly.py @@ -0,0 +1,216 @@ +import asyncio +import math + +from tusclient import client as tus + +from . import optionbuilder +from .upload import get_upload_filename + + +class AsyncAssembly(optionbuilder.OptionBuilder): + """ + Object representation of a new Assembly to be created asynchronously. + """ + + def __init__(self, transloadit, files=None, options=None): + super().__init__(options) + self.transloadit = transloadit + self.files = files or {} + + def add_file(self, file_stream, field_name=None): + """ + Add a file to be uploaded along with the Assembly. + """ + if field_name is None: + field_name = self._get_field_name() + + self.files[field_name] = file_stream + + def _get_field_name(self): + name = "file" + if name not in self.files: + return name + + counter = 1 + while f"{name}_{counter}" in self.files: + counter += 1 + return f"{name}_{counter}" + + def remove_file(self, field_name): + """ + Remove the file with the specified field name from the set of files to be submitted. + """ + self.files.pop(field_name) + + def _snapshot_file_positions(self): + positions = {} + missing = [] + for key, file_stream in self.files.items(): + try: + positions[key] = file_stream.tell() + except (AttributeError, OSError, ValueError): + missing.append(key) + return positions, missing + + def _rewind_files(self, positions): + for key, position in positions.items(): + file_stream = self.files.get(key) + if file_stream is None: + continue + try: + file_stream.seek(position) + except (AttributeError, OSError, ValueError) as exc: + raise RuntimeError(f"Unable to rewind file stream {key!r}.") from exc + + def _do_tus_upload(self, assembly_url, tus_url, retries): + tus_client = tus.TusClient(tus_url) + for key, file_stream in self.files.items(): + filename = get_upload_filename(file_stream, key) + metadata = { + "assembly_url": assembly_url, + "fieldname": key, + "filename": filename, + } + tus_client.uploader( + file_stream=file_stream, + chunk_size=5 * 1024 * 1024, + metadata=metadata, + retries=retries, + ).upload() + + async def _do_tus_upload_async(self, assembly_url, tus_url, retries): + # tuspy is synchronous, so cancellation cannot abort an upload already running in + # the worker thread. Wait for the worker to release file streams before letting + # cancellation unwind caller-owned file context managers. + upload_task = asyncio.create_task( + asyncio.to_thread(self._do_tus_upload, assembly_url, tus_url, retries) + ) + try: + await asyncio.shield(upload_task) + except asyncio.CancelledError: + try: + await upload_task + except Exception: + pass + raise + + async def create(self, wait=False, resumable=True, retries=3): + """ + Save/Submit the assembly for processing. + """ + data = self.get_options() + file_positions, missing_file_positions = self._snapshot_file_positions() + tus_retries = retries + poll_retries = retries + + while True: + if resumable: + extra_data = {"tus_num_expected_upload_files": len(self.files)} + response = await self.transloadit.request.post( + "/assemblies", extra_data=extra_data, data=data + ) + else: + response = await self.transloadit.request.post( + "/assemblies", data=data, files=self.files + ) + + response_data = self._response_data(response) + if response_data is None: + if response.status_code >= 400 or wait or (resumable and self.files): + raise RuntimeError(f"Unexpected non-JSON response ({response.status_code}).") + return response + + if self._rate_limit_reached(response_data): + if retries: + if not resumable and missing_file_positions: + missing = ", ".join(repr(key) for key in missing_file_positions) + raise RuntimeError( + "Cannot retry non-resumable upload because these file streams are not seekable: " + f"{missing}" + ) + if not resumable: + self._rewind_files(file_positions) + await asyncio.sleep(self._retry_delay(response_data)) + retries -= 1 + continue + return response + + error = response_data.get("error") + assembly_url = response_data.get("assembly_ssl_url") + tus_url = response_data.get("tus_url") + + if error is not None: + return response + + if resumable and self.files: + if not assembly_url or not tus_url: + raise RuntimeError( + f"Resumable assembly response is missing upload URLs: {response_data!r}" + ) + await self._do_tus_upload_async(assembly_url, tus_url, tus_retries) + + if wait: + if not assembly_url: + return response + + poll_response = response + poll_data = response_data + remaining_rate_limit_retries = poll_retries + while not self._assembly_finished(poll_data): + if self._rate_limit_reached(poll_data): + if remaining_rate_limit_retries <= 0: + return poll_response + remaining_rate_limit_retries -= 1 + else: + remaining_rate_limit_retries = poll_retries + sleep_time = self._retry_delay(poll_data) + await asyncio.sleep(sleep_time) + poll_response = await self.transloadit.get_assembly( + assembly_url=assembly_url + ) + poll_data = self._response_data(poll_response) + if poll_data is None: + raise RuntimeError(f"Unexpected non-JSON response ({poll_response.status_code}).") + + return poll_response + + return response + + def _response_data(self, response): + data = response.data + return data if isinstance(data, dict) else None + + def _assembly_finished(self, response_data): + status = response_data.get("ok") + is_aborted = status == "REQUEST_ABORTED" + is_canceled = status == "ASSEMBLY_CANCELED" + is_completed = status == "ASSEMBLY_COMPLETED" + error = response_data.get("error") + is_failed = error is not None + is_fetch_rate_limit = error == "ASSEMBLY_STATUS_FETCHING_RATE_LIMIT_REACHED" + is_submit_rate_limit = error == "RATE_LIMIT_REACHED" + return ( + is_aborted + or is_canceled + or is_completed + or (is_failed and not (is_fetch_rate_limit or is_submit_rate_limit)) + ) + + def _rate_limit_reached(self, response_data): + error = response_data.get("error") + return isinstance(error, str) and error in { + "RATE_LIMIT_REACHED", + "ASSEMBLY_STATUS_FETCHING_RATE_LIMIT_REACHED", + } + + def _retry_delay(self, response_data): + info = response_data.get("info") + if not isinstance(info, dict): + return 1 + try: + delay = float(info.get("retryIn", 1)) + except (TypeError, ValueError): + return 1 + if not math.isfinite(delay): + return 1 + return max(delay, 0) diff --git a/transloadit/async_client.py b/transloadit/async_client.py new file mode 100644 index 0000000..683c5a7 --- /dev/null +++ b/transloadit/async_client.py @@ -0,0 +1,135 @@ +from typing import Optional +from urllib.parse import quote + +from . import async_assembly, async_request, async_template +from .api_url import normalize_service_url, require_path_id +from .smart_cdn import URL_PARAM_VALUES, build_signed_smart_cdn_url + + +def _quote_path_segment(value: str) -> str: + return quote(str(value), safe="") + + +class AsyncTransloadit: + """ + Asynchronous client interface to the Transloadit API. + """ + + def __init__( + self, + auth_key: str, + auth_secret: str, + service: str = "https://api2.transloadit.com", + duration: int = 300, + session=None, + ): + self.service = normalize_service_url(service) + self.auth_key = auth_key + self.auth_secret = auth_secret + self.duration = duration + self.request = async_request.AsyncRequest(self, session=session) + + async def __aenter__(self): + await self.request._ensure_session() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.aclose() + + async def aclose(self): + await self.request.aclose() + + async def close(self): + await self.aclose() + + def new_assembly(self, params: dict = None) -> async_assembly.AsyncAssembly: + """ + Return an instance of . + """ + return async_assembly.AsyncAssembly(self, options=params) + + async def get_assembly(self, assembly_id: str = None, assembly_url: str = None): + """ + Get the assembly specified by the 'assembly_id' or the 'assembly_url'. + """ + if not (assembly_id or assembly_url): + raise ValueError("Either 'assembly_id' or 'assembly_url' cannot be None.") + + url = assembly_url if assembly_url else f"/assemblies/{_quote_path_segment(assembly_id)}" + return await self.request.get(url) + + async def list_assemblies(self, params: dict = None): + """ + Get the list of assemblies. + """ + return await self.request.get("/assemblies", params=params) + + async def cancel_assembly(self, assembly_id: str = None, assembly_url: str = None): + """ + Cancel the assembly specified by the 'assembly_id' or the 'assembly_url'. + """ + if not (assembly_id or assembly_url): + raise ValueError("Either 'assembly_id' or 'assembly_url' cannot be None.") + + url = assembly_url if assembly_url else f"/assemblies/{_quote_path_segment(assembly_id)}" + return await self.request.delete(url) + + async def get_template(self, template_id: str): + """ + Get the template specified by the 'template_id'. + """ + template_id = require_path_id(template_id, "template_id") + return await self.request.get(f"/templates/{_quote_path_segment(template_id)}") + + async def list_templates(self, params: Optional[dict] = None): + """ + Get the list of templates. + """ + return await self.request.get("/templates", params=params) + + def new_template(self, name: str, params: Optional[dict] = None) -> async_template.AsyncTemplate: + """ + Return an instance of . + """ + return async_template.AsyncTemplate(self, name, options=params) + + async def update_template(self, template_id: str, data: dict): + """ + Update the template specified by the 'template_id'. + """ + template_id = require_path_id(template_id, "template_id") + return await self.request.put(f"/templates/{_quote_path_segment(template_id)}", data=data) + + async def delete_template(self, template_id: str): + """ + Delete the template specified by the 'template_id'. + """ + template_id = require_path_id(template_id, "template_id") + return await self.request.delete(f"/templates/{_quote_path_segment(template_id)}") + + async def get_bill(self, month: int, year: int): + """ + Get the bill for the specified month and year. + """ + return await self.request.get(f"/bill/{year}-{month:02d}") + + def get_signed_smart_cdn_url( + self, + workspace: str, + template: str, + input: str, + url_params: Optional[dict[str, URL_PARAM_VALUES]] = None, + expires_at_ms: Optional[int] = None, + ) -> str: + """ + Construct a signed Smart CDN URL. + """ + return build_signed_smart_cdn_url( + auth_key=self.auth_key, + auth_secret=self.auth_secret, + workspace=workspace, + template=template, + input=input, + url_params=url_params, + expires_at_ms=expires_at_ms, + ) diff --git a/transloadit/async_request.py b/transloadit/async_request.py new file mode 100644 index 0000000..65f8495 --- /dev/null +++ b/transloadit/async_request.py @@ -0,0 +1,270 @@ +import asyncio +import copy +import hashlib +import hmac +import io +import json +import mimetypes +import os +from datetime import datetime, timedelta, timezone +from types import MappingProxyType + +import aiohttp +from requests.structures import CaseInsensitiveDict + +from . import __version__ +from .api_url import should_sign_api_url +from .response import Response +from .upload import get_upload_filename + +TIMEOUT = 60 + + +class _NonClosingUploadStream(io.IOBase): + def __init__(self, file_stream): + self._file_stream = file_stream + + @property + def name(self): + return getattr(self._file_stream, "name", None) + + def close(self): + pass + + def fileno(self): + return self._file_stream.fileno() + + def read(self, *args): + return self._file_stream.read(*args) + + def readable(self): + readable = getattr(self._file_stream, "readable", None) + if callable(readable): + try: + return readable() + except (OSError, ValueError): + return False + return hasattr(self._file_stream, "read") + + def readline(self, *args): + return self._file_stream.readline(*args) + + def readlines(self, *args): + return self._file_stream.readlines(*args) + + def seek(self, *args): + return self._file_stream.seek(*args) + + def seekable(self): + seekable = getattr(self._file_stream, "seekable", None) + if callable(seekable): + try: + return seekable() + except (OSError, ValueError): + return False + return hasattr(self._file_stream, "seek") + + def tell(self): + return self._file_stream.tell() + + +class AsyncRequest: + """ + Transloadit tailored asynchronous HTTP request object. + """ + + HEADERS = MappingProxyType({"Transloadit-Client": "python-sdk:" + __version__}) + + def __init__(self, transloadit, session=None): + self.transloadit = transloadit + self._session = session + self._owns_session = session is None + self._session_lock = None + + @property + def session(self): + return self._session + + def _headers(self): + return dict(self.HEADERS) + + def _get_session_lock(self): + if self._session_lock is None: + # Create the lock lazily so the client can be instantiated before the loop starts. + self._session_lock = asyncio.Lock() + return self._session_lock + + async def _ensure_session(self): + if self._session is not None and not self._session.closed: + return self._session + async with self._get_session_lock(): + if self._session is None: + self._session = aiohttp.ClientSession(trust_env=True) + self._owns_session = True + elif self._session.closed: + if self._owns_session: + self._session = aiohttp.ClientSession(trust_env=True) + else: + raise RuntimeError("Injected aiohttp session is closed.") + return self._session + + async def aclose(self): + async with self._get_session_lock(): + if self._session is not None and not self._session.closed and self._owns_session: + await self._session.close() + self._session = None + + def _timeout(self): + # Keep total disabled for large request bodies, but still cap stalled responses. + return aiohttp.ClientTimeout( + total=None, + sock_connect=TIMEOUT, + sock_read=TIMEOUT, + ) + + def _normalize_payload(self, data): + normalized = [] + for key, value in data.items(): + if value is None: + continue + values = value if isinstance(value, (list, tuple)) else [value] + for item in values: + if item is None: + continue + normalized.append((key, str(item))) + return normalized + + async def _read_response_data(self, response): + try: + return await response.json(content_type=None) + except (aiohttp.ContentTypeError, json.JSONDecodeError, UnicodeDecodeError): + try: + return await response.text() + except UnicodeDecodeError: + return await response.read() + + async def get(self, path, params=None): + """ + Makes an asynchronous HTTP GET request. + """ + url = self._get_full_url(path) + session = await self._ensure_session() + async with session.get( + url, + params=self._to_request_payload(url, params), + headers=self._headers(), + timeout=self._timeout(), + ) as response: + return Response( + data=await self._read_response_data(response), + status_code=response.status, + headers=CaseInsensitiveDict(response.headers), + ) + + async def post(self, path, data=None, extra_data=None, files=None): + """ + Makes an asynchronous HTTP POST request. + """ + url = self._get_full_url(path) + session = await self._ensure_session() + data = self._to_request_payload(url, data) or {} + if extra_data: + data.update(extra_data) + + if files: + form = aiohttp.FormData() + for key, value in self._normalize_payload(data): + form.add_field(key, value) + + for key, file_stream in files.items(): + filename = get_upload_filename(file_stream, key) + content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" + form.add_field( + key, + _NonClosingUploadStream(file_stream), + filename=filename, + content_type=content_type, + ) + payload = form + else: + payload = self._normalize_payload(data) + + async with session.post( + url, + data=payload, + headers=self._headers(), + timeout=self._timeout(), + ) as response: + return Response( + data=await self._read_response_data(response), + status_code=response.status, + headers=CaseInsensitiveDict(response.headers), + ) + + async def put(self, path, data=None): + """ + Makes an asynchronous HTTP PUT request. + """ + url = self._get_full_url(path) + session = await self._ensure_session() + data = self._normalize_payload(self._to_request_payload(url, data) or {}) + async with session.put( + url, + data=data, + headers=self._headers(), + timeout=self._timeout(), + ) as response: + return Response( + data=await self._read_response_data(response), + status_code=response.status, + headers=CaseInsensitiveDict(response.headers), + ) + + async def delete(self, path, data=None): + """ + Makes an asynchronous HTTP DELETE request. + """ + url = self._get_full_url(path) + session = await self._ensure_session() + data = self._normalize_payload(self._to_request_payload(url, data) or {}) + async with session.delete( + url, + data=data, + headers=self._headers(), + timeout=self._timeout(), + ) as response: + return Response( + data=await self._read_response_data(response), + status_code=response.status, + headers=CaseInsensitiveDict(response.headers), + ) + + def _to_payload(self, data): + data = copy.deepcopy(data or {}) + expiry = datetime.now(timezone.utc) + timedelta(seconds=self.transloadit.duration) + if "auth" in data and not isinstance(data["auth"], dict): + raise ValueError("auth must be a dictionary when provided.") + auth = data.get("auth") or {} + auth.update({ + "key": self.transloadit.auth_key, + "expires": expiry.strftime("%Y/%m/%d %H:%M:%S+00:00"), + }) + data["auth"] = auth + json_data = json.dumps(data) + return {"params": json_data, "signature": self._sign_data(json_data)} + + def _to_request_payload(self, url, data): + if should_sign_api_url(url, self.transloadit.service): + return self._to_payload(data) + return copy.deepcopy(data) if data else None + + def _sign_data(self, message): + hash_string = hmac.new( + self.transloadit.auth_secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha384 + ).hexdigest() + return f"sha384:{hash_string}" + + def _get_full_url(self, url): + if url.startswith(("http://", "https://")): + return url + return self.transloadit.service + url diff --git a/transloadit/async_template.py b/transloadit/async_template.py new file mode 100644 index 0000000..4925994 --- /dev/null +++ b/transloadit/async_template.py @@ -0,0 +1,24 @@ +from . import optionbuilder + + +class AsyncTemplate(optionbuilder.OptionBuilder): + """ + Object representation of a new Template to be created asynchronously. + """ + + def __init__(self, transloadit, name, options=None): + super().__init__(options) + self.transloadit = transloadit + self.name = name + + async def create(self): + """ + Save/Submit the template to the Transloadit server. + """ + data = self.get_options() + steps = data.pop("steps") + template_content = dict(data.pop("template", {}) or {}) + if steps: + template_content["steps"] = steps + data.update({"name": self.name, "template": template_content}) + return await self.transloadit.request.post("/templates", data=data) diff --git a/transloadit/client.py b/transloadit/client.py index eeeab24..854da46 100644 --- a/transloadit/client.py +++ b/transloadit/client.py @@ -1,17 +1,19 @@ import typing -import hmac -import hashlib -import time -from urllib.parse import urlencode, quote_plus - -from typing import Optional, Union, List +from typing import Optional +from urllib.parse import quote from . import assembly, request, template +from .api_url import normalize_service_url, require_path_id +from .smart_cdn import URL_PARAM_VALUES, build_signed_smart_cdn_url if typing.TYPE_CHECKING: from requests import Response +def _quote_path_segment(value: str) -> str: + return quote(str(value), safe="") + + class Transloadit: """ This class serves as a client interface to the Transloadit API. @@ -41,10 +43,7 @@ def __init__( service: str = "https://api2.transloadit.com", duration: int = 300, ): - if not service.startswith(("http://", "https://")): - service = "https://" + service - - self.service = service + self.service = normalize_service_url(service) self.auth_key = auth_key self.auth_secret = auth_secret self.duration = duration @@ -71,7 +70,7 @@ def get_assembly(self, assembly_id: str = None, assembly_url: str = None): if not (assembly_id or assembly_url): raise ValueError("Either 'assembly_id' or 'assembly_url' cannot be None.") - url = assembly_url if assembly_url else f"/assemblies/{assembly_id}" + url = assembly_url if assembly_url else f"/assemblies/{_quote_path_segment(assembly_id)}" return self.request.get(url) def list_assemblies(self, params: dict = None): @@ -101,7 +100,7 @@ def cancel_assembly(self, assembly_id: str = None, assembly_url: str = None): if not (assembly_id or assembly_url): raise ValueError("Either 'assembly_id' or 'assembly_url' cannot be None.") - url = assembly_url if assembly_url else f"/assemblies/{assembly_id}" + url = assembly_url if assembly_url else f"/assemblies/{_quote_path_segment(assembly_id)}" return self.request.delete(url) def get_template(self, template_id: str): @@ -113,7 +112,8 @@ def get_template(self, template_id: str): Return an instance of """ - return self.request.get(f"/templates/{template_id}") + template_id = require_path_id(template_id, "template_id") + return self.request.get(f"/templates/{_quote_path_segment(template_id)}") def list_templates(self, params: Optional[dict] = None): """ @@ -148,7 +148,8 @@ def update_template(self, template_id: str, data: dict): Return an instance of """ - return self.request.put(f"/templates/{template_id}", data=data) + template_id = require_path_id(template_id, "template_id") + return self.request.put(f"/templates/{_quote_path_segment(template_id)}", data=data) def delete_template(self, template_id: str): """ @@ -159,7 +160,8 @@ def delete_template(self, template_id: str): Return an instance of """ - return self.request.delete(f"/templates/{template_id}") + template_id = require_path_id(template_id, "template_id") + return self.request.delete(f"/templates/{_quote_path_segment(template_id)}") def get_bill(self, month: int, year: int): """ @@ -178,8 +180,8 @@ def get_signed_smart_cdn_url( workspace: str, template: str, input: str, - url_params: Optional[dict[str, Union[str, int, float, bool, List[Union[str, int, float, bool]], None]]] = None, - expires_at_ms: Optional[int] = None + url_params: Optional[dict[str, URL_PARAM_VALUES]] = None, + expires_at_ms: Optional[int] = None, ) -> str: """ Construct a signed Smart CDN URL. @@ -198,38 +200,12 @@ def get_signed_smart_cdn_url( :Raises: ValueError: If url_params contains values that are not strings, numbers, booleans, arrays, or None """ - workspace_slug = quote_plus(workspace) - template_slug = quote_plus(template) - input_field = quote_plus(input) - - expiry = expires_at_ms if expires_at_ms is not None else int(time.time() * 1000) + 60 * 60 * 1000 # 1 hour default - - params = [] - if url_params: - for k, v in url_params.items(): - if v is None: - continue # Skip None values - elif isinstance(v, (str, int, float, bool)): - params.append((k, str(v))) - elif isinstance(v, (list, tuple)): - params.append((k, [str(vv) for vv in v])) - else: - raise ValueError(f"URL parameter values must be strings, numbers, booleans, arrays, or None. Got {type(v)} for {k}") - - params.append(("auth_key", self.auth_key)) - params.append(("exp", str(expiry))) - - # Sort params alphabetically by key - sorted_params = sorted(params, key=lambda x: x[0]) - query_string = urlencode(sorted_params, doseq=True) - - string_to_sign = f"{workspace_slug}/{template_slug}/{input_field}?{query_string}" - algorithm = "sha256" - - signature = algorithm + ":" + hmac.new( - self.auth_secret.encode("utf-8"), - string_to_sign.encode("utf-8"), - hashlib.sha256 - ).hexdigest() - - return f"https://{workspace_slug}.tlcdn.com/{template_slug}/{input_field}?{query_string}&sig={quote_plus(signature)}" + return build_signed_smart_cdn_url( + auth_key=self.auth_key, + auth_secret=self.auth_secret, + workspace=workspace, + template=template, + input=input, + url_params=url_params, + expires_at_ms=expires_at_ms, + ) diff --git a/transloadit/request.py b/transloadit/request.py index 4cec55f..ca35236 100644 --- a/transloadit/request.py +++ b/transloadit/request.py @@ -1,13 +1,14 @@ +import copy import hashlib import hmac import json -import copy from datetime import datetime, timedelta, timezone import requests -from .response import as_response from . import __version__ +from .api_url import should_sign_api_url +from .response import as_response TIMEOUT = 60 @@ -40,9 +41,10 @@ def get(self, path, params=None): Return an instance of """ + url = self._get_full_url(path) return requests.get( - self._get_full_url(path), - params=self._to_payload(params), + url, + params=self._to_request_payload(url, params), headers=self.HEADERS, timeout=TIMEOUT, ) @@ -62,11 +64,14 @@ def post(self, path, data=None, extra_data=None, files=None): Return an instance of """ - data = self._to_payload(data) + url = self._get_full_url(path) + data = self._to_request_payload(url, data) if extra_data: + if data is None: + data = {} data.update(extra_data) return requests.post( - self._get_full_url(path), + url, data=data, files=files, headers=self.HEADERS, @@ -84,9 +89,10 @@ def put(self, path, data=None): Return an instance of """ - data = self._to_payload(data) + url = self._get_full_url(path) + data = self._to_request_payload(url, data) return requests.put( - self._get_full_url(path), + url, data=data, headers=self.HEADERS, timeout=TIMEOUT, @@ -103,9 +109,10 @@ def delete(self, path, data=None): Return an instance of """ - data = self._to_payload(data) + url = self._get_full_url(path) + data = self._to_request_payload(url, data) return requests.delete( - self._get_full_url(path), + url, data=data, headers=self.HEADERS, timeout=TIMEOUT, @@ -114,13 +121,22 @@ def delete(self, path, data=None): def _to_payload(self, data): data = copy.deepcopy(data or {}) expiry = datetime.now(timezone.utc) + timedelta(seconds=self.transloadit.duration) - data["auth"] = { + if "auth" in data and not isinstance(data["auth"], dict): + raise ValueError("auth must be a dictionary when provided.") + auth = data.get("auth") or {} + auth.update({ "key": self.transloadit.auth_key, "expires": expiry.strftime("%Y/%m/%d %H:%M:%S+00:00"), - } + }) + data["auth"] = auth json_data = json.dumps(data) return {"params": json_data, "signature": self._sign_data(json_data)} + def _to_request_payload(self, url, data): + if should_sign_api_url(url, self.transloadit.service): + return self._to_payload(data) + return copy.deepcopy(data) if data else None + def _sign_data(self, message): hash_string = hmac.new( self.transloadit.auth_secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha384 diff --git a/transloadit/response.py b/transloadit/response.py index 6eb7f52..51b08f8 100644 --- a/transloadit/response.py +++ b/transloadit/response.py @@ -1,13 +1,17 @@ from functools import wraps +_MISSING = object() + + class Response: """ Transloadit http Response Object :Attributes: - - data (dict): - Dictionary representation of the returned JSON data. + - data (dict, str, bytes, None): + Parsed JSON data, text fallback, raw bytes for undecodable async + responses, or None when no response data is available. - status_code (int): HTTP response status code - headers (dict): @@ -15,17 +19,49 @@ class Response: :Constructor Args: - response (): The bare response object from the requests library. + - data (Optional[dict]): Preloaded JSON data for async responses. + - status_code (Optional[int]): Preloaded HTTP status code for async responses. + - headers (Optional[dict]): Preloaded response headers for async responses. """ - def __init__(self, response): + def __init__(self, response=None, data=_MISSING, status_code=_MISSING, headers=_MISSING): self._response = response - self.data = self._response.json() + if data is _MISSING and response is not None: + data = self._read_sync_response_data() + self._data = data + self._status_code = status_code + self._headers = headers + + def _read_sync_response_data(self): + try: + return self._response.json() + except ValueError: + try: + return self._response.text + except UnicodeDecodeError: + return self._response.content + + @property + def data(self): + if self._data is _MISSING: + if self._response is None: + return None + self._data = self._read_sync_response_data() + return self._data + + @data.setter + def data(self, value): + self._data = value @property def status_code(self): """ Return the http status code of the request. """ + if self._status_code is not _MISSING: + return self._status_code + if self._response is None: + return None return self._response.status_code @property @@ -33,6 +69,10 @@ def headers(self): """ Return the response headers. """ + if self._headers is not _MISSING: + return self._headers + if self._response is None: + return None return self._response.headers diff --git a/transloadit/smart_cdn.py b/transloadit/smart_cdn.py new file mode 100644 index 0000000..8c9880f --- /dev/null +++ b/transloadit/smart_cdn.py @@ -0,0 +1,89 @@ +import hashlib +import hmac +import re +import time +from typing import List, Optional, Tuple, Union +from urllib.parse import quote_plus, urlencode + +URL_PARAM_VALUE = Union[str, int, float, bool] +URL_PARAM_VALUES = Union[ + URL_PARAM_VALUE, + List[URL_PARAM_VALUE], + Tuple[URL_PARAM_VALUE, ...], + None, +] +RESERVED_URL_PARAMS = {"auth_key", "exp", "sig"} +WORKSPACE_SLUG_PATTERN = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?$") + + +def _stringify_url_param(value: URL_PARAM_VALUE) -> str: + if isinstance(value, bool): + return "true" if value else "false" + return str(value) + + +def _validate_workspace_slug(workspace: str) -> str: + if not WORKSPACE_SLUG_PATTERN.fullmatch(workspace): + raise ValueError( + "workspace must be a DNS-safe Smart CDN workspace slug: " + "letters, numbers, and hyphens only, without leading or trailing hyphens" + ) + return workspace + + +def build_signed_smart_cdn_url( + *, + auth_key: str, + auth_secret: str, + workspace: str, + template: str, + input: str, + url_params: Optional[dict[str, URL_PARAM_VALUES]] = None, + expires_at_ms: Optional[int] = None, +) -> str: + workspace_slug = quote_plus(_validate_workspace_slug(workspace)) + template_slug = quote_plus(template) + input_field = quote_plus(input) + + expiry = ( + expires_at_ms + if expires_at_ms is not None + else int(time.time() * 1000) + 60 * 60 * 1000 + ) + + params = [] + if url_params: + for key, value in url_params.items(): + if key.lower() in RESERVED_URL_PARAMS: + raise ValueError( + f"url_params must not include reserved Smart CDN parameter {key!r}" + ) + if value is None: + continue + if isinstance(value, (str, int, float, bool)): + params.append((key, _stringify_url_param(value))) + elif isinstance(value, (list, tuple)): + params.append((key, [_stringify_url_param(item) for item in value])) + else: + raise ValueError( + "URL parameter values must be strings, numbers, booleans, arrays, " + f"or None. Got {type(value)} for {key}" + ) + + params.append(("auth_key", auth_key)) + params.append(("exp", str(expiry))) + sorted_params = sorted(params, key=lambda item: item[0]) + query_string = urlencode(sorted_params, doseq=True) + + string_to_sign = f"{workspace_slug}/{template_slug}/{input_field}?{query_string}" + algorithm = "sha256" + signature = algorithm + ":" + hmac.new( + auth_secret.encode("utf-8"), + string_to_sign.encode("utf-8"), + hashlib.sha256, + ).hexdigest() + + return ( + f"https://{workspace_slug}.tlcdn.com/{template_slug}/{input_field}" + f"?{query_string}&sig={quote_plus(signature)}" + ) diff --git a/transloadit/template.py b/transloadit/template.py index c7254a7..90b458f 100644 --- a/transloadit/template.py +++ b/transloadit/template.py @@ -29,5 +29,9 @@ def create(self): Save/Submit the template to the Transloadit server. """ data = self.get_options() - data.update({"name": self.name}) + steps = data.pop("steps") + template_content = dict(data.pop("template", {}) or {}) + if steps: + template_content["steps"] = steps + data.update({"name": self.name, "template": template_content}) return self.transloadit.request.post("/templates", data=data) diff --git a/transloadit/upload.py b/transloadit/upload.py new file mode 100644 index 0000000..7803f68 --- /dev/null +++ b/transloadit/upload.py @@ -0,0 +1,13 @@ +import os + + +def get_upload_filename(file_stream, fallback): + name = getattr(file_stream, "name", None) + if isinstance(name, (bytes, os.PathLike)): + name = os.fsdecode(name) + + if isinstance(name, str): + filename = os.path.basename(name) + if filename: + return filename + return fallback