Skip to content

Commit 71f4907

Browse files
authored
Correct ls() for parquet backed nested reference sets (#1645)
1 parent 1e2591c commit 71f4907

File tree

2 files changed

+54
-29
lines changed

2 files changed

+54
-29
lines changed

fsspec/implementations/reference.py

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import math
77
import os
8+
from itertools import chain
89
from functools import lru_cache
910
from typing import TYPE_CHECKING
1011

@@ -16,10 +17,10 @@
1617
if not TYPE_CHECKING:
1718
import json
1819

19-
from ..asyn import AsyncFileSystem
20-
from ..callbacks import DEFAULT_CALLBACK
21-
from ..core import filesystem, open, split_protocol
22-
from ..utils import isfilelike, merge_offset_ranges, other_paths
20+
from fsspec.asyn import AsyncFileSystem
21+
from fsspec.callbacks import DEFAULT_CALLBACK
22+
from fsspec.core import filesystem, open, split_protocol
23+
from fsspec.utils import isfilelike, merge_offset_ranges, other_paths
2324

2425
logger = logging.getLogger("fsspec.reference")
2526

@@ -131,7 +132,6 @@ def __init__(
131132
self.out_root = out_root or self.root
132133
self.cat_thresh = categorical_threshold
133134
self.cache_size = cache_size
134-
self.dirs = None
135135
self.url = self.root + "/{field}/refs.{record}.parq"
136136
# TODO: derive fs from `root`
137137
self.fs = fsspec.filesystem("file") if fs is None else fs
@@ -195,32 +195,36 @@ def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
195195
fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode())
196196
return LazyReferenceMapper(root, fs, **kwargs)
197197

198-
def listdir(self, basename=True):
198+
@lru_cache()
199+
def listdir(self):
199200
"""List top-level directories"""
200-
# cache me?
201-
if self.dirs is None:
202-
dirs = [p.split("/", 1)[0] for p in self.zmetadata]
203-
self.dirs = {p for p in dirs if p and not p.startswith(".")}
204-
listing = self.dirs
205-
if basename:
206-
listing = [os.path.basename(path) for path in listing]
207-
return listing
201+
dirs = (p.rsplit("/", 1)[0] for p in self.zmetadata if not p.startswith(".z"))
202+
return set(dirs)
208203

209204
def ls(self, path="", detail=True):
210205
"""Shortcut file listings"""
211-
if not path:
212-
dirnames = self.listdir()
213-
others = set(
214-
[".zmetadata"]
215-
+ [name for name in self.zmetadata if "/" not in name]
216-
+ [name for name in self._items if "/" not in name]
217-
)
206+
path = path.rstrip("/")
207+
pathdash = path + "/" if path else ""
208+
dirnames = self.listdir()
209+
dirs = [
210+
d
211+
for d in dirnames
212+
if d.startswith(pathdash) and "/" not in d.lstrip(pathdash)
213+
]
214+
if dirs:
215+
others = {
216+
f
217+
for f in chain(
218+
[".zmetadata"],
219+
(name for name in self.zmetadata),
220+
(name for name in self._items),
221+
)
222+
if f.startswith(pathdash) and "/" not in f.lstrip(pathdash)
223+
}
218224
if detail is False:
219-
others.update(dirnames)
225+
others.update(dirs)
220226
return sorted(others)
221-
dirinfo = [
222-
{"name": name, "type": "directory", "size": 0} for name in dirnames
223-
]
227+
dirinfo = [{"name": name, "type": "directory", "size": 0} for name in dirs]
224228
fileinfo = [
225229
{
226230
"name": name,
@@ -234,10 +238,7 @@ def ls(self, path="", detail=True):
234238
for name in others
235239
]
236240
return sorted(dirinfo + fileinfo, key=lambda s: s["name"])
237-
parts = path.split("/", 1)
238-
if len(parts) > 1:
239-
raise FileNotFoundError("Cannot list within directories right now")
240-
field = parts[0]
241+
field = path
241242
others = set(
242243
[name for name in self.zmetadata if name.startswith(f"{path}/")]
243244
+ [name for name in self._items if name.startswith(f"{path}/")]

fsspec/implementations/tests/test_reference.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,3 +759,27 @@ def test_append_parquet(lazy_refs, m):
759759
with pytest.raises(KeyError):
760760
lazy2["data/0"]
761761
assert lazy2["data/1"] == b"Adata"
762+
763+
764+
def test_deep_parq(m):
765+
zarr = pytest.importorskip("zarr")
766+
lz = fsspec.implementations.reference.LazyReferenceMapper.create(
767+
"memory://out.parq", fs=m
768+
)
769+
g = zarr.open_group(lz, mode="w")
770+
g2 = g.create_group("instant")
771+
g2.create_dataset(name="one", data=[1, 2, 3])
772+
lz.flush()
773+
774+
lz = fsspec.implementations.reference.LazyReferenceMapper("memory://out.parq", fs=m)
775+
g = zarr.open_group(lz)
776+
assert g.instant.one[:].tolist() == [1, 2, 3]
777+
assert sorted(_["name"] for _ in lz.ls("")) == [".zgroup", ".zmetadata", "instant"]
778+
assert sorted(_["name"] for _ in lz.ls("instant")) == [
779+
"instant/.zgroup",
780+
"instant/one",
781+
]
782+
assert sorted(_["name"] for _ in lz.ls("instant/one")) == [
783+
"instant/one/.zarray",
784+
"instant/one/0",
785+
]

0 commit comments

Comments
 (0)