|
| 1 | +import gzip |
| 2 | +import shutil |
| 3 | +import tarfile |
| 4 | +from io import BytesIO |
1 | 5 | from pathlib import Path |
2 | 6 |
|
3 | 7 | import netCDF4 |
|
9 | 13 |
|
10 | 14 | from processing import netcdf_comparer |
11 | 15 | from processing.netcdf_comparer import NCDiff |
| 16 | +from processing.utils import unzip_gz_file |
12 | 17 |
|
13 | 18 | test_file_path = Path(__file__).parent.absolute() |
14 | 19 |
|
@@ -614,3 +619,42 @@ def test_compression(tmp_path: Path) -> None: |
614 | 619 | nc1.createVariable("time", "f8", ("time",), zlib=False) |
615 | 620 | nc2.createVariable("time", "f8", ("time",), zlib=True) |
616 | 621 | assert netcdf_comparer.nc_difference(old_file, new_file) == NCDiff.MINOR |
| 622 | + |
| 623 | + |
| 624 | +def test_unzip_regular_gz_file(tmp_path: Path) -> None: |
| 625 | + regular_file = tmp_path / "test.txt" |
| 626 | + regular_file.write_text("This is a test file.") |
| 627 | + gz_file = tmp_path / "test.txt.gz" |
| 628 | + with open(regular_file, "rb") as f_in: |
| 629 | + with gzip.open(gz_file, "wb") as f_out: |
| 630 | + shutil.copyfileobj(f_in, f_out) |
| 631 | + |
| 632 | + result = list(unzip_gz_file(gz_file)) |
| 633 | + assert len(result) == 1 |
| 634 | + assert result[0].name == "test.txt" |
| 635 | + assert result[0].read_text() == "This is a test file." |
| 636 | + assert not gz_file.exists() # Ensure the .gz file is deleted |
| 637 | + |
| 638 | + |
| 639 | +def test_unzip_tar_gz_file(tmp_path: Path) -> None: |
| 640 | + tar_gz_file = tmp_path / "test.tar.gz" |
| 641 | + with tarfile.open(tar_gz_file, "w:gz") as tar: |
| 642 | + info = tarfile.TarInfo("subdir/test.txt") |
| 643 | + info.size = len(b"This is a test file inside a tar.") |
| 644 | + tar.addfile(info, fileobj=BytesIO(b"This is a test file inside a tar.")) |
| 645 | + |
| 646 | + result = list(unzip_gz_file(tar_gz_file)) |
| 647 | + assert len(result) == 1 |
| 648 | + assert result[0].name == "test.txt" |
| 649 | + assert result[0].read_text() == "This is a test file inside a tar." |
| 650 | + assert not tar_gz_file.exists() # Ensure the .tar.gz file is deleted |
| 651 | + |
| 652 | + |
| 653 | +def test_unzip_non_gz_file(tmp_path: Path) -> None: |
| 654 | + non_gz_file = tmp_path / "test.txt" |
| 655 | + non_gz_file.write_text("This is a non-gz file.") |
| 656 | + |
| 657 | + result = list(unzip_gz_file(non_gz_file)) |
| 658 | + assert len(result) == 1 |
| 659 | + assert result[0].name == "test.txt" |
| 660 | + assert result[0].read_text() == "This is a non-gz file." |
0 commit comments