-
Notifications
You must be signed in to change notification settings - Fork 388
Add tls to ftp #1581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add tls to ftp #1581
Changes from 1 commit
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
1937cc8
Initial commit ftptls
bartvaneswhiffle 1445ec7
Add FTPTLS to documentation
bartvaneswhiffle 2f9fe6d
black
bartvaneswhiffle f3a6853
precommit
bartvaneswhiffle c0dba5d
Move prot_p and add test for ftp_tls
bartvaneswhiffle a942021
Add tests for ftp_tls (not finished yet, some are 4 could be still fa…
bartvaneswhiffle 76a3d2e
Rewrite ftp_tls to ftp
bartvaneswhiffle 0c0b9b5
Pairprogramming with Matthijs
bartvaneswhiffle 817525b
Final update to fix tests
bartvaneswhiffle b9f4a6f
Merge branch 'fsspec:master' into master
bartvaneswhiffle File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,386 @@ | ||
import os | ||
import sys | ||
import uuid | ||
import warnings | ||
from ftplib import FTP_TLS, Error, error_perm | ||
from typing import Any | ||
|
||
from ..spec import AbstractBufferedFile, AbstractFileSystem | ||
from ..utils import infer_storage_options, isfilelike | ||
|
||
|
||
class FTPTLSFileSystem(AbstractFileSystem): | ||
"""A filesystem over classic FTP""" | ||
|
||
root_marker = "/" | ||
cachable = False | ||
protocol = "ftp" | ||
|
||
def __init__( | ||
self, | ||
host, | ||
port=21, | ||
username=None, | ||
password=None, | ||
acct=None, | ||
block_size=None, | ||
tempdir=None, | ||
timeout=30, | ||
encoding="utf-8", | ||
**kwargs, | ||
): | ||
""" | ||
You can use _get_kwargs_from_urls to get some kwargs from | ||
a reasonable FTP url. | ||
|
||
Authentication will be anonymous if username/password are not | ||
given. | ||
|
||
Parameters | ||
---------- | ||
host: str | ||
The remote server name/ip to connect to | ||
port: int | ||
Port to connect with | ||
username: str or None | ||
If authenticating, the user's identifier | ||
password: str of None | ||
User's password on the server, if using | ||
acct: str or None | ||
Some servers also need an "account" string for auth | ||
block_size: int or None | ||
If given, the read-ahead or write buffer size. | ||
tempdir: str | ||
Directory on remote to put temporary files when in a transaction | ||
timeout: int | ||
Timeout of the ftp connection in seconds | ||
encoding: str | ||
Encoding to use for directories and filenames in FTP connection | ||
""" | ||
super().__init__(**kwargs) | ||
self.host = host | ||
self.port = port | ||
self.tempdir = tempdir or "/tmp" | ||
self.cred = username, password, acct | ||
self.timeout = timeout | ||
self.encoding = encoding | ||
if block_size is not None: | ||
self.blocksize = block_size | ||
else: | ||
self.blocksize = 2**16 | ||
self._connect() | ||
|
||
def _connect(self): | ||
if sys.version_info >= (3, 9): | ||
self.ftp = FTP_TLS(timeout=self.timeout, encoding=self.encoding) | ||
elif self.encoding: | ||
warnings.warn("`encoding` not supported for python<3.9, ignoring") | ||
self.ftp = FTP_TLS(timeout=self.timeout) | ||
else: | ||
self.ftp = FTP_TLS(timeout=self.timeout) | ||
self.ftp.connect(self.host, self.port) | ||
self.ftp.login(*self.cred) | ||
self.ftp.prot_p() | ||
|
||
@classmethod | ||
def _strip_protocol(cls, path): | ||
return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/") | ||
|
||
@staticmethod | ||
def _get_kwargs_from_urls(urlpath): | ||
out = infer_storage_options(urlpath) | ||
out.pop("path", None) | ||
out.pop("protocol", None) | ||
return out | ||
|
||
def ls(self, path, detail=True, **kwargs): | ||
path = self._strip_protocol(path) | ||
out = [] | ||
if path not in self.dircache: | ||
try: | ||
try: | ||
out = [ | ||
(fn, details) | ||
for (fn, details) in self.ftp.mlsd(path) | ||
if fn not in [".", ".."] | ||
and details["type"] not in ["pdir", "cdir"] | ||
] | ||
except error_perm: | ||
out = _mlsd2(self.ftp, path) # Not platform independent | ||
for fn, details in out: | ||
if path == "/": | ||
path = "" # just for forming the names, below | ||
details["name"] = "/".join([path, fn.lstrip("/")]) | ||
if details["type"] == "file": | ||
details["size"] = int(details["size"]) | ||
else: | ||
details["size"] = 0 | ||
if details["type"] == "dir": | ||
details["type"] = "directory" | ||
self.dircache[path] = out | ||
except Error: | ||
try: | ||
info = self.info(path) | ||
if info["type"] == "file": | ||
out = [(path, info)] | ||
except (Error, IndexError): | ||
raise FileNotFoundError(path) | ||
files = self.dircache.get(path, out) | ||
if not detail: | ||
return sorted([fn for fn, details in files]) | ||
return [details for fn, details in files] | ||
|
||
def info(self, path, **kwargs): | ||
# implement with direct method | ||
path = self._strip_protocol(path) | ||
if path == "/": | ||
# special case, since this dir has no real entry | ||
return {"name": "/", "size": 0, "type": "directory"} | ||
files = self.ls(self._parent(path).lstrip("/"), True) | ||
try: | ||
out = [f for f in files if f["name"] == path][0] | ||
except IndexError: | ||
raise FileNotFoundError(path) | ||
return out | ||
|
||
def get_file(self, rpath, lpath, **kwargs): | ||
if self.isdir(rpath): | ||
if not os.path.exists(lpath): | ||
os.mkdir(lpath) | ||
return | ||
if isfilelike(lpath): | ||
outfile = lpath | ||
else: | ||
outfile = open(lpath, "wb") | ||
|
||
def cb(x): | ||
outfile.write(x) | ||
|
||
self.ftp.retrbinary( | ||
f"RETR {rpath}", | ||
blocksize=self.blocksize, | ||
callback=cb, | ||
) | ||
if not isfilelike(lpath): | ||
outfile.close() | ||
|
||
def cat_file(self, path, start=None, end=None, **kwargs): | ||
if end is not None: | ||
return super().cat_file(path, start, end, **kwargs) | ||
out = [] | ||
|
||
def cb(x): | ||
out.append(x) | ||
|
||
try: | ||
self.ftp.retrbinary( | ||
f"RETR {path}", | ||
blocksize=self.blocksize, | ||
rest=start, | ||
callback=cb, | ||
) | ||
except (Error, error_perm) as orig_exc: | ||
raise FileNotFoundError(path) from orig_exc | ||
return b"".join(out) | ||
|
||
def _open( | ||
self, | ||
path, | ||
mode="rb", | ||
block_size=None, | ||
cache_options=None, | ||
autocommit=True, | ||
**kwargs, | ||
): | ||
path = self._strip_protocol(path) | ||
block_size = block_size or self.blocksize | ||
return FTPFile( | ||
self, | ||
path, | ||
mode=mode, | ||
block_size=block_size, | ||
tempdir=self.tempdir, | ||
autocommit=autocommit, | ||
cache_options=cache_options, | ||
) | ||
|
||
def _rm(self, path): | ||
path = self._strip_protocol(path) | ||
self.ftp.delete(path) | ||
self.invalidate_cache(self._parent(path)) | ||
|
||
def rm(self, path, recursive=False, maxdepth=None): | ||
paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) | ||
for p in reversed(paths): | ||
if self.isfile(p): | ||
self.rm_file(p) | ||
else: | ||
self.rmdir(p) | ||
|
||
def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: | ||
path = self._strip_protocol(path) | ||
parent = self._parent(path) | ||
if parent != self.root_marker and not self.exists(parent) and create_parents: | ||
self.mkdir(parent, create_parents=create_parents) | ||
|
||
self.ftp.mkd(path) | ||
self.invalidate_cache(self._parent(path)) | ||
|
||
def makedirs(self, path: str, exist_ok: bool = False) -> None: | ||
path = self._strip_protocol(path) | ||
if self.exists(path): | ||
# NB: "/" does not "exist" as it has no directory entry | ||
if not exist_ok: | ||
raise FileExistsError(f"{path} exists without `exist_ok`") | ||
# exists_ok=True -> no-op | ||
else: | ||
self.mkdir(path, create_parents=True) | ||
|
||
def rmdir(self, path): | ||
path = self._strip_protocol(path) | ||
self.ftp.rmd(path) | ||
self.invalidate_cache(self._parent(path)) | ||
|
||
def mv(self, path1, path2, **kwargs): | ||
path1 = self._strip_protocol(path1) | ||
path2 = self._strip_protocol(path2) | ||
self.ftp.rename(path1, path2) | ||
self.invalidate_cache(self._parent(path1)) | ||
self.invalidate_cache(self._parent(path2)) | ||
|
||
def __del__(self): | ||
self.ftp.close() | ||
|
||
def invalidate_cache(self, path=None): | ||
if path is None: | ||
self.dircache.clear() | ||
else: | ||
self.dircache.pop(path, None) | ||
super().invalidate_cache(path) | ||
|
||
|
||
class TransferDone(Exception): | ||
"""Internal exception to break out of transfer""" | ||
|
||
pass | ||
|
||
|
||
class FTPFile(AbstractBufferedFile): | ||
"""Interact with a remote FTP file with read/write buffering""" | ||
|
||
def __init__( | ||
self, | ||
fs, | ||
path, | ||
mode="rb", | ||
block_size="default", | ||
autocommit=True, | ||
cache_type="readahead", | ||
cache_options=None, | ||
**kwargs, | ||
): | ||
super().__init__( | ||
fs, | ||
path, | ||
mode=mode, | ||
block_size=block_size, | ||
autocommit=autocommit, | ||
cache_type=cache_type, | ||
cache_options=cache_options, | ||
**kwargs, | ||
) | ||
if not autocommit: | ||
self.target = self.path | ||
self.path = "/".join([kwargs["tempdir"], str(uuid.uuid4())]) | ||
|
||
def commit(self): | ||
self.fs.mv(self.path, self.target) | ||
|
||
def discard(self): | ||
self.fs.rm(self.path) | ||
|
||
def _fetch_range(self, start, end): | ||
"""Get bytes between given byte limits | ||
|
||
Implemented by raising an exception in the fetch callback when the | ||
number of bytes received reaches the requested amount. | ||
|
||
Will fail if the server does not respect the REST command on | ||
retrieve requests. | ||
""" | ||
out = [] | ||
total = [0] | ||
|
||
def callback(x): | ||
total[0] += len(x) | ||
if total[0] > end - start: | ||
out.append(x[: (end - start) - total[0]]) | ||
if end < self.size: | ||
raise TransferDone | ||
else: | ||
out.append(x) | ||
|
||
if total[0] == end - start and end < self.size: | ||
raise TransferDone | ||
|
||
try: | ||
self.fs.ftp.retrbinary( | ||
f"RETR {self.path}", | ||
blocksize=self.blocksize, | ||
rest=start, | ||
callback=callback, | ||
) | ||
except TransferDone: | ||
try: | ||
# stop transfer, we got enough bytes for this block | ||
self.fs.ftp.abort() | ||
self.fs.ftp.getmultiline() | ||
except Error: | ||
self.fs._connect() | ||
|
||
return b"".join(out) | ||
|
||
def _upload_chunk(self, final=False): | ||
self.buffer.seek(0) | ||
self.fs.ftp.storbinary( | ||
f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset | ||
) | ||
return True | ||
|
||
|
||
def _mlsd2(ftp, path="."): | ||
""" | ||
Fall back to using `dir` instead of `mlsd` if not supported. | ||
|
||
This parses a Linux style `ls -l` response to `dir`, but the response may | ||
be platform dependent. | ||
|
||
Parameters | ||
---------- | ||
ftp: ftplib.FTP | ||
path: str | ||
Expects to be given path, but defaults to ".". | ||
""" | ||
lines = [] | ||
minfo = [] | ||
ftp.dir(path, lines.append) | ||
for line in lines: | ||
split_line = line.split() | ||
if len(split_line) < 9: | ||
continue | ||
this = ( | ||
split_line[-1], | ||
{ | ||
"modify": " ".join(split_line[5:8]), | ||
"unix.owner": split_line[2], | ||
"unix.group": split_line[3], | ||
"unix.mode": split_line[0], | ||
"size": split_line[4], | ||
}, | ||
) | ||
if "d" == this[1]["unix.mode"][0]: | ||
this[1]["type"] = "dir" | ||
else: | ||
this[1]["type"] = "file" | ||
minfo.append(this) | ||
return minfo |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take a look at this.