Skip to content

Commit 7e1220a

Browse files
authored
Merge pull request #40 from fsspec/ls_v2
use CAR requests when requesting directory listings and file info
2 parents 8991536 + 3020211 commit 7e1220a

File tree

4 files changed

+192
-13
lines changed

4 files changed

+192
-13
lines changed

ipfsspec/async_ipfs.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from multiformats import CID, multicodec
2020
from . import unixfsv1
21+
from .car import read_car
2122

2223
import logging
2324

@@ -69,20 +70,30 @@ def __str__(self):
6970
return f"GW({self.url})"
7071

7172
async def info(self, path, session):
72-
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
73+
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
7374
self._raise_not_found_for_status(res, path)
74-
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
75+
76+
roots = res.headers["X-Ipfs-Roots"].split(",")
77+
if len(roots) != len(path.split("/")):
78+
raise FileNotFoundError(path)
79+
80+
cid = CID.decode(roots[-1])
7581
resdata = await res.read()
7682

83+
_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
84+
blocks = {cid: data for cid, data, _ in blocks}
85+
block = blocks[cid]
86+
7787
if cid.codec == RawCodec:
7888
return {
7989
"name": path,
8090
"CID": str(cid),
8191
"type": "file",
82-
"size": len(resdata),
92+
"size": len(block),
8393
}
8494
elif cid.codec == DagPbCodec:
85-
node = unixfsv1.PBNode.loads(resdata)
95+
96+
node = unixfsv1.PBNode.loads(block)
8697
data = unixfsv1.Data.loads(node.Data)
8798
if data.Type == unixfsv1.DataType.Raw:
8899
raise FileNotFoundError(path) # this is not a file, it's only a part of it
@@ -133,12 +144,20 @@ async def iter_chunked(self, path, session, chunk_size):
133144
yield size, res.content.iter_chunked(chunk_size)
134145

135146
async def ls(self, path, session, detail=False):
136-
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
147+
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
137148
self._raise_not_found_for_status(res, path)
138-
resdata = await res.read()
139-
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
149+
roots = res.headers["X-Ipfs-Roots"].split(",")
150+
if len(roots) != len(path.split("/")):
151+
raise FileNotFoundError(path)
152+
153+
cid = CID.decode(roots[-1])
140154
assert cid.codec == DagPbCodec, "this is not a directory"
141-
node = unixfsv1.PBNode.loads(resdata)
155+
156+
resdata = await res.read()
157+
158+
_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
159+
blocks = {cid: data for cid, data, _ in blocks}
160+
node = unixfsv1.PBNode.loads(blocks[cid])
142161
data = unixfsv1.Data.loads(node.Data)
143162
if data.Type != unixfsv1.DataType.Directory:
144163
# TODO: we might need support for HAMTShard here (for large directories)
@@ -180,13 +199,17 @@ def gateway_from_file(gateway_path, protocol="ipfs"):
180199

181200

182201
@lru_cache
183-
def get_gateway(protocol="ipfs"):
202+
def get_gateway(protocol="ipfs", gateway_addr=None):
184203
"""
185204
Get IPFS gateway according to IPIP-280
186205
187206
see: https://github.com/ipfs/specs/pull/280
188207
"""
189208

209+
if gateway_addr:
210+
logger.debug("using IPFS gateway as specified via function argument: %s", gateway_addr)
211+
return AsyncIPFSGateway(gateway_addr, protocol)
212+
190213
# IPFS_GATEWAY environment variable should override everything
191214
ipfs_gateway = os.environ.get("IPFS_GATEWAY", "")
192215
if ipfs_gateway:
@@ -263,19 +286,20 @@ class AsyncIPFSFileSystem(AsyncFileSystem):
263286
sep = "/"
264287
protocol = "ipfs"
265288

266-
def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options):
289+
def __init__(self, asynchronous=False, loop=None, client_kwargs=None, gateway_addr=None, **storage_options):
267290
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
268291
self._session = None
269292

270293
self.client_kwargs = client_kwargs or {}
271294
self.get_client = get_client
295+
self.gateway_addr = gateway_addr
272296

273297
if not asynchronous:
274298
sync(self.loop, self.set_session)
275299

276300
@property
277301
def gateway(self):
278-
return get_gateway(self.protocol)
302+
return get_gateway(self.protocol, gateway_addr=self.gateway_addr)
279303

280304
@staticmethod
281305
def close_session(loop, session):

ipfsspec/car.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""
2+
CAR handling functions.
3+
"""
4+
5+
from typing import List, Optional, Tuple, Union, Iterator, BinaryIO
6+
import dataclasses
7+
8+
import dag_cbor
9+
from multiformats import CID, varint, multicodec, multihash
10+
11+
from .utils import is_cid_list, StreamLike, ensure_stream
12+
13+
DagPbCodec = multicodec.get("dag-pb")
14+
Sha256Hash = multihash.get("sha2-256")
15+
16+
@dataclasses.dataclass
17+
class CARBlockLocation:
18+
varint_size: int
19+
cid_size: int
20+
payload_size: int
21+
offset: int = 0
22+
23+
@property
24+
def cid_offset(self) -> int:
25+
return self.offset + self.varint_size
26+
27+
@property
28+
def payload_offset(self) -> int:
29+
return self.offset + self.varint_size + self.cid_size
30+
31+
@property
32+
def size(self) -> int:
33+
return self.varint_size + self.cid_size + self.payload_size
34+
35+
36+
def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]:
37+
"""
38+
Decodes a CAR header and returns the list of contained roots.
39+
"""
40+
header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
41+
header = dag_cbor.decode(stream.read(header_size))
42+
if not isinstance(header, dict):
43+
raise ValueError("no valid CAR header found")
44+
if header["version"] != 1:
45+
raise ValueError("CAR is not version 1")
46+
roots = header["roots"]
47+
if not isinstance(roots, list):
48+
raise ValueError("CAR header doesn't contain roots")
49+
if not is_cid_list(roots):
50+
raise ValueError("CAR roots do not only contain CIDs")
51+
return roots, visize + header_size
52+
53+
54+
def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]:
55+
try:
56+
block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
57+
except ValueError:
58+
# stream has likely been consumed entirely
59+
return None
60+
61+
data = stream.read(block_size)
62+
# as the size of the CID is variable but not explicitly given in
63+
# the CAR format, we need to partially decode each CID to determine
64+
# its size and the location of the payload data
65+
if data[0] == 0x12 and data[1] == 0x20:
66+
# this is CIDv0
67+
cid_version = 0
68+
default_base = "base58btc"
69+
cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
70+
hash_codec: Union[int, multihash.Multihash] = Sha256Hash
71+
cid_digest = data[2:34]
72+
data = data[34:]
73+
else:
74+
# this is CIDv1(+)
75+
cid_version, _, data = varint.decode_raw(data)
76+
if cid_version != 1:
77+
raise ValueError(f"CIDv{cid_version} is currently not supported")
78+
default_base = "base32"
79+
cid_codec, _, data = multicodec.unwrap_raw(data)
80+
hash_codec, _, data = varint.decode_raw(data)
81+
digest_size, _, data = varint.decode_raw(data)
82+
cid_digest = data[:digest_size]
83+
data = data[digest_size:]
84+
cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))
85+
86+
if not cid.hashfun.digest(data) == cid.digest:
87+
raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")
88+
89+
return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data))
90+
91+
92+
def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]:
93+
"""
94+
Reads a CAR.
95+
96+
Parameters
97+
----------
98+
stream_or_bytes: StreamLike
99+
Stream to read CAR from
100+
101+
Returns
102+
-------
103+
roots : List[CID]
104+
Roots as given by the CAR header
105+
blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]]
106+
Iterator over all blocks contained in the CAR
107+
"""
108+
stream = ensure_stream(stream_or_bytes)
109+
roots, header_size = decode_car_header(stream)
110+
def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]:
111+
offset = header_size
112+
while (next_block := decode_raw_car_block(stream)) is not None:
113+
cid, data, sizes = next_block
114+
yield cid, data, dataclasses.replace(sizes, offset=offset)
115+
offset += sizes.size
116+
return roots, blocks()

ipfsspec/utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""
2+
Some utilities.
3+
"""
4+
5+
from io import BytesIO
6+
from typing import List, Union, BinaryIO
7+
8+
from multiformats import CID
9+
from typing_extensions import TypeGuard
10+
11+
StreamLike = Union[BinaryIO, bytes]
12+
13+
def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO:
14+
if isinstance(stream_or_bytes, bytes):
15+
return BytesIO(stream_or_bytes)
16+
else:
17+
return stream_or_bytes
18+
19+
20+
def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
21+
return all(isinstance(o, CID) for o in os)

test/test_async.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ async def get_client(**kwargs):
2222

2323

2424
@pytest_asyncio.fixture
25-
async def fs(get_client):
25+
async def fs(request, get_client):
2626
AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop
27-
return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client)
27+
gateway_addr = getattr(request, "param", None)
28+
return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client, gateway_addr=gateway_addr)
2829

2930

3031
@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"])
@@ -100,3 +101,20 @@ async def test_isfile(fs):
100101
assert res is True
101102
res = await fs._isfile(TEST_ROOT)
102103
assert res is False
104+
105+
@pytest.mark.parametrize("detail", [False, True])
106+
@pytest.mark.parametrize("fs", ["http://127.0.0.1:8080", "https://ipfs.io"], indirect=True)
107+
@pytest.mark.asyncio
108+
async def test_ls_multi_gw(fs, detail):
109+
"""
110+
Test if ls works on different gateway implementations.
111+
112+
See also: https://github.com/fsspec/ipfsspec/issues/39
113+
"""
114+
res = await fs._ls("bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4", detail=detail)
115+
expected = "bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4/welcome-to-IPFS.jpg"
116+
if detail:
117+
assert len(res) == 1
118+
assert res[0]["name"] == expected
119+
else:
120+
assert res == [expected]

0 commit comments

Comments
 (0)