From 1937cc8ccbb0757141feb7156b2c58c0126e8cfe Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Wed, 17 Apr 2024 16:49:50 +0200 Subject: [PATCH 1/9] Initial commit ftptls --- fsspec/implementations/ftp_tls.py | 386 ++++++++++++++++++++++++++++++ fsspec/registry.py | 1 + 2 files changed, 387 insertions(+) create mode 100644 fsspec/implementations/ftp_tls.py diff --git a/fsspec/implementations/ftp_tls.py b/fsspec/implementations/ftp_tls.py new file mode 100644 index 000000000..aa1fc0e7c --- /dev/null +++ b/fsspec/implementations/ftp_tls.py @@ -0,0 +1,386 @@ +import os +import sys +import uuid +import warnings +from ftplib import FTP_TLS, Error, error_perm +from typing import Any + +from ..spec import AbstractBufferedFile, AbstractFileSystem +from ..utils import infer_storage_options, isfilelike + + +class FTPTLSFileSystem(AbstractFileSystem): + """A filesystem over classic FTP""" + + root_marker = "/" + cachable = False + protocol = "ftp" + + def __init__( + self, + host, + port=21, + username=None, + password=None, + acct=None, + block_size=None, + tempdir=None, + timeout=30, + encoding="utf-8", + **kwargs, + ): + """ + You can use _get_kwargs_from_urls to get some kwargs from + a reasonable FTP url. + + Authentication will be anonymous if username/password are not + given. + + Parameters + ---------- + host: str + The remote server name/ip to connect to + port: int + Port to connect with + username: str or None + If authenticating, the user's identifier + password: str of None + User's password on the server, if using + acct: str or None + Some servers also need an "account" string for auth + block_size: int or None + If given, the read-ahead or write buffer size. 
+ tempdir: str + Directory on remote to put temporary files when in a transaction + timeout: int + Timeout of the ftp connection in seconds + encoding: str + Encoding to use for directories and filenames in FTP connection + """ + super().__init__(**kwargs) + self.host = host + self.port = port + self.tempdir = tempdir or "/tmp" + self.cred = username, password, acct + self.timeout = timeout + self.encoding = encoding + if block_size is not None: + self.blocksize = block_size + else: + self.blocksize = 2**16 + self._connect() + + def _connect(self): + if sys.version_info >= (3, 9): + self.ftp = FTP_TLS(timeout=self.timeout, encoding=self.encoding) + elif self.encoding: + warnings.warn("`encoding` not supported for python<3.9, ignoring") + self.ftp = FTP_TLS(timeout=self.timeout) + else: + self.ftp = FTP_TLS(timeout=self.timeout) + self.ftp.connect(self.host, self.port) + self.ftp.login(*self.cred) + self.ftp.prot_p() + + @classmethod + def _strip_protocol(cls, path): + return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/") + + @staticmethod + def _get_kwargs_from_urls(urlpath): + out = infer_storage_options(urlpath) + out.pop("path", None) + out.pop("protocol", None) + return out + + def ls(self, path, detail=True, **kwargs): + path = self._strip_protocol(path) + out = [] + if path not in self.dircache: + try: + try: + out = [ + (fn, details) + for (fn, details) in self.ftp.mlsd(path) + if fn not in [".", ".."] + and details["type"] not in ["pdir", "cdir"] + ] + except error_perm: + out = _mlsd2(self.ftp, path) # Not platform independent + for fn, details in out: + if path == "/": + path = "" # just for forming the names, below + details["name"] = "/".join([path, fn.lstrip("/")]) + if details["type"] == "file": + details["size"] = int(details["size"]) + else: + details["size"] = 0 + if details["type"] == "dir": + details["type"] = "directory" + self.dircache[path] = out + except Error: + try: + info = self.info(path) + if info["type"] == "file": + 
out = [(path, info)] + except (Error, IndexError): + raise FileNotFoundError(path) + files = self.dircache.get(path, out) + if not detail: + return sorted([fn for fn, details in files]) + return [details for fn, details in files] + + def info(self, path, **kwargs): + # implement with direct method + path = self._strip_protocol(path) + if path == "/": + # special case, since this dir has no real entry + return {"name": "/", "size": 0, "type": "directory"} + files = self.ls(self._parent(path).lstrip("/"), True) + try: + out = [f for f in files if f["name"] == path][0] + except IndexError: + raise FileNotFoundError(path) + return out + + def get_file(self, rpath, lpath, **kwargs): + if self.isdir(rpath): + if not os.path.exists(lpath): + os.mkdir(lpath) + return + if isfilelike(lpath): + outfile = lpath + else: + outfile = open(lpath, "wb") + + def cb(x): + outfile.write(x) + + self.ftp.retrbinary( + f"RETR {rpath}", + blocksize=self.blocksize, + callback=cb, + ) + if not isfilelike(lpath): + outfile.close() + + def cat_file(self, path, start=None, end=None, **kwargs): + if end is not None: + return super().cat_file(path, start, end, **kwargs) + out = [] + + def cb(x): + out.append(x) + + try: + self.ftp.retrbinary( + f"RETR {path}", + blocksize=self.blocksize, + rest=start, + callback=cb, + ) + except (Error, error_perm) as orig_exc: + raise FileNotFoundError(path) from orig_exc + return b"".join(out) + + def _open( + self, + path, + mode="rb", + block_size=None, + cache_options=None, + autocommit=True, + **kwargs, + ): + path = self._strip_protocol(path) + block_size = block_size or self.blocksize + return FTPFile( + self, + path, + mode=mode, + block_size=block_size, + tempdir=self.tempdir, + autocommit=autocommit, + cache_options=cache_options, + ) + + def _rm(self, path): + path = self._strip_protocol(path) + self.ftp.delete(path) + self.invalidate_cache(self._parent(path)) + + def rm(self, path, recursive=False, maxdepth=None): + paths = self.expand_path(path, 
recursive=recursive, maxdepth=maxdepth) + for p in reversed(paths): + if self.isfile(p): + self.rm_file(p) + else: + self.rmdir(p) + + def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: + path = self._strip_protocol(path) + parent = self._parent(path) + if parent != self.root_marker and not self.exists(parent) and create_parents: + self.mkdir(parent, create_parents=create_parents) + + self.ftp.mkd(path) + self.invalidate_cache(self._parent(path)) + + def makedirs(self, path: str, exist_ok: bool = False) -> None: + path = self._strip_protocol(path) + if self.exists(path): + # NB: "/" does not "exist" as it has no directory entry + if not exist_ok: + raise FileExistsError(f"{path} exists without `exist_ok`") + # exists_ok=True -> no-op + else: + self.mkdir(path, create_parents=True) + + def rmdir(self, path): + path = self._strip_protocol(path) + self.ftp.rmd(path) + self.invalidate_cache(self._parent(path)) + + def mv(self, path1, path2, **kwargs): + path1 = self._strip_protocol(path1) + path2 = self._strip_protocol(path2) + self.ftp.rename(path1, path2) + self.invalidate_cache(self._parent(path1)) + self.invalidate_cache(self._parent(path2)) + + def __del__(self): + self.ftp.close() + + def invalidate_cache(self, path=None): + if path is None: + self.dircache.clear() + else: + self.dircache.pop(path, None) + super().invalidate_cache(path) + + +class TransferDone(Exception): + """Internal exception to break out of transfer""" + + pass + + +class FTPFile(AbstractBufferedFile): + """Interact with a remote FTP file with read/write buffering""" + + def __init__( + self, + fs, + path, + mode="rb", + block_size="default", + autocommit=True, + cache_type="readahead", + cache_options=None, + **kwargs, + ): + super().__init__( + fs, + path, + mode=mode, + block_size=block_size, + autocommit=autocommit, + cache_type=cache_type, + cache_options=cache_options, + **kwargs, + ) + if not autocommit: + self.target = self.path + self.path = 
"/".join([kwargs["tempdir"], str(uuid.uuid4())]) + + def commit(self): + self.fs.mv(self.path, self.target) + + def discard(self): + self.fs.rm(self.path) + + def _fetch_range(self, start, end): + """Get bytes between given byte limits + + Implemented by raising an exception in the fetch callback when the + number of bytes received reaches the requested amount. + + Will fail if the server does not respect the REST command on + retrieve requests. + """ + out = [] + total = [0] + + def callback(x): + total[0] += len(x) + if total[0] > end - start: + out.append(x[: (end - start) - total[0]]) + if end < self.size: + raise TransferDone + else: + out.append(x) + + if total[0] == end - start and end < self.size: + raise TransferDone + + try: + self.fs.ftp.retrbinary( + f"RETR {self.path}", + blocksize=self.blocksize, + rest=start, + callback=callback, + ) + except TransferDone: + try: + # stop transfer, we got enough bytes for this block + self.fs.ftp.abort() + self.fs.ftp.getmultiline() + except Error: + self.fs._connect() + + return b"".join(out) + + def _upload_chunk(self, final=False): + self.buffer.seek(0) + self.fs.ftp.storbinary( + f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset + ) + return True + + +def _mlsd2(ftp, path="."): + """ + Fall back to using `dir` instead of `mlsd` if not supported. + + This parses a Linux style `ls -l` response to `dir`, but the response may + be platform dependent. + + Parameters + ---------- + ftp: ftplib.FTP + path: str + Expects to be given path, but defaults to ".". 
+ """ + lines = [] + minfo = [] + ftp.dir(path, lines.append) + for line in lines: + split_line = line.split() + if len(split_line) < 9: + continue + this = ( + split_line[-1], + { + "modify": " ".join(split_line[5:8]), + "unix.owner": split_line[2], + "unix.group": split_line[3], + "unix.mode": split_line[0], + "size": split_line[4], + }, + ) + if "d" == this[1]["unix.mode"][0]: + this[1]["type"] = "dir" + else: + this[1]["type"] = "file" + minfo.append(this) + return minfo diff --git a/fsspec/registry.py b/fsspec/registry.py index 85504d6f6..b7858b6ca 100644 --- a/fsspec/registry.py +++ b/fsspec/registry.py @@ -110,6 +110,7 @@ def register_implementation(name, cls, clobber=False, errtxt=None): "file": {"class": "fsspec.implementations.local.LocalFileSystem"}, "filecache": {"class": "fsspec.implementations.cached.WholeFileCacheFileSystem"}, "ftp": {"class": "fsspec.implementations.ftp.FTPFileSystem"}, + "ftptls": {"class": "fsspec.implementations.ftp_tls.FTPTLSFileSystem"}, "gcs": { "class": "gcsfs.GCSFileSystem", "err": "Please install gcsfs to access Google Storage", From 1445ec7d139ab685af750eb62fc94dbb5e17e508 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Wed, 17 Apr 2024 16:50:53 +0200 Subject: [PATCH 2/9] Add FTPTLS to documentation --- docs/source/api.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/source/api.rst b/docs/source/api.rst index cb14fe7e1..ae94de103 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -162,6 +162,9 @@ Built-in Implementations .. autoclass:: fsspec.implementations.ftp.FTPFileSystem :members: __init__ +.. autoclass:: fsspec.implementations.ftp.FTPTLSFileSystem + :members: __init__ + .. 
autoclass:: fsspec.implementations.git.GitFileSystem :members: __init__ From 2f9fe6d1ba7d16b5bfcc233aab72b0737b46e81d Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Wed, 17 Apr 2024 17:16:43 +0200 Subject: [PATCH 3/9] black --- fsspec/implementations/reference.py | 8 +++++--- fsspec/implementations/tests/test_sftp.py | 6 ++---- fsspec/implementations/tests/test_smb.py | 4 +--- fsspec/implementations/webhdfs.py | 4 +++- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index e202c96d6..c81aa8ffc 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -986,9 +986,11 @@ def _process_gen(self, gens): out = {} for gen in gens: dimension = { - k: v - if isinstance(v, list) - else range(v.get("start", 0), v["stop"], v.get("step", 1)) + k: ( + v + if isinstance(v, list) + else range(v.get("start", 0), v["stop"], v.get("step", 1)) + ) for k, v in gen["dimensions"].items() } products = ( diff --git a/fsspec/implementations/tests/test_sftp.py b/fsspec/implementations/tests/test_sftp.py index 1381fdafc..e288d1887 100644 --- a/fsspec/implementations/tests/test_sftp.py +++ b/fsspec/implementations/tests/test_sftp.py @@ -113,10 +113,8 @@ def test_get_dir(protocol, ssh, root_path, tmpdir): assert os.path.isfile(f"{path}/deeper/afile") f.get( - protocol - + "://{username}:{password}@{host}:{port}" "{root_path}".format( - root_path=root_path, **ssh - ), + protocol + "://{username}:{password}@{host}:{port}" + "{root_path}".format(root_path=root_path, **ssh), f"{path}/test2", recursive=True, ) diff --git a/fsspec/implementations/tests/test_smb.py b/fsspec/implementations/tests/test_smb.py index da65127b0..12a7933c3 100644 --- a/fsspec/implementations/tests/test_smb.py +++ b/fsspec/implementations/tests/test_smb.py @@ -49,9 +49,7 @@ def smb_params(request): stop_docker(container) cfg = "-p -u 'testuser;testpass' -s 'home;/share;no;no;no;testuser'" port = 
request.param if request.param is not None else default_port - img = ( - f"docker run --name {container} --detach -p 139:139 -p {port}:445 dperson/samba" # noqa: E231 E501 - ) + img = f"docker run --name {container} --detach -p 139:139 -p {port}:445 dperson/samba" # noqa: E231 E501 cmd = f"{img} {cfg}" try: cid = subprocess.check_output(shlex.split(cmd)).strip().decode() diff --git a/fsspec/implementations/webhdfs.py b/fsspec/implementations/webhdfs.py index 4bac5d51a..5a6b901d1 100644 --- a/fsspec/implementations/webhdfs.py +++ b/fsspec/implementations/webhdfs.py @@ -102,7 +102,9 @@ def __init__( if self._cached: return super().__init__(**kwargs) - self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1" # noqa + self.url = ( + f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1" # noqa + ) self.kerb = kerberos self.kerb_kwargs = kerb_kwargs or {} self.pars = {} From f3a68537674b1cbb227703a3650932700d34f625 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Wed, 17 Apr 2024 17:19:24 +0200 Subject: [PATCH 4/9] precommit --- docs/source/index.rst | 2 +- fsspec/implementations/tests/test_sftp.py | 6 ++++-- fsspec/implementations/tests/test_smb.py | 4 +++- fsspec/implementations/webhdfs.py | 4 +--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index c04d52eae..1e103fd04 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -53,7 +53,7 @@ The following libraries use ``fsspec`` internally for path and file handling: maintainable and modular data science code #. `pyxet`_, a Python library for mounting and accessing very large datasets from XetHub -#. `Huggingface🤗 Datasets`_, a popular library to +#. 
`Huggingface🤗 Datasets`_, a popular library to load&manipulate data for Deep Learning models ``fsspec`` filesystems are also supported by: diff --git a/fsspec/implementations/tests/test_sftp.py b/fsspec/implementations/tests/test_sftp.py index e288d1887..1381fdafc 100644 --- a/fsspec/implementations/tests/test_sftp.py +++ b/fsspec/implementations/tests/test_sftp.py @@ -113,8 +113,10 @@ def test_get_dir(protocol, ssh, root_path, tmpdir): assert os.path.isfile(f"{path}/deeper/afile") f.get( - protocol + "://{username}:{password}@{host}:{port}" - "{root_path}".format(root_path=root_path, **ssh), + protocol + + "://{username}:{password}@{host}:{port}" "{root_path}".format( + root_path=root_path, **ssh + ), f"{path}/test2", recursive=True, ) diff --git a/fsspec/implementations/tests/test_smb.py b/fsspec/implementations/tests/test_smb.py index 12a7933c3..da65127b0 100644 --- a/fsspec/implementations/tests/test_smb.py +++ b/fsspec/implementations/tests/test_smb.py @@ -49,7 +49,9 @@ def smb_params(request): stop_docker(container) cfg = "-p -u 'testuser;testpass' -s 'home;/share;no;no;no;testuser'" port = request.param if request.param is not None else default_port - img = f"docker run --name {container} --detach -p 139:139 -p {port}:445 dperson/samba" # noqa: E231 E501 + img = ( + f"docker run --name {container} --detach -p 139:139 -p {port}:445 dperson/samba" # noqa: E231 E501 + ) cmd = f"{img} {cfg}" try: cid = subprocess.check_output(shlex.split(cmd)).strip().decode() diff --git a/fsspec/implementations/webhdfs.py b/fsspec/implementations/webhdfs.py index 5a6b901d1..4bac5d51a 100644 --- a/fsspec/implementations/webhdfs.py +++ b/fsspec/implementations/webhdfs.py @@ -102,9 +102,7 @@ def __init__( if self._cached: return super().__init__(**kwargs) - self.url = ( - f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1" # noqa - ) + self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1" # noqa self.kerb = kerberos self.kerb_kwargs = kerb_kwargs 
or {} self.pars = {} From c0dba5d6fe1b6842aabfbc28e6192061aaa328a7 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Thu, 18 Apr 2024 11:04:29 +0200 Subject: [PATCH 5/9] Move prot_p and add test for ftp_tls --- fsspec/implementations/ftp_tls.py | 5 +- fsspec/implementations/tests/test_ftp_tls.py | 186 +++++++++++++++++++ 2 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 fsspec/implementations/tests/test_ftp_tls.py diff --git a/fsspec/implementations/ftp_tls.py b/fsspec/implementations/ftp_tls.py index aa1fc0e7c..3eeb05485 100644 --- a/fsspec/implementations/ftp_tls.py +++ b/fsspec/implementations/ftp_tls.py @@ -27,6 +27,7 @@ def __init__( tempdir=None, timeout=30, encoding="utf-8", + prot_p=False, **kwargs, ): """ @@ -68,7 +69,10 @@ def __init__( self.blocksize = block_size else: self.blocksize = 2**16 + self.prot_p = prot_p self._connect() + if self.prot_p: + self.ftp.prot_p() def _connect(self): if sys.version_info >= (3, 9): @@ -80,7 +84,6 @@ def _connect(self): self.ftp = FTP_TLS(timeout=self.timeout) self.ftp.connect(self.host, self.port) self.ftp.login(*self.cred) - self.ftp.prot_p() @classmethod def _strip_protocol(cls, path): diff --git a/fsspec/implementations/tests/test_ftp_tls.py b/fsspec/implementations/tests/test_ftp_tls.py new file mode 100644 index 000000000..2ff42e548 --- /dev/null +++ b/fsspec/implementations/tests/test_ftp_tls.py @@ -0,0 +1,186 @@ +import os +import subprocess +import sys +import time + +import pytest + +import fsspec +from fsspec import open_files +from fsspec.implementations.ftp_tls import FTPTLSFileSystem + +ftplib = pytest.importorskip("ftplib") +here = os.path.dirname(os.path.abspath(__file__)) + + +@pytest.fixture() +def ftp(): + pytest.importorskip("pyftpdlib") + P = subprocess.Popen( + [sys.executable, "-m", "pyftpdlib", "-d", here], + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + ) + try: + time.sleep(1) + yield "localhost", 2121 + finally: + P.terminate() + P.wait() + + +def test_basic(ftp): + 
host, port = ftp + fs = FTPTLSFileSystem(host, port) + assert fs.ls("/", detail=False) == sorted(os.listdir(here)) + out = fs.cat(f"/{os.path.basename(__file__)}") + assert out == open(__file__, "rb").read() + + +def test_basic_prot_p(ftp): + host, port = ftp + fs = FTPTLSFileSystem(host, port, prot_p=True) + assert fs.ls("/", detail=False) == sorted(os.listdir(here)) + out = fs.cat(f"/{os.path.basename(__file__)}") + assert out == open(__file__, "rb").read() + + +def test_not_cached(ftp): + host, port = ftp + fs = FTPTLSFileSystem(host, port) + fs2 = FTPTLSFileSystem(host, port) + assert fs is not fs2 + + +@pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) +def test_complex(ftp_writable, cache_type): + from fsspec.core import BytesCache + + host, port, user, pw = ftp_writable + files = open_files( + "ftp:///ou*", + host=host, + port=port, + username=user, + password=pw, + block_size=10000, + cache_type=cache_type, + ) + assert len(files) == 1 + with files[0] as fo: + assert fo.read(10) == b"hellohello" + if isinstance(fo.cache, BytesCache): + assert len(fo.cache.cache) == 10010 + assert fo.read(2) == b"he" + assert fo.tell() == 12 + + +def test_write_small(ftp_writable): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw) + with fs.open("/out2", "wb") as f: + f.write(b"oi") + assert fs.cat("/out2") == b"oi" + + +def test_with_url(ftp_writable): + host, port, user, pw = ftp_writable + fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out", "wb") + with fo as f: + f.write(b"hello") + fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out", "rb") + with fo as f: + assert f.read() == b"hello" + + +@pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) +def test_write_big(ftp_writable, cache_type): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw, block_size=1000, cache_type=cache_type) + fn = "/bigger" + with fs.open(fn, "wb") as f: + f.write(b"o" * 500) + assert not fs.exists(fn) + 
f.write(b"o" * 1000) + fs.invalidate_cache() + assert fs.exists(fn) + f.write(b"o" * 200) + f.flush() + + assert fs.info(fn)["size"] == 1700 + assert fs.cat(fn) == b"o" * 1700 + + +def test_transaction(ftp_writable): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw) + fs.mkdir("/tmp") + fn = "/tr" + with fs.transaction: + with fs.open(fn, "wb") as f: + f.write(b"not") + assert not fs.exists(fn) + assert fs.exists(fn) + assert fs.cat(fn) == b"not" + + fs.rm(fn) + assert not fs.exists(fn) + + +def test_transaction_with_cache(ftp_writable, tmpdir): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw) + fs.mkdir("/tmp") + fs.mkdir("/tmp/dir") + assert "dir" in fs.ls("/tmp", detail=False) + + with fs.transaction: + fs.rmdir("/tmp/dir") + + assert "dir" not in fs.ls("/tmp", detail=False) + assert not fs.exists("/tmp/dir") + + +def test_cat_get(ftp_writable, tmpdir): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw, block_size=500) + fs.mkdir("/tmp") + data = b"hello" * 500 + fs.pipe("/tmp/myfile", data) + assert fs.cat_file("/tmp/myfile") == data + + fn = os.path.join(tmpdir, "lfile") + fs.get_file("/tmp/myfile", fn) + assert open(fn, "rb").read() == data + + +def test_mkdir(ftp_writable): + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw) + with pytest.raises(ftplib.error_perm): + fs.mkdir("/tmp/not/exist", create_parents=False) + fs.mkdir("/tmp/not/exist") + assert fs.exists("/tmp/not/exist") + fs.makedirs("/tmp/not/exist", exist_ok=True) + with pytest.raises(FileExistsError): + fs.makedirs("/tmp/not/exist", exist_ok=False) + fs.makedirs("/tmp/not/exist/inner/inner") + assert fs.isdir("/tmp/not/exist/inner/inner") + + +def test_rm_get_recursive(ftp_writable, tmpdir): + tmpdir = str(tmpdir) + host, port, user, pw = ftp_writable + fs = FTPTLSFileSystem(host, port, user, pw) + fs.mkdir("/tmp/topdir") + fs.mkdir("/tmp/topdir/underdir") + 
fs.touch("/tmp/topdir/afile") + fs.touch("/tmp/topdir/underdir/afile") + + fs.get("/tmp/topdir", tmpdir, recursive=True) + + with pytest.raises(ftplib.error_perm): + fs.rmdir("/tmp/topdir") + + fs.rm("/tmp/topdir", recursive=True) + assert not fs.exists("/tmp/topdir") From a9420212867e46e74a7664923f544725ad4b55c3 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Thu, 18 Apr 2024 15:24:10 +0200 Subject: [PATCH 6/9] Add tests for ftp_tls (not finished yet; 4 could still be failing) --- fsspec/implementations/tests/ftp_tls.py | 39 +++++++ fsspec/implementations/tests/keycert.pem | 24 ++++ fsspec/implementations/tests/test_ftp_tls.py | 115 +++++++++---------- 3 files changed, 120 insertions(+), 58 deletions(-) create mode 100644 fsspec/implementations/tests/ftp_tls.py create mode 100644 fsspec/implementations/tests/keycert.pem diff --git a/fsspec/implementations/tests/ftp_tls.py b/fsspec/implementations/tests/ftp_tls.py new file mode 100644 index 000000000..5a03f3c8d --- /dev/null +++ b/fsspec/implementations/tests/ftp_tls.py @@ -0,0 +1,39 @@ +import os + +from pyftpdlib.authorizers import DummyAuthorizer +from pyftpdlib.handlers import TLS_FTPHandler +from pyftpdlib.servers import FTPServer + + +def ftp(): + # Set up FTP server parameters + FTP_HOST = "0.0.0.0" + FTP_PORT = 2121 # Choose a free port for the FTP server + FTP_DIRECTORY = os.path.dirname(__file__) + print(FTP_DIRECTORY) + + # Instantiate a dummy authorizer + authorizer = DummyAuthorizer() + authorizer.add_user( + "user", + "pass", + FTP_DIRECTORY, + "elradfmwMT", + ) + authorizer.add_anonymous(FTP_DIRECTORY) + + # Instantiate TLS_FTPHandler with required parameters + handler = TLS_FTPHandler + handler.certfile = os.path.join(os.path.dirname(__file__), "keycert.pem") + handler.authorizer = authorizer + + # Instantiate FTP server with TLS handler and authorizer + server = FTPServer((FTP_HOST, FTP_PORT), handler) + server.authorizer = authorizer + + print("ftp", FTP_HOST, "-p", FTP_PORT) + 
server.serve_forever() + + +if __name__ == "__main__": + ftp() diff --git a/fsspec/implementations/tests/keycert.pem b/fsspec/implementations/tests/keycert.pem new file mode 100644 index 000000000..2093f1d15 --- /dev/null +++ b/fsspec/implementations/tests/keycert.pem @@ -0,0 +1,24 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIBTg1e61mzYYPJ+MDkOWCSevnT1HUaaK9iopgTGyDoIuoAoGCCqGSM49 +AwEHoUQDQgAEDy3E+4WgohcRUlaSZBndEZQBTyoRztCSoaDbhZkqsPFBbeaGJ5zA +E7qX+9LICDezAUsCiq2RYltOqDCsELteiQ== +-----END EC PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICdzCCAh2gAwIBAgIUNN4kmTSxbLOoQXLFiYOs2XeK1jIwCgYIKoZIzj0EAwIw +gY8xCzAJBgNVBAYTAk5MMRUwEwYDVQQIDAxadWlkLUhvbGxhbmQxDjAMBgNVBAcM +BURlbGZ0MRAwDgYDVQQKDAdXaGlmZmxlMQ0wCwYDVQQLDARERVZBMRIwEAYDVQQD +DAlCYXJ0dmFuRXMxJDAiBgkqhkiG9w0BCQEWFWJhcnQudmFuZXNAd2hpZmZsZS5u +bDAgFw0yNDA0MTgxMDI0NDFaGA8yMjk4MDIwMTEwMjQ0MVowgY8xCzAJBgNVBAYT +Ak5MMRUwEwYDVQQIDAxadWlkLUhvbGxhbmQxDjAMBgNVBAcMBURlbGZ0MRAwDgYD +VQQKDAdXaGlmZmxlMQ0wCwYDVQQLDARERVZBMRIwEAYDVQQDDAlCYXJ0dmFuRXMx +JDAiBgkqhkiG9w0BCQEWFWJhcnQudmFuZXNAd2hpZmZsZS5ubDBZMBMGByqGSM49 +AgEGCCqGSM49AwEHA0IABA8txPuFoKIXEVJWkmQZ3RGUAU8qEc7QkqGg24WZKrDx +QW3mhiecwBO6l/vSyAg3swFLAoqtkWJbTqgwrBC7XomjUzBRMB0GA1UdDgQWBBRb +1nPqritk/P2cbDzTw9SQ9vO7JDAfBgNVHSMEGDAWgBRb1nPqritk/P2cbDzTw9SQ +9vO7JDAPBgNVHRMBAf8EBTADAQH/MAoGCCqGSM49BAMCA0gAMEUCIBcvCFS4AD3p +Ix1v8pp3hcMvGFIQLeczh4kXkPfZWvBkAiEAiTCqsdKhZi8k814H6FFkaoQVIjTe +iUtUlW6RfyDNZ9E= +-----END CERTIFICATE----- diff --git a/fsspec/implementations/tests/test_ftp_tls.py b/fsspec/implementations/tests/test_ftp_tls.py index 2ff42e548..b8309c049 100644 --- a/fsspec/implementations/tests/test_ftp_tls.py +++ b/fsspec/implementations/tests/test_ftp_tls.py @@ -15,30 +15,29 @@ @pytest.fixture() def ftp(): - pytest.importorskip("pyftpdlib") P = subprocess.Popen( - [sys.executable, "-m", "pyftpdlib", "-d", here], + [sys.executable, os.path.join(os.path.dirname(__file__), 
"ftp_tls.py")], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, ) try: time.sleep(1) - yield "localhost", 2121 + yield "localhost", 2121, "user", "pass" finally: P.terminate() P.wait() def test_basic(ftp): - host, port = ftp - fs = FTPTLSFileSystem(host, port) + host, port, _, _ = ftp + fs = FTPTLSFileSystem(host, port, timeout=1) assert fs.ls("/", detail=False) == sorted(os.listdir(here)) out = fs.cat(f"/{os.path.basename(__file__)}") assert out == open(__file__, "rb").read() def test_basic_prot_p(ftp): - host, port = ftp + host, port, _, _ = ftp fs = FTPTLSFileSystem(host, port, prot_p=True) assert fs.ls("/", detail=False) == sorted(os.listdir(here)) out = fs.cat(f"/{os.path.basename(__file__)}") @@ -46,19 +45,19 @@ def test_basic_prot_p(ftp): def test_not_cached(ftp): - host, port = ftp + host, port, _, _ = ftp fs = FTPTLSFileSystem(host, port) fs2 = FTPTLSFileSystem(host, port) assert fs is not fs2 @pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) -def test_complex(ftp_writable, cache_type): +def test_complex(ftp, cache_type): from fsspec.core import BytesCache - host, port, user, pw = ftp_writable + host, port, user, pw = ftp files = open_files( - "ftp:///ou*", + "ftptls:///fsspec/implementations/tests/ou*", host=host, port=port, username=user, @@ -75,29 +74,29 @@ def test_complex(ftp_writable, cache_type): assert fo.tell() == 12 -def test_write_small(ftp_writable): - host, port, user, pw = ftp_writable +def test_write_small(ftp): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw) - with fs.open("/out2", "wb") as f: + with fs.open("/out_tls2", "wb") as f: f.write(b"oi") - assert fs.cat("/out2") == b"oi" + assert fs.cat("/out_tls2") == b"oi" -def test_with_url(ftp_writable): - host, port, user, pw = ftp_writable - fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out", "wb") +def test_with_url(ftp): + host, port, user, pw = ftp + fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out_tls", "wb") with fo as f: f.write(b"hello") - 
fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out", "rb") + fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out_tls", "rb") with fo as f: assert f.read() == b"hello" @pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) -def test_write_big(ftp_writable, cache_type): - host, port, user, pw = ftp_writable +def test_write_big(ftp, cache_type): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw, block_size=1000, cache_type=cache_type) - fn = "/bigger" + fn = "/bigger_tls" with fs.open(fn, "wb") as f: f.write(b"o" * 500) assert not fs.exists(fn) @@ -111,11 +110,11 @@ def test_write_big(ftp_writable, cache_type): assert fs.cat(fn) == b"o" * 1700 -def test_transaction(ftp_writable): - host, port, user, pw = ftp_writable +def test_transaction(ftp): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw) - fs.mkdir("/tmp") - fn = "/tr" + fs.mkdir("tmp_tls") + fn = "tr" with fs.transaction: with fs.open(fn, "wb") as f: f.write(b"not") @@ -127,60 +126,60 @@ def test_transaction(ftp_writable): assert not fs.exists(fn) -def test_transaction_with_cache(ftp_writable, tmpdir): - host, port, user, pw = ftp_writable +def test_transaction_with_cache(ftp, tmpdir): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw) - fs.mkdir("/tmp") - fs.mkdir("/tmp/dir") - assert "dir" in fs.ls("/tmp", detail=False) + fs.mkdirs("tmp_tls", exist_ok=True) + fs.mkdir("tmp_tls/dir") + assert "dir" in fs.ls("tmp_tls", detail=False) with fs.transaction: - fs.rmdir("/tmp/dir") + fs.rmdir("tmp_tls/dir") - assert "dir" not in fs.ls("/tmp", detail=False) - assert not fs.exists("/tmp/dir") + assert "dir" not in fs.ls("tmp_tls", detail=False) + assert not fs.exists("tmp_tls/dir") -def test_cat_get(ftp_writable, tmpdir): - host, port, user, pw = ftp_writable +def test_cat_get(ftp, tmpdir): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw, block_size=500) - fs.mkdir("/tmp") + fs.mkdirs("tmp_tls", exist_ok=True) data = 
b"hello" * 500 - fs.pipe("/tmp/myfile", data) - assert fs.cat_file("/tmp/myfile") == data + fs.pipe("tmp_tls/myfile_tls", data) + assert fs.cat_file("tmp_tls/myfile_tls") == data fn = os.path.join(tmpdir, "lfile") - fs.get_file("/tmp/myfile", fn) + fs.get_file("tmp_tls/myfile_tls", fn) assert open(fn, "rb").read() == data -def test_mkdir(ftp_writable): - host, port, user, pw = ftp_writable +def test_mkdir(ftp): + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw) with pytest.raises(ftplib.error_perm): - fs.mkdir("/tmp/not/exist", create_parents=False) - fs.mkdir("/tmp/not/exist") - assert fs.exists("/tmp/not/exist") - fs.makedirs("/tmp/not/exist", exist_ok=True) + fs.mkdir("tmp_tls/not/exist_tls", create_parents=False) + fs.mkdir("tmp_tls/not/exist") + assert fs.exists("tmp_tls/not/exist") + fs.makedirs("tmp_tls/not/exist", exist_ok=True) with pytest.raises(FileExistsError): - fs.makedirs("/tmp/not/exist", exist_ok=False) - fs.makedirs("/tmp/not/exist/inner/inner") - assert fs.isdir("/tmp/not/exist/inner/inner") + fs.makedirs("tmp_tls/not/exist", exist_ok=False) + fs.makedirs("tmp_tls/not/exist/inner/inner") + assert fs.isdir("tmp_tls/not/exist/inner/inner") -def test_rm_get_recursive(ftp_writable, tmpdir): +def test_rm_get_recursive(ftp, tmpdir): tmpdir = str(tmpdir) - host, port, user, pw = ftp_writable + host, port, user, pw = ftp fs = FTPTLSFileSystem(host, port, user, pw) - fs.mkdir("/tmp/topdir") - fs.mkdir("/tmp/topdir/underdir") - fs.touch("/tmp/topdir/afile") - fs.touch("/tmp/topdir/underdir/afile") + fs.mkdir("tmp_tls/topdir") + fs.mkdir("tmp_tls/topdir/underdir") + fs.touch("tmp_tls/topdir/afile") + fs.touch("tmp_tls/topdir/underdir/afile") - fs.get("/tmp/topdir", tmpdir, recursive=True) + fs.get("tmp_tls/topdir", tmpdir, recursive=True) with pytest.raises(ftplib.error_perm): - fs.rmdir("/tmp/topdir") + fs.rmdir("tmp_tls/topdir") - fs.rm("/tmp/topdir", recursive=True) - assert not fs.exists("/tmp/topdir") + fs.rm("tmp_tls/topdir", 
recursive=True) + assert not fs.exists("tmp_tls/topdir") From 76a3d2eeae7656aeb89bc288cd7136180fb547e5 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Mon, 22 Apr 2024 12:26:33 +0200 Subject: [PATCH 7/9] Rewrite ftp_tls to ftp --- fsspec/implementations/ftp.py | 18 +- fsspec/implementations/ftp_tls.py | 389 ------------------- fsspec/implementations/tests/ftp_tls.py | 1 - fsspec/implementations/tests/test_ftp_tls.py | 32 +- fsspec/registry.py | 1 - 5 files changed, 32 insertions(+), 409 deletions(-) delete mode 100644 fsspec/implementations/ftp_tls.py diff --git a/fsspec/implementations/ftp.py b/fsspec/implementations/ftp.py index 415f48449..41b414d0b 100644 --- a/fsspec/implementations/ftp.py +++ b/fsspec/implementations/ftp.py @@ -2,7 +2,7 @@ import sys import uuid import warnings -from ftplib import FTP, Error, error_perm +from ftplib import FTP, FTP_TLS, Error, error_perm from typing import Any from ..spec import AbstractBufferedFile, AbstractFileSystem @@ -27,6 +27,8 @@ def __init__( tempdir=None, timeout=30, encoding="utf-8", + ssl=False, + prot_p=False, **kwargs, ): """ @@ -68,16 +70,24 @@ def __init__( self.blocksize = block_size else: self.blocksize = 2**16 + self.ssl = ssl + self.prot_p = prot_p self._connect() + if self.prot_p: + self.ftp.prot_p() def _connect(self): + if self.ssl: + ftp_cls = FTP_TLS + else: + ftp_cls = FTP if sys.version_info >= (3, 9): - self.ftp = FTP(timeout=self.timeout, encoding=self.encoding) + self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding) elif self.encoding: warnings.warn("`encoding` not supported for python<3.9, ignoring") - self.ftp = FTP(timeout=self.timeout) + self.ftp = ftp_cls(timeout=self.timeout) else: - self.ftp = FTP(timeout=self.timeout) + self.ftp = ftp_cls(timeout=self.timeout) self.ftp.connect(self.host, self.port) self.ftp.login(*self.cred) diff --git a/fsspec/implementations/ftp_tls.py b/fsspec/implementations/ftp_tls.py deleted file mode 100644 index 3eeb05485..000000000 --- 
a/fsspec/implementations/ftp_tls.py +++ /dev/null @@ -1,389 +0,0 @@ -import os -import sys -import uuid -import warnings -from ftplib import FTP_TLS, Error, error_perm -from typing import Any - -from ..spec import AbstractBufferedFile, AbstractFileSystem -from ..utils import infer_storage_options, isfilelike - - -class FTPTLSFileSystem(AbstractFileSystem): - """A filesystem over classic FTP""" - - root_marker = "/" - cachable = False - protocol = "ftp" - - def __init__( - self, - host, - port=21, - username=None, - password=None, - acct=None, - block_size=None, - tempdir=None, - timeout=30, - encoding="utf-8", - prot_p=False, - **kwargs, - ): - """ - You can use _get_kwargs_from_urls to get some kwargs from - a reasonable FTP url. - - Authentication will be anonymous if username/password are not - given. - - Parameters - ---------- - host: str - The remote server name/ip to connect to - port: int - Port to connect with - username: str or None - If authenticating, the user's identifier - password: str of None - User's password on the server, if using - acct: str or None - Some servers also need an "account" string for auth - block_size: int or None - If given, the read-ahead or write buffer size. 
- tempdir: str - Directory on remote to put temporary files when in a transaction - timeout: int - Timeout of the ftp connection in seconds - encoding: str - Encoding to use for directories and filenames in FTP connection - """ - super().__init__(**kwargs) - self.host = host - self.port = port - self.tempdir = tempdir or "/tmp" - self.cred = username, password, acct - self.timeout = timeout - self.encoding = encoding - if block_size is not None: - self.blocksize = block_size - else: - self.blocksize = 2**16 - self.prot_p = prot_p - self._connect() - if self.prot_p: - self.ftp.prot_p() - - def _connect(self): - if sys.version_info >= (3, 9): - self.ftp = FTP_TLS(timeout=self.timeout, encoding=self.encoding) - elif self.encoding: - warnings.warn("`encoding` not supported for python<3.9, ignoring") - self.ftp = FTP_TLS(timeout=self.timeout) - else: - self.ftp = FTP_TLS(timeout=self.timeout) - self.ftp.connect(self.host, self.port) - self.ftp.login(*self.cred) - - @classmethod - def _strip_protocol(cls, path): - return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/") - - @staticmethod - def _get_kwargs_from_urls(urlpath): - out = infer_storage_options(urlpath) - out.pop("path", None) - out.pop("protocol", None) - return out - - def ls(self, path, detail=True, **kwargs): - path = self._strip_protocol(path) - out = [] - if path not in self.dircache: - try: - try: - out = [ - (fn, details) - for (fn, details) in self.ftp.mlsd(path) - if fn not in [".", ".."] - and details["type"] not in ["pdir", "cdir"] - ] - except error_perm: - out = _mlsd2(self.ftp, path) # Not platform independent - for fn, details in out: - if path == "/": - path = "" # just for forming the names, below - details["name"] = "/".join([path, fn.lstrip("/")]) - if details["type"] == "file": - details["size"] = int(details["size"]) - else: - details["size"] = 0 - if details["type"] == "dir": - details["type"] = "directory" - self.dircache[path] = out - except Error: - try: - info = 
self.info(path) - if info["type"] == "file": - out = [(path, info)] - except (Error, IndexError): - raise FileNotFoundError(path) - files = self.dircache.get(path, out) - if not detail: - return sorted([fn for fn, details in files]) - return [details for fn, details in files] - - def info(self, path, **kwargs): - # implement with direct method - path = self._strip_protocol(path) - if path == "/": - # special case, since this dir has no real entry - return {"name": "/", "size": 0, "type": "directory"} - files = self.ls(self._parent(path).lstrip("/"), True) - try: - out = [f for f in files if f["name"] == path][0] - except IndexError: - raise FileNotFoundError(path) - return out - - def get_file(self, rpath, lpath, **kwargs): - if self.isdir(rpath): - if not os.path.exists(lpath): - os.mkdir(lpath) - return - if isfilelike(lpath): - outfile = lpath - else: - outfile = open(lpath, "wb") - - def cb(x): - outfile.write(x) - - self.ftp.retrbinary( - f"RETR {rpath}", - blocksize=self.blocksize, - callback=cb, - ) - if not isfilelike(lpath): - outfile.close() - - def cat_file(self, path, start=None, end=None, **kwargs): - if end is not None: - return super().cat_file(path, start, end, **kwargs) - out = [] - - def cb(x): - out.append(x) - - try: - self.ftp.retrbinary( - f"RETR {path}", - blocksize=self.blocksize, - rest=start, - callback=cb, - ) - except (Error, error_perm) as orig_exc: - raise FileNotFoundError(path) from orig_exc - return b"".join(out) - - def _open( - self, - path, - mode="rb", - block_size=None, - cache_options=None, - autocommit=True, - **kwargs, - ): - path = self._strip_protocol(path) - block_size = block_size or self.blocksize - return FTPFile( - self, - path, - mode=mode, - block_size=block_size, - tempdir=self.tempdir, - autocommit=autocommit, - cache_options=cache_options, - ) - - def _rm(self, path): - path = self._strip_protocol(path) - self.ftp.delete(path) - self.invalidate_cache(self._parent(path)) - - def rm(self, path, recursive=False, 
maxdepth=None): - paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) - for p in reversed(paths): - if self.isfile(p): - self.rm_file(p) - else: - self.rmdir(p) - - def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: - path = self._strip_protocol(path) - parent = self._parent(path) - if parent != self.root_marker and not self.exists(parent) and create_parents: - self.mkdir(parent, create_parents=create_parents) - - self.ftp.mkd(path) - self.invalidate_cache(self._parent(path)) - - def makedirs(self, path: str, exist_ok: bool = False) -> None: - path = self._strip_protocol(path) - if self.exists(path): - # NB: "/" does not "exist" as it has no directory entry - if not exist_ok: - raise FileExistsError(f"{path} exists without `exist_ok`") - # exists_ok=True -> no-op - else: - self.mkdir(path, create_parents=True) - - def rmdir(self, path): - path = self._strip_protocol(path) - self.ftp.rmd(path) - self.invalidate_cache(self._parent(path)) - - def mv(self, path1, path2, **kwargs): - path1 = self._strip_protocol(path1) - path2 = self._strip_protocol(path2) - self.ftp.rename(path1, path2) - self.invalidate_cache(self._parent(path1)) - self.invalidate_cache(self._parent(path2)) - - def __del__(self): - self.ftp.close() - - def invalidate_cache(self, path=None): - if path is None: - self.dircache.clear() - else: - self.dircache.pop(path, None) - super().invalidate_cache(path) - - -class TransferDone(Exception): - """Internal exception to break out of transfer""" - - pass - - -class FTPFile(AbstractBufferedFile): - """Interact with a remote FTP file with read/write buffering""" - - def __init__( - self, - fs, - path, - mode="rb", - block_size="default", - autocommit=True, - cache_type="readahead", - cache_options=None, - **kwargs, - ): - super().__init__( - fs, - path, - mode=mode, - block_size=block_size, - autocommit=autocommit, - cache_type=cache_type, - cache_options=cache_options, - **kwargs, - ) - if not autocommit: - 
self.target = self.path - self.path = "/".join([kwargs["tempdir"], str(uuid.uuid4())]) - - def commit(self): - self.fs.mv(self.path, self.target) - - def discard(self): - self.fs.rm(self.path) - - def _fetch_range(self, start, end): - """Get bytes between given byte limits - - Implemented by raising an exception in the fetch callback when the - number of bytes received reaches the requested amount. - - Will fail if the server does not respect the REST command on - retrieve requests. - """ - out = [] - total = [0] - - def callback(x): - total[0] += len(x) - if total[0] > end - start: - out.append(x[: (end - start) - total[0]]) - if end < self.size: - raise TransferDone - else: - out.append(x) - - if total[0] == end - start and end < self.size: - raise TransferDone - - try: - self.fs.ftp.retrbinary( - f"RETR {self.path}", - blocksize=self.blocksize, - rest=start, - callback=callback, - ) - except TransferDone: - try: - # stop transfer, we got enough bytes for this block - self.fs.ftp.abort() - self.fs.ftp.getmultiline() - except Error: - self.fs._connect() - - return b"".join(out) - - def _upload_chunk(self, final=False): - self.buffer.seek(0) - self.fs.ftp.storbinary( - f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset - ) - return True - - -def _mlsd2(ftp, path="."): - """ - Fall back to using `dir` instead of `mlsd` if not supported. - - This parses a Linux style `ls -l` response to `dir`, but the response may - be platform dependent. - - Parameters - ---------- - ftp: ftplib.FTP - path: str - Expects to be given path, but defaults to ".". 
- """ - lines = [] - minfo = [] - ftp.dir(path, lines.append) - for line in lines: - split_line = line.split() - if len(split_line) < 9: - continue - this = ( - split_line[-1], - { - "modify": " ".join(split_line[5:8]), - "unix.owner": split_line[2], - "unix.group": split_line[3], - "unix.mode": split_line[0], - "size": split_line[4], - }, - ) - if "d" == this[1]["unix.mode"][0]: - this[1]["type"] = "dir" - else: - this[1]["type"] = "file" - minfo.append(this) - return minfo diff --git a/fsspec/implementations/tests/ftp_tls.py b/fsspec/implementations/tests/ftp_tls.py index 5a03f3c8d..c15c153fb 100644 --- a/fsspec/implementations/tests/ftp_tls.py +++ b/fsspec/implementations/tests/ftp_tls.py @@ -31,7 +31,6 @@ def ftp(): server = FTPServer((FTP_HOST, FTP_PORT), handler) server.authorizer = authorizer - print("ftp", FTP_HOST, "-p", FTP_PORT) server.serve_forever() diff --git a/fsspec/implementations/tests/test_ftp_tls.py b/fsspec/implementations/tests/test_ftp_tls.py index b8309c049..1ee7afd6d 100644 --- a/fsspec/implementations/tests/test_ftp_tls.py +++ b/fsspec/implementations/tests/test_ftp_tls.py @@ -7,7 +7,7 @@ import fsspec from fsspec import open_files -from fsspec.implementations.ftp_tls import FTPTLSFileSystem +from fsspec.implementations.ftp import FTPFileSystem ftplib = pytest.importorskip("ftplib") here = os.path.dirname(os.path.abspath(__file__)) @@ -30,7 +30,7 @@ def ftp(): def test_basic(ftp): host, port, _, _ = ftp - fs = FTPTLSFileSystem(host, port, timeout=1) + fs = FTPFileSystem(host, port, timeout=1, ssl=True) assert fs.ls("/", detail=False) == sorted(os.listdir(here)) out = fs.cat(f"/{os.path.basename(__file__)}") assert out == open(__file__, "rb").read() @@ -38,7 +38,7 @@ def test_basic(ftp): def test_basic_prot_p(ftp): host, port, _, _ = ftp - fs = FTPTLSFileSystem(host, port, prot_p=True) + fs = FTPFileSystem(host, port, ssl=True, prot_p=True) assert fs.ls("/", detail=False) == sorted(os.listdir(here)) out = 
fs.cat(f"/{os.path.basename(__file__)}") assert out == open(__file__, "rb").read() @@ -46,8 +46,8 @@ def test_basic_prot_p(ftp): def test_not_cached(ftp): host, port, _, _ = ftp - fs = FTPTLSFileSystem(host, port) - fs2 = FTPTLSFileSystem(host, port) + fs = FTPFileSystem(host, port, ssl=True) + fs2 = FTPFileSystem(host, port, ssl=True) assert fs is not fs2 @@ -57,13 +57,14 @@ def test_complex(ftp, cache_type): host, port, user, pw = ftp files = open_files( - "ftptls:///fsspec/implementations/tests/ou*", + "ftp:///ou*", host=host, port=port, username=user, password=pw, block_size=10000, cache_type=cache_type, + ssl=True, ) assert len(files) == 1 with files[0] as fo: @@ -76,7 +77,7 @@ def test_complex(ftp, cache_type): def test_write_small(ftp): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw) + fs = FTPFileSystem(host, port, user, pw, ssl=True) with fs.open("/out_tls2", "wb") as f: f.write(b"oi") assert fs.cat("/out_tls2") == b"oi" @@ -95,8 +96,10 @@ def test_with_url(ftp): @pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) def test_write_big(ftp, cache_type): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw, block_size=1000, cache_type=cache_type) - fn = "/bigger_tls" + fs = FTPFileSystem( + host, port, user, pw, block_size=1000, cache_type=cache_type, ssl=True + ) + fn = f"/bigger_tls_{cache_type}" with fs.open(fn, "wb") as f: f.write(b"o" * 500) assert not fs.exists(fn) @@ -108,11 +111,12 @@ def test_write_big(ftp, cache_type): assert fs.info(fn)["size"] == 1700 assert fs.cat(fn) == b"o" * 1700 + fs.rm(fn) def test_transaction(ftp): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw) + fs = FTPFileSystem(host, port, user, pw, ssl=True) fs.mkdir("tmp_tls") fn = "tr" with fs.transaction: @@ -128,7 +132,7 @@ def test_transaction(ftp): def test_transaction_with_cache(ftp, tmpdir): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw) + fs = FTPFileSystem(host, port, user, 
pw, ssl=True) fs.mkdirs("tmp_tls", exist_ok=True) fs.mkdir("tmp_tls/dir") assert "dir" in fs.ls("tmp_tls", detail=False) @@ -142,7 +146,7 @@ def test_transaction_with_cache(ftp, tmpdir): def test_cat_get(ftp, tmpdir): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw, block_size=500) + fs = FTPFileSystem(host, port, user, pw, block_size=500, ssl=True) fs.mkdirs("tmp_tls", exist_ok=True) data = b"hello" * 500 fs.pipe("tmp_tls/myfile_tls", data) @@ -155,7 +159,7 @@ def test_cat_get(ftp, tmpdir): def test_mkdir(ftp): host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw) + fs = FTPFileSystem(host, port, user, pw, ssl=True) with pytest.raises(ftplib.error_perm): fs.mkdir("tmp_tls/not/exist_tls", create_parents=False) fs.mkdir("tmp_tls/not/exist") @@ -170,7 +174,7 @@ def test_mkdir(ftp): def test_rm_get_recursive(ftp, tmpdir): tmpdir = str(tmpdir) host, port, user, pw = ftp - fs = FTPTLSFileSystem(host, port, user, pw) + fs = FTPFileSystem(host, port, user, pw, ssl=True) fs.mkdir("tmp_tls/topdir") fs.mkdir("tmp_tls/topdir/underdir") fs.touch("tmp_tls/topdir/afile") diff --git a/fsspec/registry.py b/fsspec/registry.py index b7858b6ca..85504d6f6 100644 --- a/fsspec/registry.py +++ b/fsspec/registry.py @@ -110,7 +110,6 @@ def register_implementation(name, cls, clobber=False, errtxt=None): "file": {"class": "fsspec.implementations.local.LocalFileSystem"}, "filecache": {"class": "fsspec.implementations.cached.WholeFileCacheFileSystem"}, "ftp": {"class": "fsspec.implementations.ftp.FTPFileSystem"}, - "ftptls": {"class": "fsspec.implementations.ftp_tls.FTPTLSFileSystem"}, "gcs": { "class": "gcsfs.GCSFileSystem", "err": "Please install gcsfs to access Google Storage", From 0c0b9b5fd1e3fba9e222d4ed5915d551e4662822 Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Thu, 8 Aug 2024 11:51:28 +0200 Subject: [PATCH 8/9] Pairprogramming with Matthijs --- docs/source/api.rst | 3 - fsspec/implementations/ftp.py | 14 +- 
fsspec/implementations/tests/test_ftp.py | 14 ++ fsspec/implementations/tests/test_ftp_tls.py | 189 ------------------- 4 files changed, 21 insertions(+), 199 deletions(-) delete mode 100644 fsspec/implementations/tests/test_ftp_tls.py diff --git a/docs/source/api.rst b/docs/source/api.rst index ae94de103..cb14fe7e1 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -162,9 +162,6 @@ Built-in Implementations .. autoclass:: fsspec.implementations.ftp.FTPFileSystem :members: __init__ -.. autoclass:: fsspec.implementations.ftp.FTPTLSFileSystem - :members: __init__ - .. autoclass:: fsspec.implementations.git.GitFileSystem :members: __init__ diff --git a/fsspec/implementations/ftp.py b/fsspec/implementations/ftp.py index 41b414d0b..f0fd91c8e 100644 --- a/fsspec/implementations/ftp.py +++ b/fsspec/implementations/ftp.py @@ -27,8 +27,7 @@ def __init__( tempdir=None, timeout=30, encoding="utf-8", - ssl=False, - prot_p=False, + tls=False, **kwargs, ): """ @@ -58,26 +57,27 @@ def __init__( Timeout of the ftp connection in seconds encoding: str Encoding to use for directories and filenames in FTP connection + tls: bool + Use FTP-TLS, by default False """ super().__init__(**kwargs) self.host = host self.port = port self.tempdir = tempdir or "/tmp" - self.cred = username, password, acct + self.cred = username or "", password or "", acct or "" self.timeout = timeout self.encoding = encoding if block_size is not None: self.blocksize = block_size else: self.blocksize = 2**16 - self.ssl = ssl - self.prot_p = prot_p + self.tls = tls self._connect() - if self.prot_p: + if self.tls: self.ftp.prot_p() def _connect(self): - if self.ssl: + if self.tls: ftp_cls = FTP_TLS else: ftp_cls = FTP diff --git a/fsspec/implementations/tests/test_ftp.py b/fsspec/implementations/tests/test_ftp.py index d443d865b..98561489b 100644 --- a/fsspec/implementations/tests/test_ftp.py +++ b/fsspec/implementations/tests/test_ftp.py @@ -2,6 +2,7 @@ import subprocess import sys 
import time +from ftplib import FTP, FTP_TLS import pytest @@ -29,6 +30,19 @@ def ftp(): P.wait() +@pytest.mark.parametrize( + "tls,exp_cls", + ( + (False, FTP), + (True, FTP_TLS), + ), +) +def test_tls(ftp, tls, exp_cls): + host, port = ftp + fs = FTPFileSystem(host, port, tls=tls) + assert isinstance(fs.ftp, exp_cls) + + def test_basic(ftp): host, port = ftp fs = FTPFileSystem(host, port) diff --git a/fsspec/implementations/tests/test_ftp_tls.py b/fsspec/implementations/tests/test_ftp_tls.py deleted file mode 100644 index 1ee7afd6d..000000000 --- a/fsspec/implementations/tests/test_ftp_tls.py +++ /dev/null @@ -1,189 +0,0 @@ -import os -import subprocess -import sys -import time - -import pytest - -import fsspec -from fsspec import open_files -from fsspec.implementations.ftp import FTPFileSystem - -ftplib = pytest.importorskip("ftplib") -here = os.path.dirname(os.path.abspath(__file__)) - - -@pytest.fixture() -def ftp(): - P = subprocess.Popen( - [sys.executable, os.path.join(os.path.dirname(__file__), "ftp_tls.py")], - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - ) - try: - time.sleep(1) - yield "localhost", 2121, "user", "pass" - finally: - P.terminate() - P.wait() - - -def test_basic(ftp): - host, port, _, _ = ftp - fs = FTPFileSystem(host, port, timeout=1, ssl=True) - assert fs.ls("/", detail=False) == sorted(os.listdir(here)) - out = fs.cat(f"/{os.path.basename(__file__)}") - assert out == open(__file__, "rb").read() - - -def test_basic_prot_p(ftp): - host, port, _, _ = ftp - fs = FTPFileSystem(host, port, ssl=True, prot_p=True) - assert fs.ls("/", detail=False) == sorted(os.listdir(here)) - out = fs.cat(f"/{os.path.basename(__file__)}") - assert out == open(__file__, "rb").read() - - -def test_not_cached(ftp): - host, port, _, _ = ftp - fs = FTPFileSystem(host, port, ssl=True) - fs2 = FTPFileSystem(host, port, ssl=True) - assert fs is not fs2 - - -@pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) -def test_complex(ftp, cache_type): - from 
fsspec.core import BytesCache - - host, port, user, pw = ftp - files = open_files( - "ftp:///ou*", - host=host, - port=port, - username=user, - password=pw, - block_size=10000, - cache_type=cache_type, - ssl=True, - ) - assert len(files) == 1 - with files[0] as fo: - assert fo.read(10) == b"hellohello" - if isinstance(fo.cache, BytesCache): - assert len(fo.cache.cache) == 10010 - assert fo.read(2) == b"he" - assert fo.tell() == 12 - - -def test_write_small(ftp): - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, ssl=True) - with fs.open("/out_tls2", "wb") as f: - f.write(b"oi") - assert fs.cat("/out_tls2") == b"oi" - - -def test_with_url(ftp): - host, port, user, pw = ftp - fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out_tls", "wb") - with fo as f: - f.write(b"hello") - fo = fsspec.open(f"ftp://{user}:{pw}@{host}:{port}/out_tls", "rb") - with fo as f: - assert f.read() == b"hello" - - -@pytest.mark.parametrize("cache_type", ["bytes", "mmap"]) -def test_write_big(ftp, cache_type): - host, port, user, pw = ftp - fs = FTPFileSystem( - host, port, user, pw, block_size=1000, cache_type=cache_type, ssl=True - ) - fn = f"/bigger_tls_{cache_type}" - with fs.open(fn, "wb") as f: - f.write(b"o" * 500) - assert not fs.exists(fn) - f.write(b"o" * 1000) - fs.invalidate_cache() - assert fs.exists(fn) - f.write(b"o" * 200) - f.flush() - - assert fs.info(fn)["size"] == 1700 - assert fs.cat(fn) == b"o" * 1700 - fs.rm(fn) - - -def test_transaction(ftp): - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, ssl=True) - fs.mkdir("tmp_tls") - fn = "tr" - with fs.transaction: - with fs.open(fn, "wb") as f: - f.write(b"not") - assert not fs.exists(fn) - assert fs.exists(fn) - assert fs.cat(fn) == b"not" - - fs.rm(fn) - assert not fs.exists(fn) - - -def test_transaction_with_cache(ftp, tmpdir): - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, ssl=True) - fs.mkdirs("tmp_tls", exist_ok=True) - fs.mkdir("tmp_tls/dir") - 
assert "dir" in fs.ls("tmp_tls", detail=False) - - with fs.transaction: - fs.rmdir("tmp_tls/dir") - - assert "dir" not in fs.ls("tmp_tls", detail=False) - assert not fs.exists("tmp_tls/dir") - - -def test_cat_get(ftp, tmpdir): - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, block_size=500, ssl=True) - fs.mkdirs("tmp_tls", exist_ok=True) - data = b"hello" * 500 - fs.pipe("tmp_tls/myfile_tls", data) - assert fs.cat_file("tmp_tls/myfile_tls") == data - - fn = os.path.join(tmpdir, "lfile") - fs.get_file("tmp_tls/myfile_tls", fn) - assert open(fn, "rb").read() == data - - -def test_mkdir(ftp): - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, ssl=True) - with pytest.raises(ftplib.error_perm): - fs.mkdir("tmp_tls/not/exist_tls", create_parents=False) - fs.mkdir("tmp_tls/not/exist") - assert fs.exists("tmp_tls/not/exist") - fs.makedirs("tmp_tls/not/exist", exist_ok=True) - with pytest.raises(FileExistsError): - fs.makedirs("tmp_tls/not/exist", exist_ok=False) - fs.makedirs("tmp_tls/not/exist/inner/inner") - assert fs.isdir("tmp_tls/not/exist/inner/inner") - - -def test_rm_get_recursive(ftp, tmpdir): - tmpdir = str(tmpdir) - host, port, user, pw = ftp - fs = FTPFileSystem(host, port, user, pw, ssl=True) - fs.mkdir("tmp_tls/topdir") - fs.mkdir("tmp_tls/topdir/underdir") - fs.touch("tmp_tls/topdir/afile") - fs.touch("tmp_tls/topdir/underdir/afile") - - fs.get("tmp_tls/topdir", tmpdir, recursive=True) - - with pytest.raises(ftplib.error_perm): - fs.rmdir("tmp_tls/topdir") - - fs.rm("tmp_tls/topdir", recursive=True) - assert not fs.exists("tmp_tls/topdir") From 817525bd325f50f6dd57bc0dff6b8caa8a2506fe Mon Sep 17 00:00:00 2001 From: Bart van Es Date: Thu, 8 Aug 2024 13:27:15 +0200 Subject: [PATCH 9/9] Final update to fix tests --- fsspec/implementations/tests/ftp_tls.py | 8 ++++---- fsspec/implementations/tests/test_ftp.py | 15 ++++++++++++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git 
a/fsspec/implementations/tests/ftp_tls.py b/fsspec/implementations/tests/ftp_tls.py index c15c153fb..6d1359bfa 100644 --- a/fsspec/implementations/tests/ftp_tls.py +++ b/fsspec/implementations/tests/ftp_tls.py @@ -6,11 +6,11 @@ def ftp(): + """Script to run FTP server that accepts TLS""" # Set up FTP server parameters - FTP_HOST = "0.0.0.0" - FTP_PORT = 2121 # Choose a free port for the FTP server - FTP_DIRECTORY = os.path.dirname(__file__) - print(FTP_DIRECTORY) + FTP_HOST = "localhost" + FTP_PORT = 2121 + FTP_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) # Instantiate a dummy authorizer authorizer = DummyAuthorizer() diff --git a/fsspec/implementations/tests/test_ftp.py b/fsspec/implementations/tests/test_ftp.py index 98561489b..ccac390d3 100644 --- a/fsspec/implementations/tests/test_ftp.py +++ b/fsspec/implementations/tests/test_ftp.py @@ -18,7 +18,7 @@ def ftp(): pytest.importorskip("pyftpdlib") P = subprocess.Popen( - [sys.executable, "-m", "pyftpdlib", "-d", here], + [sys.executable, os.path.join(here, "ftp_tls.py")], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, ) @@ -43,9 +43,18 @@ def test_tls(ftp, tls, exp_cls): assert isinstance(fs.ftp, exp_cls) -def test_basic(ftp): +@pytest.mark.parametrize( + "tls,username,password", + ( + (False, "", ""), + (True, "", ""), + (False, "user", "pass"), + (True, "user", "pass"), + ), +) +def test_basic(ftp, tls, username, password): host, port = ftp - fs = FTPFileSystem(host, port) + fs = FTPFileSystem(host, port, username, password, tls=tls) assert fs.ls("/", detail=False) == sorted(os.listdir(here)) out = fs.cat(f"/{os.path.basename(__file__)}") assert out == open(__file__, "rb").read()