# ByteDance Volcengine EMR, Copyright 2024.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The core module of TOSFS."""
import io
import logging
import mimetypes
import os
import tempfile
import warnings
from glob import has_magic
from typing import Any, BinaryIO, Collection, Generator, List, Optional, Tuple, Union
import tos
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import other_paths
from fsspec.utils import setup_logging as setup_logger
from tos.auth import CredentialProviderAuth
from tos.exceptions import TosClientError, TosServerError
from tos.models import CommonPrefixInfo
from tos.models2 import (
ListedObject,
ListedObjectVersion,
ListObjectType2Output,
PartInfo,
UploadPartCopyOutput,
)
from tosfs.compatible import FsspecCompatibleFS
from tosfs.consts import (
ENV_NAME_TOS_BUCKET_TAG_ENABLE,
ENV_NAME_TOS_SDK_LOGGING_LEVEL,
ENV_NAME_TOSFS_LOGGING_LEVEL,
FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE,
LS_OPERATION_DEFAULT_MAX_ITEMS,
MANAGED_COPY_MAX_THRESHOLD,
MANAGED_COPY_MIN_THRESHOLD,
MPU_PART_SIZE_THRESHOLD,
PART_MAX_SIZE,
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
TOS_BUCKET_TYPE_FNS,
TOS_BUCKET_TYPE_HNS,
TOS_SERVER_STATUS_CODE_NOT_FOUND,
TOSFS_LOG_FORMAT,
)
from tosfs.exceptions import TosfsError
from tosfs.fsspec_utils import glob_translate
from tosfs.models import DeletingObject
from tosfs.retry import CONFLICT_CODE, INVALID_RANGE_CODE, retryable_func_executor
from tosfs.tag import BucketTagMgr
from tosfs.utils import find_bucket_key, get_brange
from tosfs.version import Version
logger = logging.getLogger("tosfs")
def setup_logging() -> None:
"""Set up the logging configuration for TOSFS."""
setup_logger(
logger=logger,
level=os.environ.get(ENV_NAME_TOSFS_LOGGING_LEVEL, "WARNING"),
)
formatter = logging.Formatter(TOSFS_LOG_FORMAT)
for handler in logger.handlers:
handler.setFormatter(formatter)
# set and config tos client's logger
tos.set_logger(
name="tosclient",
level=os.environ.get(ENV_NAME_TOS_SDK_LOGGING_LEVEL, "WARNING"),
log_handler=logging.StreamHandler(),
format_string=TOSFS_LOG_FORMAT,
)
setup_logging()
if logger.level < logging.WARNING:
logger.warning(
"The tosfs's log level is set to be %s", logging.getLevelName(logger.level)
)
[docs]
class TosFileSystem(FsspecCompatibleFS):
"""Tos file system.
It's an implementation of AbstractFileSystem which is an
abstract super-class for pythonic file-systems.
"""
protocol = ("tos",)
[docs]
def __init__(
self,
endpoint: Optional[str] = None,
key: str = "",
secret: str = "",
region: Optional[str] = None,
session_token: Optional[str] = None,
max_retry_num: int = 20,
max_connections: int = 1024,
connection_timeout: int = 10,
socket_timeout: int = 30,
high_latency_log_threshold: int = 100,
version_aware: bool = False,
credentials_provider: Optional[object] = None,
default_block_size: Optional[int] = None,
default_fill_cache: bool = True,
default_cache_type: str = "readahead",
multipart_staging_dirs: str = tempfile.mkdtemp(),
multipart_size: int = 8 << 20,
multipart_thread_pool_size: int = max(2, os.cpu_count() or 1),
multipart_threshold: int = 5 << 20,
enable_crc: bool = True,
enable_verify_ssl: bool = True,
dns_cache_timeout: int = 0,
proxy_host: Optional[str] = None,
proxy_port: Optional[int] = None,
proxy_username: Optional[str] = None,
proxy_password: Optional[str] = None,
disable_encoding_meta: Optional[bool] = None,
except100_continue_threshold: int = 65536,
endpoint_url: Optional[str] = None, # Deprecated parameter
**kwargs: Any,
) -> None:
"""Initialise the TosFileSystem.
Parameters
----------
endpoint : str, optional
The endpoint of the TOS service.
key : str
The access key ID(ak) to access the TOS service.
secret : str
The secret access key(sk) to access the TOS service.
region : str, optional
The region of the TOS service.
session_token : str, optional
The temporary session token to access the TOS service.
max_retry_num : int, optional
The maximum number of retries for a failed request (default is 20).
max_connections : int, optional
The maximum number of HTTP connections that can be opened in the
connection pool (default is 1024).
connection_timeout : int, optional
The time to keep a connection open in seconds (default is 10).
socket_timeout : int, optional
The socket read and write timeout time for a single request after
a connection is successfully established, in seconds.
The default is 30 seconds.
Reference: https://requests.readthedocs.io/en/latest/user/quickstart/
#timeouts (default is 30).
high_latency_log_threshold : int, optional
The threshold for logging high latency operations. When greater than 0,
it represents enabling high-latency logs. The unit is KB.
By default, it is 100.
When the total transmission rate of a single request is lower than
this value and the total request time is greater than 500 milliseconds,
WARN-level logs are printed.
version_aware : bool, optional
Whether the filesystem is version aware (default is False).
Currently, not been supported, please DO NOT set to True.
credentials_provider : object, optional
The credentials provider for the TOS service.
default_block_size : int, optional
The default block size for reading and writing (default is None).
default_fill_cache : bool, optional
Whether to fill the cache (default is True).
default_cache_type : str, optional
The default cache type (default is 'readahead').
multipart_staging_dirs : str, optional
The staging directories for multipart uploads (default is a temporary
directory). Separate the staging dirs with comma if there are many
staging dir paths.
multipart_size : int, optional
The multipart upload part size of the given object storage.
(default is 8MB).
multipart_thread_pool_size : int, optional
The size of thread pool used for uploading multipart in parallel for the
given object storage. (default is max(2, os.cpu_count()).
multipart_threshold : int, optional
The threshold which control whether enable multipart upload during
writing data to the given object storage, if the write data size is less
than threshold, will write data via simple put instead of multipart upload.
default is 10 MB.
enable_crc : bool
Whether to enable client side CRC check after upload, default is true
enable_verify_ssl : bool
Whether to verify the SSL certificate, default is true.
dns_cache_timeout : int
The DNS cache timeout in minutes, if it is less than or equal to 0,
it means to close the DNS cache, default is 0.
proxy_host : str, optional
The host address of the proxy server, currently only supports
the http protocol.
proxy_port : int, optional
The port of the proxy server.
proxy_username : str, optional
The username to use when connecting to the proxy server.
proxy_password : str, optional
The password to use when connecting to the proxy server.
disable_encoding_meta : bool, optional
Whether to encode user-defined metadata x-tos-meta- Content-Disposition,
default encoding, no encoding when set to true.
except100_continue_threshold : int
When it is greater than 0, it means that the interface related to the
upload object opens the 100-continue mechanism for requests with the
length of the data to be uploaded greater than the threshold
(if the length of the data cannot be predicted, it is uniformly determined
to be greater than the threshold), unit byte, default 65536
endpoint_url : str, optional
(deprecated) The endpoint URL of the TOS service.
kwargs : Any, optional
Additional arguments.
"""
self.endpoint = endpoint
if endpoint_url is not None:
warnings.warn(
"The 'endpoint_url' parameter is deprecated and will be removed"
" in a future release. Please use 'endpoint' instead.",
DeprecationWarning,
stacklevel=2,
)
self.endpoint = endpoint_url
self.tos_client = tos.TosClientV2(
key,
secret,
self.endpoint,
region,
security_token=session_token,
max_retry_count=0,
max_connections=max_connections,
connection_time=connection_timeout,
socket_timeout=socket_timeout,
high_latency_log_threshold=high_latency_log_threshold,
credentials_provider=credentials_provider,
enable_crc=enable_crc,
enable_verify_ssl=enable_verify_ssl,
disable_encoding_meta=disable_encoding_meta,
dns_cache_time=dns_cache_timeout,
proxy_host=proxy_host,
proxy_port=proxy_port,
proxy_username=proxy_username,
proxy_password=proxy_password,
except100_continue_threshold=except100_continue_threshold,
user_agent_product_name="EMR",
user_agent_soft_name="TOSFS",
user_agent_soft_version=Version.version,
user_agent_customized_key_values={"revision": Version.revision},
)
if version_aware:
raise ValueError("Currently, version_aware is not supported.")
self.tag_enabled = (
os.environ.get(ENV_NAME_TOS_BUCKET_TAG_ENABLE, "true").lower() == "true"
)
if self.tag_enabled:
logger.debug("The tos bucket tag is enabled.")
self._init_tag_manager()
self.version_aware = version_aware
self.default_block_size = (
default_block_size or FILE_OPERATION_READ_WRITE_BUFFER_SIZE
)
self.default_fill_cache = default_fill_cache
self.default_cache_type = default_cache_type
self.max_retry_num = max_retry_num
self.multipart_staging_dirs = [
d.strip() for d in multipart_staging_dirs.split(",")
]
self.multipart_size = multipart_size
self.multipart_thread_pool_size = multipart_thread_pool_size
self.multipart_threshold = multipart_threshold
super().__init__(**kwargs)
def _open(
self,
path: str,
mode: str = "rb",
block_size: Optional[int] = None,
version_id: Optional[str] = None,
fill_cache: Optional[bool] = None,
cache_type: Optional[str] = None,
autocommit: bool = True,
**kwargs: Any,
) -> AbstractBufferedFile:
"""Open a file for reading or writing.
Parameters
----------
path: string
Path of file on TOS
mode: string
One of 'r', 'w', 'a', 'rb', 'wb', or 'ab'. These have the same meaning
as they do for the built-in `open` function.
block_size: int
Size of data-node blocks if reading
version_id : str
Explicit version of the object to open. This requires that the tos
filesystem is version aware and bucket versioning is enabled on the
relevant bucket.
fill_cache: bool
If seeking to new a part of the file beyond the current buffer,
with this True, the buffer will be filled between the sections to
best support random access. When reading only a few specific chunks
out of a file, performance may be better if False.
cache_type : str
See fsspec's documentation for available cache_type values. Set to "none"
if no caching is desired. If None, defaults to ``self.default_cache_type``.
autocommit : bool
If True, writes will be committed to the filesystem on flush or close.
kwargs: dict-like
Additional parameters.
"""
if block_size is None:
block_size = self.default_block_size
if fill_cache is None:
fill_cache = self.default_fill_cache
if version_id:
raise ValueError(
"version_id cannot be specified if the filesystem "
"is not version aware"
)
if cache_type is None:
cache_type = self.default_cache_type
return TosFile(
self,
path,
mode,
block_size=block_size,
version_id=version_id,
fill_cache=fill_cache,
cache_type=cache_type,
autocommit=autocommit,
)
[docs]
def ls(
self,
path: str,
detail: bool = False,
versions: bool = False,
**kwargs: Union[str, bool, float, None],
) -> Union[List[dict], List[str]]:
"""List objects under the given path.
Parameters
----------
path : str
The path to list.
detail : bool, optional
Whether to return detailed information (default is False).
versions : bool, optional
Whether to list object versions (default is False).
**kwargs : dict, optional
Additional arguments.
Returns
-------
Union[List[dict], List[str]]
A list of objects under the given path. If `detail` is True,
returns a list of dictionaries with detailed information.
Otherwise, returns a list of object names.
Raises
------
IOError
If there is an error accessing the parent directory.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.ls("mybucket")
['mybucket/file1', 'mybucket/file2']
>>> fs.ls("mybucket", detail=True)
[{'name': 'mybucket/file1', 'size': 123, 'type': 'file'},
{'name': 'mybucket/file2', 'size': 456, 'type': 'file'}]
"""
path = self._strip_protocol(path)
if path in ["", "/"]:
files = self._ls_buckets()
return files if detail else sorted([o["name"] for o in files])
files = self._ls_dirs_and_files(path, versions=versions)
if not files and "/" in path:
try:
files = self._ls_dirs_and_files(self._parent(path), versions=versions)
except IOError:
pass
files = [
o
for o in files
if o["name"].rstrip("/") == path and o["type"] != "directory"
]
return files if detail else sorted([o["name"] for o in files])
[docs]
def ls_iterate(
self,
path: str,
detail: bool = False,
versions: bool = False,
batch_size: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
recursive: bool = False,
**kwargs: Union[str, bool, float, None],
) -> Generator[Union[List[dict], List[str]], None, None]:
"""List objects under the given path in batches then returns an iterator.
Parameters
----------
path : str
The path to list.
detail : bool, optional
Whether to return detailed information (default is False).
versions : bool, optional
Whether to list object versions (default is False).
batch_size : int, optional
The number of items to fetch in each batch (default is 1000).
recursive : bool, optional
Whether to list objects recursively (default is False).
**kwargs : dict, optional
Additional arguments.
Returns
-------
Generator[Union[dict, str], None, None]
An iterator that yields objects under the given path.
Raises
------
ValueError
If versions is specified but the filesystem is not version aware.
"""
if versions:
raise ValueError(
"versions cannot be specified if the filesystem "
"is not version aware."
)
path = self._strip_protocol(path)
bucket, key, _ = self._split_path(path)
if recursive and self._is_hns_bucket(bucket):
raise ValueError("Recursive listing is not supported for HNS bucket.")
prefix = key.lstrip("/") + "/" if key else ""
continuation_token = ""
is_truncated = True
while is_truncated:
def _call_list_objects_type2(
continuation_token: str = continuation_token,
) -> ListObjectType2Output:
return self.tos_client.list_objects_type2(
bucket,
prefix,
start_after=prefix,
delimiter=None if recursive else "/",
max_keys=batch_size,
continuation_token=continuation_token,
)
resp = retryable_func_executor(
_call_list_objects_type2,
args=(continuation_token,),
max_retry_num=self.max_retry_num,
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
results = resp.contents + resp.common_prefixes
batch = []
for obj in results:
if isinstance(obj, CommonPrefixInfo):
info = self._fill_dir_info(bucket, obj)
elif obj.key.endswith("/"):
info = self._fill_dir_info(bucket, None, obj.key)
else:
info = self._fill_file_info(obj, bucket, versions)
batch.append(info if detail else info["name"])
yield batch
[docs]
def info(
self,
path: str,
bucket: Optional[str] = None,
key: Optional[str] = None,
version_id: Optional[str] = None,
) -> dict:
"""Give details of entry at path.
Returns a single dictionary, with exactly the same information as ``ls``
would with ``detail=True``.
The default implementation should calls ls and could be overridden by a
shortcut. kwargs are passed on to ```ls()``.
Some file systems might not be able to measure the file's size, in
which case, the returned dict will include ``'size': None``.
Returns
-------
dict with keys: name (full path in the FS), size (in bytes), type (file,
directory, or something else) and other FS-specific keys.
"""
if path in ["/", ""]:
return {"name": path, "size": 0, "type": "directory"}
path = self._strip_protocol(path)
bucket, key, path_version_id = self._split_path(path)
fullpath = "/".join((bucket, key))
if version_id:
raise ValueError(
"version_id cannot be specified due to the "
"filesystem is not support version aware."
)
if not key:
return self._bucket_info(bucket)
bucket_type = self._get_bucket_type(bucket)
if bucket_type == TOS_BUCKET_TYPE_FNS:
result = self._object_info(bucket, key, version_id)
if not result:
result = self._get_dir_info(bucket, key, fullpath)
else:
# Priority is given to judging dir, followed by file.
result = self._get_dir_info(bucket, key, fullpath)
if not result:
result = self._object_info(bucket, key, version_id)
if not result:
raise FileNotFoundError(f"Can not get information for path: {path}")
return result
[docs]
def exists(self, path: str, **kwargs: Any) -> bool:
"""Check if a path exists in the TOS.
Parameters
----------
path : str
The path to check for existence.
**kwargs : Any, optional
Additional arguments if needed in the future.
Returns
-------
bool
True if the path exists, False otherwise.
Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the path.
tos.exceptions.TosServerError
If there is a server error while checking the path.
TosfsError
If there is an unknown error while checking the path.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.exists("tos://bucket/to/file")
True
>>> fs.exists("tos://mybucket/nonexistentfile")
False
"""
if path in ["", "/"]:
# the root always exists
return True
path = self._strip_protocol(path)
bucket, key, version_id = self._split_path(path)
# if the path is a bucket
if not key:
return self._exists_bucket(bucket)
try:
resp = retryable_func_executor(
lambda: self.tos_client.get_file_status(bucket, key),
max_retry_num=self.max_retry_num,
)
return resp.key is not None
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
raise e
[docs]
def rm_file(self, path: str) -> None:
"""Delete a file."""
logger.warning("Call rm_file api: path %s", path)
super().rm_file(path)
[docs]
def rmdir(self, path: str) -> None:
"""Remove a directory if it is empty.
Parameters
----------
path : str
The path of the directory to remove. The path should be in the format
`tos://bucket/path/to/directory`.
Raises
------
FileNotFoundError
If the directory does not exist.
NotADirectoryError
If the path is not a directory.
TosfsError
If the directory is not empty,
or the path is a bucket.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.rmdir("tos://mybucket/mydir/")
"""
logger.warning("Call rmdir api: path %s", path)
path = self._strip_protocol(path) + "/"
bucket, key, _ = self._split_path(path)
if not key:
raise TosfsError("Cannot remove a bucket using rmdir api.")
if not self.exists(path):
raise FileNotFoundError(f"Directory {path} not found.")
if not self.isdir(path):
raise NotADirectoryError(f"{path} is not a directory.")
if len(self._ls_objects(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0:
raise TosfsError(f"Directory {path} is not empty.")
retryable_func_executor(
lambda: self.tos_client.delete_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
[docs]
def rm(
self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
) -> None:
"""Delete files.
Parameters
----------
path: str or list of str
File(s) to delete.
recursive: bool
If file(s) are directories, recursively delete contents and then
also remove the directory
maxdepth: int or None
Depth to pass to walk for finding files to delete, if recursive.
If None, there will be no limit and infinite recursion may be
possible.
"""
logger.warning(
"Call rm api: path %s, recursive %s, maxdepth %s", path, recursive, maxdepth
)
if isinstance(path, str):
if not self.exists(path):
raise FileNotFoundError(path)
bucket, key, _ = self._split_path(path)
if not key:
raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.")
if not recursive or maxdepth:
return super().rm(path, recursive=recursive, maxdepth=maxdepth)
if self.isfile(path):
self.rm_file(path)
else:
try:
self._list_and_batch_delete_objs(bucket, key)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
else:
for single_path in path:
self.rm(single_path, recursive=recursive, maxdepth=maxdepth)
[docs]
def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
"""Create directory entry at path.
For systems that don't have true directories, may create an object for
this instance only and not touch the real filesystem
Parameters
----------
path: str
location
create_parents: bool
if True, this is equivalent to ``makedirs``
kwargs: Any
may be permissions, etc.
"""
path = self._strip_protocol(path) + "/"
bucket, key, _ = self._split_path(path)
if not key:
raise TosfsError(f"Cannot create a bucket {bucket} using mkdir api.")
if create_parents:
parent = self._parent(f"{bucket}/{key}".rstrip("/") + "/")
if not self.exists(parent):
# here we need to create the parent directory recursively
self.mkdir(parent, create_parents=True)
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
else:
parent = self._parent(path)
if not self.exists(parent):
raise FileNotFoundError(f"Parent directory {parent} does not exist.")
else:
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
[docs]
def makedirs(self, path: str, exist_ok: bool = False) -> None:
"""Recursively make directories.
Creates directory at path and any intervening required directories.
Raises exception if, for instance, the path already exists but is a
file.
Parameters
----------
path: str
leaf directory name
exist_ok: bool (False)
If False, will error if the target already exists
"""
path = self._strip_protocol(path) + "/"
path_exist = self.exists(path)
if exist_ok and path_exist:
return
if not exist_ok and path_exist:
raise FileExistsError(path)
self.mkdir(path, create_parents=True)
[docs]
def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None:
"""Create an empty file at the given path.
Parameters
----------
path : str
The path of the file to create.
truncate : bool, optional
Whether to truncate the file if it already exists (default is True).
**kwargs : Any, optional
Additional arguments.
Raises
------
FileExistsError
If the file already exists and `truncate` is False.
TosfsError
If there is an unknown error while creating the file.
tos.exceptions.TosClientError
If there is a client error while creating the file.
tos.exceptions.TosServerError
If there is a server error while creating the file.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.touch("tos://mybucket/myfile")
"""
path = self._strip_protocol(path)
bucket, key, _ = self._split_path(path)
if not truncate and self.exists(path):
raise FileExistsError(f"File {path} already exists.")
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key),
max_retry_num=self.max_retry_num,
)
[docs]
def isdir(self, path: str) -> bool:
"""Check if the path is a directory.
Parameters
----------
path : str
The path to check.
Returns
-------
bool
True if the path is a directory, False otherwise.
Raises
------
TosClientError
If there is a client error while accessing the path.
TosServerError
If there is a server error while accessing the path.
TosfsError
If there is an unknown error while accessing the path.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.isdir("tos://mybucket/mydir/")
"""
path = self._strip_protocol(path) + "/"
bucket, key, _ = self._split_path(path)
if not key:
return False
try:
if self._is_fns_bucket(bucket):
resp = retryable_func_executor(
lambda: self.tos_client.get_file_status(bucket, key),
max_retry_num=self.max_retry_num,
)
return resp.key != key
else:
resp = retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key),
max_retry_num=self.max_retry_num,
)
return resp.is_directory
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
raise e
[docs]
def isfile(self, path: str) -> bool:
"""Check if the path is a file.
Parameters
----------
path : str
The path to check.
Returns
-------
bool
True if the path is a file, False otherwise.
"""
if path.endswith("/"):
return False
bucket, key, _ = self._split_path(path)
if not key:
return False
try:
resp = retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key),
max_retry_num=self.max_retry_num,
)
if self._is_fns_bucket(bucket):
return True
else:
return not resp.is_directory
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
[docs]
def put(
self,
lpath: str,
rpath: str,
recursive: bool = False,
callback: Any = None,
maxdepth: Optional[int] = None,
disable_glob: bool = False,
**kwargs: Any,
) -> None:
"""Copy file(s) from local.
Copies a specific file or tree of files (if recursive=True). If rpath
ends with a "/", it will be assumed to be a directory, and target files
will go within.
Calls put_file for each source.
"""
super().put(
lpath, rpath, recursive=recursive, disable_glob=disable_glob, **kwargs
)
[docs]
def put_file(
self,
lpath: str,
rpath: str,
chunksize: int = FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
**kwargs: Any,
) -> None:
"""Put a file from local to TOS.
Parameters
----------
lpath : str
The local path of the file to put.
rpath : str
The remote path of the file to receive.
chunksize : int, optional
The size of the chunks to read from the file (default is 5 * 2**20).
**kwargs : Any, optional
Additional arguments.
Raises
------
FileNotFoundError
If the local file does not exist.
TosClientError
If there is a client error while putting the file.
TosServerError
If there is a server error while putting the file.
TosfsError
If there is an unknown error while putting the file.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.put_file("localfile.txt", "tos://mybucket/remote.txt")
"""
if not os.path.exists(lpath):
raise FileNotFoundError(f"Local file {lpath} not found.")
if os.path.isdir(lpath):
self.makedirs(rpath, exist_ok=True)
return
size = os.path.getsize(lpath)
content_type = None
if "ContentType" not in kwargs:
content_type, _ = mimetypes.guess_type(lpath)
if self.isfile(rpath):
self.makedirs(self._parent(rpath), exist_ok=True)
if self.isdir(rpath):
rpath = os.path.join(rpath, os.path.basename(lpath))
self.mkdirs(self._parent(rpath), exist_ok=True)
bucket, key, _ = self._split_path(rpath)
with open(lpath, "rb") as f:
if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
chunk = f.read()
retryable_func_executor(
lambda: self.tos_client.put_object(
bucket,
key,
content=chunk,
content_type=content_type,
),
max_retry_num=self.max_retry_num,
)
else:
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(
bucket, key, content_type=content_type
),
max_retry_num=self.max_retry_num,
)
retryable_func_executor(
lambda: self.tos_client.upload_part_from_file(
bucket, key, mpu.upload_id, file_path=lpath, part_number=1
),
max_retry_num=self.max_retry_num,
)
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket, key, mpu.upload_id, complete_all=True
),
max_retry_num=self.max_retry_num,
)
[docs]
def get(
self,
rpath: str,
lpath: str,
recursive: bool = False,
callback: Any = None,
maxdepth: Optional[int] = None,
**kwargs: Any,
) -> None:
"""Copy file(s) to local.
Copies a specific file or tree of files (if recursive=True). If lpath
ends with a "/", it will be assumed to be a directory, and target files
will go within. Can submit a list of paths, which may be glob-patterns
and will be expanded.
Calls get_file for each source.
"""
if isinstance(lpath, list) and isinstance(rpath, list):
# No need to expand paths when both source and destination
# are provided as lists
rpaths = rpath
lpaths = lpath
else:
from fsspec.implementations.local import (
LocalFileSystem,
make_path_posix,
trailing_sep,
)
# to make the exists check work
if self.isdir(rpath):
rpath = rpath.rstrip("/") + "/"
source_is_str = isinstance(rpath, str)
rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
if not rpaths:
return
if isinstance(lpath, str):
lpath = make_path_posix(lpath)
source_is_file = len(rpaths) == 1
dest_is_dir = isinstance(lpath, str) and (
trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
)
exists = source_is_str and (
(has_magic(rpath) and source_is_file)
or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
)
lpaths = other_paths(
rpaths,
lpath,
exists=exists,
flatten=not source_is_str,
)
for lpath, rpath in zip(lpaths, rpaths):
self.get_file(rpath, lpath, callback=None, **kwargs)
[docs]
def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
"""Get a file from the TOS filesystem and write to a local path.
This method will retry the download if there is error.
Parameters
----------
rpath : str
The remote path of the file to get.
lpath : str
The local path to save the file.
**kwargs : Any, optional
Additional arguments.
Raises
------
FileNotFoundError
If the file does not exist.
tos.exceptions.TosClientError
If there is a client error while getting the file.
tos.exceptions.TosServerError
If there is a server error while getting the file.
TosfsError
If there is an unknown error while getting the file.
"""
if os.path.isdir(lpath):
return
if not self.exists(rpath):
raise FileNotFoundError(rpath)
if self.isdir(rpath):
logger.debug("The remote path is a directory, skip downloading.")
return
bucket, key, version_id = self._split_path(rpath)
def _read_chunks(body: BinaryIO, f: BinaryIO) -> None:
bytes_read = 0
while True:
chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
if not chunk:
break
bytes_read += len(chunk)
f.write(chunk)
def download_file() -> None:
body, content_length = self._open_remote_file(
bucket, key, version_id, range_start=0, **kwargs
)
dir_path = os.path.dirname(lpath)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(lpath, "wb") as f:
retryable_func_executor(_read_chunks, args=(body, f))
retryable_func_executor(download_file)
[docs]
def walk(
self,
path: str,
maxdepth: Optional[int] = None,
topdown: bool = True,
on_error: str = "omit",
**kwargs: Any,
) -> Generator[str, List[str], List[str]]:
"""List objects under the given path.
Parameters
----------
path : str
The path to list.
maxdepth : int, optional
The maximum depth to walk to (default is None).
topdown : bool, optional
Whether to walk top-down or bottom-up (default is True).
on_error : str, optional
How to handle errors (default is 'omit').
**kwargs : Any, optional
Additional arguments.
Raises
------
ValueError
If the path is an invalid path.
"""
if path in ["", "*"] + ["{}://".format(p) for p in self.protocol]:
raise ValueError("Cannot access all of TOS via path {}.".format(path))
return super().walk(
path, maxdepth=maxdepth, topdown=topdown, on_error=on_error, **kwargs
)
[docs]
def find(
self,
path: str,
maxdepth: Optional[int] = None,
withdirs: bool = False,
detail: bool = False,
prefix: str = "",
**kwargs: Any,
) -> Union[List[str], dict]:
"""Find all files or dirs with conditions.
Like posix ``find`` command without conditions
Parameters
----------
path : str
The path to search.
maxdepth: int, optional
If not None, the maximum number of levels to descend
withdirs: bool
Whether to include directory paths in the output. This is True
when used by glob, but users usually only want files.
prefix: str
Only return files that match ``^{path}/{prefix}`` (if there is an
exact match ``filename == {path}/{prefix}``, it also will be included)
detail: bool
If True, return a dict with file information, else just the path
**kwargs: Any
Additional arguments.
"""
if path in ["", "*"] + ["{}://".format(p) for p in self.protocol]:
raise ValueError("Cannot access all of TOS via path {}.".format(path))
path = self._strip_protocol(path)
bucket, key, _ = self._split_path(path)
if not bucket:
raise ValueError("Cannot access all of TOS without specify a bucket.")
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")
if maxdepth and prefix:
raise ValueError(
"Can not specify 'prefix' option alongside 'maxdepth' options."
)
if maxdepth:
return super().find(
bucket + "/" + key,
maxdepth=maxdepth,
withdirs=withdirs,
detail=detail,
**kwargs,
)
out = self._find_file_dir(key, path, prefix, withdirs, kwargs)
if detail:
return {o["name"]: o for o in out}
else:
return [o["name"] for o in out]
[docs]
def expand_path(
self,
path: Union[str, List[str]],
recursive: bool = False,
maxdepth: Optional[int] = None,
) -> List[str]:
"""Expand path to a list of files.
Parameters
----------
path : str
The path to expand.
recursive : bool, optional
Whether to expand recursively (default is False).
maxdepth : int, optional
The maximum depth to expand to (default is None).
**kwargs : Any, optional
Additional arguments.
Returns
-------
List[str]
A list of expanded paths.
"""
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")
if isinstance(path, str):
return self.expand_path([path], recursive, maxdepth)
out = set()
path = [self._strip_protocol(p) for p in path]
for p in path: # can gather here
if has_magic(p):
bit = set(self.glob(p, maxdepth=maxdepth))
out |= bit
if recursive:
# glob call above expanded one depth so if maxdepth is defined
# then decrement it in expand_path call below. If it is zero
# after decrementing then avoid expand_path call.
if maxdepth is not None and maxdepth <= 1:
continue
out |= set(
self.expand_path(
list(bit),
recursive=recursive,
maxdepth=maxdepth - 1 if maxdepth is not None else None,
)
)
continue
elif recursive:
rec = set(self.find(p, maxdepth=maxdepth, withdirs=True))
out |= rec
if p not in out and (recursive is False or self.exists(p)):
# should only check once, for the root
out.add(p)
if not out:
raise FileNotFoundError(path)
return sorted(out)
[docs]
def cp_file(
self,
path1: str,
path2: str,
preserve_etag: Optional[bool] = None,
managed_copy_threshold: Optional[int] = MANAGED_COPY_MAX_THRESHOLD,
**kwargs: Any,
) -> None:
"""Copy file between locations on tos.
Parameters
----------
path1 : str
The source path of the file to copy.
path2 : str
The destination path of the file to copy.
preserve_etag : bool, optional
Whether to preserve etag while copying. If the file is uploaded
as a single part, then it will be always equivalent to the md5
hash of the file hence etag will always be preserved. But if the
file is uploaded in multi parts, then this option will try to
reproduce the same multipart upload while copying and preserve
the generated etag.
managed_copy_threshold : int, optional
The threshold size of the file to copy using managed copy. If the
size of the file is greater than this threshold, then the file
will be copied using managed copy (default is 5 * 2**30).
**kwargs : Any, optional
Additional arguments.
Raises
------
FileNotFoundError
If the source file does not exist.
ValueError
If the destination is a versioned file.
TosClientError
If there is a client error while copying the file.
TosServerError
If there is a server error while copying the file.
TosfsError
If there is an unknown error while copying the file.
"""
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
if path1 == path2:
logger.warning("Source and destination are the same: %s", path1)
return
if self.isdir(path1) and self.isdir(path2):
return
bucket, key, vers = self._split_path(path1)
info = self.info(path1, bucket, key, version_id=vers)
if not info:
raise FileNotFoundError(f"Can not get information for path: {path1}")
if info["type"] == "directory":
logger.warning("Do not support copy directory %s.", path1)
return
size = info["size"]
_, _, parts_suffix = info.get("ETag", "").strip('"').partition("-")
if preserve_etag and parts_suffix:
self._copy_etag_preserved(path1, path2, size, total_parts=int(parts_suffix))
elif size <= min(
MANAGED_COPY_MAX_THRESHOLD,
(
managed_copy_threshold
if managed_copy_threshold
else MANAGED_COPY_MAX_THRESHOLD
),
):
self._copy_basic(path1, path2, **kwargs)
else:
# if the preserve_etag is true, either the file is uploaded
# on multiple parts or the size is lower than 5GB
assert not preserve_etag
# serial multipart copy
self._copy_managed(path1, path2, size, **kwargs)
[docs]
def glob(
self, path: str, maxdepth: Optional[int] = None, **kwargs: Any
) -> Collection[Any]:
"""Return list of paths matching a glob-like pattern.
Parameters
----------
path : str
The path to search.
maxdepth : int, optional
The maximum depth to search to (default is None).
**kwargs : Any, optional
Additional arguments.
"""
if path.startswith("*"):
raise ValueError("Cannot traverse all of tosfs")
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")
import re
seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
path = self._strip_protocol(path)
append_slash_to_dirname = ends_with_sep or path.endswith(
tuple(sep + "**" for sep in seps)
)
idx_star = path.find("*") if path.find("*") >= 0 else len(path)
idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
min_idx = min(idx_star, idx_qmark, idx_brace)
detail = kwargs.pop("detail", False)
if not has_magic(path):
if self.exists(path, **kwargs):
return {path: self.info(path, **kwargs)} if detail else [path]
return {} if detail else []
depth: Optional[int] = None
root, depth = "", path[min_idx + 1 :].count("/") + 1
if "/" in path[:min_idx]:
min_idx = path[:min_idx].rindex("/")
root = path[: min_idx + 1]
if "**" in path:
if maxdepth is not None:
idx_double_stars = path.find("**")
depth_double_stars = path[idx_double_stars:].count("/") + 1
depth = depth - depth_double_stars + maxdepth
else:
depth = None
allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
pattern = re.compile(glob_translate(path + ("/" if ends_with_sep else "")))
if isinstance(allpaths, dict):
out = {
p: info
for p, info in sorted(allpaths.items())
if pattern.match(
p + "/"
if append_slash_to_dirname and info["type"] == "directory"
else p
)
}
else:
out = {}
return out if detail else list(out)
def _rm(self, path: str) -> None:
logger.info("Removing path: %s", path)
bucket, key, _ = self._split_path(path)
if path.endswith("/") or self.isdir(path):
key = key.rstrip("/") + "/"
try:
retryable_func_executor(
lambda: self.tos_client.delete_object(bucket, key),
max_retry_num=self.max_retry_num,
)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
######################## private methods ########################
def _list_and_batch_delete_objs(self, bucket: str, key: str) -> None:
bucket_type = self._get_bucket_type(bucket)
is_truncated = True
continuation_token = ""
all_results = []
if bucket_type == TOS_BUCKET_TYPE_FNS:
def _call_list_objects(
continuation_token: str = "",
) -> ListObjectType2Output:
return retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/",
max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
continuation_token=continuation_token,
),
max_retry_num=self.max_retry_num,
)
while is_truncated:
resp = _call_list_objects(continuation_token)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
all_results = resp.contents
deleting_objects = [
DeletingObject(o.key if hasattr(o, "key") else o.prefix)
for o in all_results
]
if deleting_objects:
self._delete_objects(bucket, deleting_objects)
elif bucket_type == TOS_BUCKET_TYPE_HNS:
all_results = self._list_and_collect_objects(
bucket, bucket_type, key.rstrip("/") + "/"
)
if all_results:
self._delete_objects(bucket, all_results)
else:
raise ValueError(f"Unsupported bucket type: {bucket_type}")
def _delete_objects(
self, bucket: str, deleting_objects: list[DeletingObject]
) -> None:
bucket_type = self._get_bucket_type(bucket)
if bucket_type == TOS_BUCKET_TYPE_FNS:
delete_resp = retryable_func_executor(
lambda: self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
),
max_retry_num=self.max_retry_num,
)
if delete_resp.error:
for d in delete_resp.error:
logger.warning("Deleted object: %s failed", d)
else:
def _call_delete_object(obj: DeletingObject) -> None:
retryable_func_executor(
lambda: self.tos_client.delete_object(bucket, obj.key),
max_retry_num=self.max_retry_num,
)
# Preferentially delete subpaths with longer keys
for obj in sorted(deleting_objects, key=lambda x: len(x.key), reverse=True):
_call_delete_object(obj)
def _list_and_collect_objects(
self,
bucket: str,
bucket_type: str,
prefix: str,
collected_objects: Optional[List[DeletingObject]] = None,
) -> List[DeletingObject]:
if collected_objects is None:
collected_objects = []
collected_keys = {obj.key for obj in collected_objects}
is_truncated = True
continuation_token = ""
while is_truncated:
def _call_list_objects_type2(
continuation_token: str = continuation_token, prefix: str = prefix
) -> ListObjectType2Output:
return self.tos_client.list_objects_type2(
bucket,
prefix=prefix,
max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
continuation_token=continuation_token,
delimiter="/" if bucket_type == TOS_BUCKET_TYPE_HNS else None,
)
resp = retryable_func_executor(
_call_list_objects_type2,
max_retry_num=self.max_retry_num,
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
for obj in resp.contents:
key = obj.key if hasattr(obj, "key") else obj.prefix
if key not in collected_keys:
collected_objects.append(DeletingObject(key=key))
collected_keys.add(key)
for common_prefix in resp.common_prefixes:
key = common_prefix.prefix
if key not in collected_keys:
collected_objects.append(DeletingObject(key=key))
collected_keys.add(key)
if bucket_type == TOS_BUCKET_TYPE_HNS:
self._list_and_collect_objects(
bucket, bucket_type, common_prefix.prefix, collected_objects
)
return collected_objects
def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
"""Copy file between locations on tos.
Not allowed where the origin is larger than 5GB.
"""
buc1, key1, ver1 = self._split_path(path1)
buc2, key2, ver2 = self._split_path(path2)
if ver2:
raise ValueError("Cannot copy to a versioned file!")
retryable_func_executor(
lambda: self.tos_client.copy_object(
bucket=buc2,
key=key2,
src_bucket=buc1,
src_key=key1,
src_version_id=ver1,
),
max_retry_num=self.max_retry_num,
)
def _copy_etag_preserved(
self, path1: str, path2: str, size: int, total_parts: int, **kwargs: Any
) -> None:
"""Copy file as multiple-part while preserving the etag."""
bucket1, key1, version1 = self._split_path(path1)
bucket2, key2, version2 = self._split_path(path2)
upload_id = None
try:
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(bucket2, key2),
max_retry_num=self.max_retry_num,
)
upload_id = mpu.upload_id
parts = []
brange_first = 0
for i in range(1, total_parts + 1):
part_size = min(size - brange_first, PART_MAX_SIZE)
brange_last = brange_first + part_size - 1
if brange_last > size:
brange_last = size - 1
def _call_upload_part_copy(
i: int = i,
brange_first: int = brange_first,
brange_last: int = brange_last,
) -> UploadPartCopyOutput:
return self.tos_client.upload_part_copy(
bucket=bucket2,
key=key2,
part_number=i,
upload_id=upload_id,
src_bucket=bucket1,
src_key=key1,
copy_source_range_start=brange_first,
copy_source_range_end=brange_last,
)
part = retryable_func_executor(
_call_upload_part_copy,
args=(i, brange_first, brange_last),
max_retry_num=self.max_retry_num,
)
parts.append(
PartInfo(
part_number=part.part_number,
etag=part.etag,
part_size=size,
offset=None,
hash_crc64_ecma=None,
is_completed=None,
)
)
brange_first += part_size
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket2, key2, upload_id, parts
),
max_retry_num=self.max_retry_num,
)
except Exception as e:
retryable_func_executor(
lambda: self.tos_client.abort_multipart_upload(
bucket2, key2, upload_id
),
max_retry_num=self.max_retry_num,
)
raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e
def _copy_managed(
self,
path1: str,
path2: str,
size: int,
block: int = MANAGED_COPY_MAX_THRESHOLD,
**kwargs: Any,
) -> None:
"""Copy file between locations on tos as multiple-part.
block: int
The size of the pieces, must be larger than 5MB and at
most MANAGED_COPY_MAX_THRESHOLD.
Smaller blocks mean more calls, only useful for testing.
"""
if block < MANAGED_COPY_MIN_THRESHOLD or block > MANAGED_COPY_MAX_THRESHOLD:
raise ValueError("Copy block size must be 5MB<=block<=5GB")
bucket1, key1, version1 = self._split_path(path1)
bucket2, key2, version2 = self._split_path(path2)
upload_id = None
try:
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(bucket2, key2),
max_retry_num=self.max_retry_num,
)
upload_id = mpu.upload_id
def _call_upload_part_copy(
i: int, brange_first: int, brange_last: int
) -> UploadPartCopyOutput:
return self.tos_client.upload_part_copy(
bucket=bucket2,
key=key2,
part_number=i + 1,
upload_id=upload_id,
src_bucket=bucket1,
src_key=key1,
copy_source_range_start=brange_first,
copy_source_range_end=brange_last,
)
out = [
retryable_func_executor(
_call_upload_part_copy,
args=(i, brange_first, brange_last),
max_retry_num=self.max_retry_num,
)
for i, (brange_first, brange_last) in enumerate(get_brange(size, block))
]
parts = [
PartInfo(
part_number=i + 1,
etag=o.etag,
part_size=size,
offset=None,
hash_crc64_ecma=None,
is_completed=None,
)
for i, o in enumerate(out)
]
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket2, key2, upload_id, parts
),
max_retry_num=self.max_retry_num,
)
except Exception as e:
retryable_func_executor(
lambda: self.tos_client.abort_multipart_upload(
bucket2, key2, upload_id
),
max_retry_num=self.max_retry_num,
)
raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e
def _find_file_dir(
self, key: str, path: str, prefix: str, withdirs: bool, kwargs: Any
) -> List[dict]:
out = self._ls_dirs_and_files(
path,
delimiter="",
include_self=True,
prefix=prefix,
recursive=True,
)
if not out and key:
try:
out = [self.info(path)]
except FileNotFoundError:
out = []
dirs = {
self._parent(o["name"]): {
"Key": self._parent(o["name"]).rstrip("/"),
"Size": 0,
"size": 0,
"name": self._parent(o["name"]).rstrip("/"),
"type": "directory",
}
for o in out
if len(path) <= len(self._parent(o["name"]))
}
if withdirs:
for dir_info in dirs.values():
if dir_info not in out:
out.append(dir_info)
else:
out = [o for o in out if o["type"] == "file"]
return sorted(out, key=lambda x: x["name"])
def _open_remote_file(
self,
bucket: str,
key: str,
version_id: Optional[str],
range_start: int,
**kwargs: Any,
) -> Tuple[BinaryIO, int]:
if "callback" in kwargs:
kwargs.pop("callback")
try:
resp = retryable_func_executor(
lambda: self.tos_client.get_object(
bucket,
key,
version_id=version_id,
range_start=range_start,
**kwargs,
),
max_retry_num=self.max_retry_num,
)
except TosServerError as e:
if e.status_code == INVALID_RANGE_CODE:
obj_info = self._object_info(bucket=bucket, key=key)
if obj_info and (
obj_info["size"] == 0 or range_start == obj_info["size"]
):
return io.BytesIO(), 0
else:
raise e
return resp.content, resp.content_length
def _bucket_info(self, bucket: str) -> dict:
"""Get the information of a bucket.
Parameters
----------
bucket : str
The name of the bucket.
Returns
-------
dict
A dictionary containing the bucket information with the following keys:
- 'Key': The bucket name.
- 'Size': The size of the bucket (always 0).
- 'StorageClass': The storage class of the bucket (always 'BUCKET').
- 'size': The size of the bucket (always 0).
- 'type': The type of the bucket (always 'directory').
- 'name': The bucket name.
Raises
------
tos.exceptions.TosClientError
If there is a client error while accessing the bucket.
tos.exceptions.TosServerError
If there is a server error while accessing the bucket.
FileNotFoundError
If the bucket does not exist.
TosfsError
If there is an unknown error while accessing the bucket.
"""
try:
retryable_func_executor(
lambda: self.tos_client.head_bucket(bucket),
max_retry_num=self.max_retry_num,
)
return self._fill_bucket_info(bucket)
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
raise FileNotFoundError(bucket) from e
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
def _object_info(
self, bucket: str, key: str, version_id: Optional[str] = None
) -> Optional[dict]:
"""Get the information of an object.
Parameters
----------
bucket : str
The bucket name.
key : str
The object key.
version_id : str, optional
The version id of the object (default is None).
Returns
-------
dict
A dictionary containing the object information with the following keys:
- 'ETag': The entity tag of the object.
- 'LastModified': The last modified date of the object.
- 'size': The size of the object in bytes.
- 'name': The full path of the object.
- 'type': The type of the object (always 'file').
- 'StorageClass': The storage class of the object.
- 'VersionId': The version id of the object.
- 'ContentType': The content type of the object.
Raises
------
tos.exceptions.TosClientError
If there is a client error while accessing the object.
tos.exceptions.TosServerError
If there is a server error while accessing the object.
TosfsError
If there is an unknown error while accessing the object.
"""
try:
out = retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key, version_id=version_id),
max_retry_num=self.max_retry_num,
)
return {
"ETag": out.etag or "",
"LastModified": out.last_modified or "",
"size": out.content_length or 0,
"name": "/".join((bucket, key)),
"type": "file",
"StorageClass": out.storage_class or "STANDARD",
"VersionId": out.version_id or "",
"ContentType": out.content_type or "",
}
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND or (
self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS
and e.status_code == CONFLICT_CODE
and e.header._store["x-tos-ec"][1] == "0026-00000020"
):
pass
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
return None
def _get_dir_info(self, bucket: str, key: str, fullpath: str) -> Optional[dict]:
try:
# We check to see if the path is a directory by attempting to list its
# contexts. If anything is found, it is indeed a directory
out = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/" if key else "",
delimiter="/",
max_keys=1,
),
max_retry_num=self.max_retry_num,
)
if out.key_count > 0 or out.contents or out.common_prefixes:
return {
"name": fullpath,
"Key": fullpath,
"Size": 0,
"size": 0,
"type": "directory",
}
return None
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
def _exists_bucket(self, bucket: str) -> bool:
"""Check if a bucket exists in the TOS.
Parameters
----------
bucket : str
The name of the bucket to check for existence.
Returns
-------
bool
True if the bucket exists, False otherwise.
Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the bucket.
tos.exceptions.TosServerError
If there is a server error while checking the bucket.
TosfsError
If there is an unknown error while checking the bucket.
Examples
--------
>>> fs = TosFileSystem()
>>> fs._exists_bucket("mybucket")
True
>>> fs._exists_bucket("nonexistentbucket")
False
"""
try:
retryable_func_executor(
lambda: self.tos_client.head_bucket(bucket),
max_retry_num=self.max_retry_num,
)
return True
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
def _ls_buckets(self) -> List[dict]:
"""List all buckets in the account.
Returns
-------
List[dict]
A list of dictionaries,
each containing information about a bucket with the following keys:
- 'Key': The bucket name.
- 'Size': The size of the bucket (always 0).
- 'StorageClass': The storage class of the bucket (always 'BUCKET').
- 'size': The size of the bucket (always 0).
- 'type': The type of the bucket (always 'directory').
- 'name': The bucket name.
Raises
------
tos.exceptions.TosClientError
If there is a client error while listing the buckets.
tos.exceptions.TosServerError
If there is a server error while listing the buckets.
TosfsError
If there is an unknown error while listing the buckets.
"""
try:
resp = retryable_func_executor(
lambda: self.tos_client.list_buckets(), max_retry_num=self.max_retry_num
)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
return [self._fill_bucket_info(bucket.name) for bucket in resp.buckets]
def _ls_dirs_and_files(
self,
path: str,
max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
delimiter: str = "/",
prefix: str = "",
include_self: bool = False,
versions: bool = False,
recursive: bool = False,
) -> List[dict]:
bucket, key, _ = self._split_path(path)
if not prefix:
prefix = ""
if key:
prefix = key.lstrip("/") + "/" + prefix
logger.debug("Get directory listing for %s", path)
dirs = []
files = []
seen_names = set()
for obj in self._ls_objects(
bucket,
max_items=max_items,
delimiter=delimiter,
prefix=prefix,
include_self=include_self,
versions=versions,
recursive=recursive,
):
if isinstance(obj, CommonPrefixInfo):
dir_info = self._fill_dir_info(bucket, obj)
dir_name = dir_info["name"]
if dir_name not in seen_names:
dirs.append(dir_info)
seen_names.add(dir_name)
elif obj.key.endswith("/"):
dir_info = self._fill_dir_info(bucket, None, obj.key)
dir_name = dir_info["name"]
if dir_name not in seen_names:
dirs.append(dir_info)
seen_names.add(dir_name)
else:
file_info = self._fill_file_info(obj, bucket, versions)
file_name = file_info["name"]
if file_name not in seen_names:
files.append(file_info)
seen_names.add(file_name)
files += dirs
return files
def _ls_objects(
self,
bucket: str,
max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
delimiter: str = "/",
prefix: str = "",
include_self: bool = False,
versions: bool = False,
recursive: bool = False,
) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]:
if versions:
raise ValueError(
"versions cannot be specified if the filesystem is "
"not version aware."
)
bucket_type = self._get_bucket_type(bucket)
all_results = []
if recursive and bucket_type == TOS_BUCKET_TYPE_HNS:
def _recursive_list(bucket: str, prefix: str) -> None:
resp = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
prefix=prefix,
delimiter="/",
max_keys=max_items,
),
max_retry_num=self.max_retry_num,
)
all_results.extend(resp.contents + resp.common_prefixes)
for common_prefix in resp.common_prefixes:
_recursive_list(bucket, common_prefix.prefix)
_recursive_list(bucket, prefix)
else:
is_truncated = True
continuation_token = ""
while is_truncated:
def _call_list_objects_type2(
continuation_token: str = continuation_token,
) -> ListObjectType2Output:
return self.tos_client.list_objects_type2(
bucket,
prefix,
start_after=prefix if not include_self else None,
delimiter=delimiter,
max_keys=max_items,
continuation_token=continuation_token,
)
resp = retryable_func_executor(
_call_list_objects_type2,
args=(continuation_token,),
max_retry_num=self.max_retry_num,
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
all_results.extend(resp.contents + resp.common_prefixes)
return all_results
def _prefix_search_for_exists(self, bucket: str, key: str) -> bool:
bucket_type = self._get_bucket_type(bucket)
if bucket_type == TOS_BUCKET_TYPE_FNS:
resp = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
key.rstrip("/") + "/",
start_after=key.rstrip("/") + "/",
max_keys=1,
),
max_retry_num=self.max_retry_num,
)
return len(resp.contents) > 0
elif bucket_type == TOS_BUCKET_TYPE_HNS:
def search_in_common_prefixes(bucket: str, prefix: str) -> bool:
resp = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
prefix,
delimiter="/",
max_keys=1,
),
max_retry_num=self.max_retry_num,
)
if len(resp.contents) > 0:
return True
for common_prefix in resp.common_prefixes:
if search_in_common_prefixes(bucket, common_prefix):
return True
return False
resp = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
key.rstrip("/") + "/",
delimiter="/",
max_keys=1,
),
max_retry_num=self.max_retry_num,
)
if len(resp.contents) > 0:
return True
return search_in_common_prefixes(bucket, key.rstrip("/") + "/")
else:
raise ValueError(f"Unsupported bucket type {bucket_type}")
def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]:
"""Normalise tos path string into bucket and key.
Parameters
----------
path : string
Input path, like `tos://mybucket/path/to/file`
Examples
--------
>>> self._split_path("tos://mybucket/path/to/file")
['mybucket', 'path/to/file', None]
# pylint: disable=line-too-long
>>> self._split_path("tos://mybucket/path/to/versioned_file?versionId=some_version_id")
['mybucket', 'path/to/versioned_file', 'some_version_id']
"""
path = self._strip_protocol(path)
path = path.lstrip("/")
if "/" not in path:
return path, "", None
bucket, keypart = find_bucket_key(path)
key, _, version_id = keypart.partition("?versionId=")
if self.tag_enabled:
self.bucket_tag_mgr.add_bucket_tag(bucket, self.endpoint)
return (
bucket,
key,
version_id if self.version_aware and version_id else None,
)
@staticmethod
def _extract_bucket_type(bucket_type_resp: Any) -> Optional[str]:
if bucket_type_resp is None:
return None
# old tos sdk: directly returns str
if isinstance(bucket_type_resp, str):
return bucket_type_resp
# new tos sdk: GetBucketTypeOutput / HeadBucketOutput
bucket_type = getattr(bucket_type_resp, "bucket_type", None)
if bucket_type:
return bucket_type
# possible newer shape: GetBucketInfoOutput.bucket_info.bucket_type
bucket_info = getattr(bucket_type_resp, "bucket_info", None)
if bucket_info is not None:
return getattr(bucket_info, "bucket_type", None)
return None
def _get_bucket_type(self, bucket: str) -> str:
bucket_type_resp: Any
get_bucket_type = getattr(self.tos_client, "get_bucket_type", None)
legacy_get_bucket_type = getattr(self.tos_client, "_get_bucket_type", None)
if callable(get_bucket_type):
bucket_type_resp = retryable_func_executor(
lambda: get_bucket_type(bucket),
max_retry_num=self.max_retry_num,
)
elif callable(legacy_get_bucket_type):
bucket_type_resp = retryable_func_executor(
lambda: legacy_get_bucket_type(bucket),
max_retry_num=self.max_retry_num,
)
else:
bucket_type_resp = retryable_func_executor(
lambda: self.tos_client.head_bucket(bucket),
max_retry_num=self.max_retry_num,
)
bucket_type = self._extract_bucket_type(bucket_type_resp)
if not bucket_type:
return TOS_BUCKET_TYPE_FNS
return bucket_type
def _is_hns_bucket(self, bucket: str) -> bool:
return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS
def _is_fns_bucket(self, bucket: str) -> bool:
return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_FNS
def _init_tag_manager(self) -> None:
auth = self.tos_client.auth
if isinstance(auth, CredentialProviderAuth):
credentials = auth.credentials_provider.get_credentials()
self.bucket_tag_mgr = BucketTagMgr(
credentials.get_ak(),
credentials.get_sk(),
credentials.get_security_token(),
auth.region,
)
else:
raise TosfsError(
"Currently only support CredentialProviderAuth type, "
"please check if you set (ak & sk) or (session token) "
"correctly."
)
@staticmethod
def _fill_dir_info(
bucket: str, common_prefix: Optional[CommonPrefixInfo], key: str = ""
) -> dict:
name = "/".join(
[bucket, common_prefix.prefix[:-1] if common_prefix else key]
).rstrip("/")
return {
"name": name,
"Key": name,
"Size": 0,
"size": 0,
"type": "directory",
}
@staticmethod
def _fill_file_info(obj: ListedObject, bucket: str, versions: bool = False) -> dict:
result = {
"Key": f"{bucket}/{obj.key}",
"size": obj.size,
"name": f"{bucket}/{obj.key}",
"type": "file",
"LastModified": obj.last_modified,
}
if (
isinstance(obj, ListedObjectVersion)
and versions
and obj.version_id
and obj.version_id != "null"
):
result["name"] += f"?versionId={obj.version_id}"
return result
@staticmethod
def _fill_bucket_info(bucket_name: str) -> dict:
return {
"Key": bucket_name,
"Size": 0,
"StorageClass": "BUCKET",
"size": 0,
"type": "directory",
"name": bucket_name,
}
class TosFile(AbstractBufferedFile):
"""File-like operations for TOS."""
def __init__(
self,
fs: TosFileSystem,
path: str,
mode: str = "rb",
block_size: Union[int, str] = "default",
autocommit: bool = True,
cache_type: str = "readahead",
**kwargs: Any,
):
"""Instantiate a TOS file."""
bucket, key, path_version_id = fs._split_path(path)
super().__init__(
fs,
path,
mode,
block_size=block_size,
autocommit=autocommit,
cache_type=cache_type,
**kwargs,
)
self.fs = fs
self.bucket = bucket
self.key = key
self.version_id = path_version_id
self.path = path
self.mode = mode
self.autocommit = autocommit
self.append_block = False
self.append_offset = 0
self.buffer: Optional[io.BytesIO] = io.BytesIO()
self.parts: list = []
self.upload_id = None
self.part_number = 1
self._check_init_params(key, path, mode, block_size)
if "a" in mode:
self.append_block = True
try:
head = retryable_func_executor(
lambda: self.fs.tos_client.head_object(bucket, key),
max_retry_num=self.fs.max_retry_num,
)
self.append_offset = head.content_length
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
pass
else:
raise e
def _check_init_params(
self, key: str, path: str, mode: str, block_size: Union[int, str]
) -> None:
if not key:
raise ValueError("Attempt to open non key-like path: %s" % path)
if "r" not in mode and int(block_size) < MPU_PART_SIZE_THRESHOLD:
raise ValueError(
f"Block size must be >= {MPU_PART_SIZE_THRESHOLD // (2**20)}MB."
)
def _initiate_upload(self) -> None:
"""Create remote file/upload."""
if self.autocommit and self.tell() < self.blocksize:
# only happens when closing small file, use on-shot PUT
return
else:
logger.debug("Initiate upload for %s", self)
self.upload_id = retryable_func_executor(
lambda: self.fs.tos_client.create_multipart_upload(
self.bucket, self.key
).upload_id,
max_retry_num=self.fs.max_retry_num,
)
def _upload_chunk(self, final: bool = False) -> bool:
"""Write one part of a multi-block file upload.
Parameters
----------
final: bool
This is the last block, so should complete file, if
self.autocommit is True.
"""
bucket, key, _ = self.fs._split_path(self.path)
if self.buffer:
logger.debug(
"Upload for %s, final=%s, loc=%s, buffer loc=%s",
self,
final,
self.loc,
self.buffer.tell(),
)
if self.append_block:
self._append_chunk()
else:
if (
self.autocommit
and final
and self.tell() < min(self.blocksize, self.fs.multipart_threshold)
):
# only happens when closing small file, use one-shot PUT
pass
else:
if self.buffer is None:
self.buffer = io.BytesIO()
self.buffer.seek(0)
content = self.buffer.read()
part = retryable_func_executor(
lambda: self.fs.tos_client.upload_part(
self.bucket,
self.key,
self.upload_id,
self.part_number,
content=content,
),
max_retry_num=self.fs.max_retry_num,
)
self.parts.append(
PartInfo(
part_number=self.part_number,
etag=part.etag,
part_size=len(content),
offset=None,
hash_crc64_ecma=None,
is_completed=None,
)
)
self.part_number += 1
self.buffer = io.BytesIO()
if self.autocommit and final:
self.commit()
return not final
def _append_chunk(self) -> None:
"""Append or create to a file."""
if self.buffer:
self.buffer.seek(0)
content = self.buffer.read()
if content:
resp = retryable_func_executor(
lambda: self.fs.tos_client.append_object(
self.bucket,
self.key,
offset=self.append_offset,
content=content,
),
)
self.append_offset = resp.next_append_offset
def _fetch_range(self, start: int, end: int) -> bytes:
if start == end:
logger.warning(
"skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
self.bucket,
self.key,
start,
end,
)
return b""
logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)
def fetch() -> bytes:
with io.BytesIO() as temp_buffer:
for chunk in self.fs.tos_client.get_object(
self.bucket,
self.key,
self.version_id,
range_start=start,
range_end=end,
):
temp_buffer.write(chunk)
return temp_buffer.getvalue()
return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)
def commit(self) -> None:
"""Complete multipart upload or PUT."""
logger.debug("Commit %s", self)
if self.tell() == 0 and self.upload_id is not None:
if self.buffer is not None:
logger.debug("Empty file committed %s", self)
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.fs.touch(self.path, **self.kwargs)
elif self.upload_id is None and self.buffer is not None:
logger.debug("One-shot upload of %s", self)
self.buffer.seek(0)
data = self.buffer.read()
retryable_func_executor(
lambda: self.fs.tos_client.put_object(
self.bucket, self.key, content=data
),
max_retry_num=self.fs.max_retry_num,
)
elif self.upload_id is not None:
logger.debug("Complete multi-part upload for %s ", self)
retryable_func_executor(
lambda: self.fs.tos_client.complete_multipart_upload(
self.bucket,
self.key,
upload_id=self.upload_id,
parts=self.parts,
),
max_retry_num=self.fs.max_retry_num,
)
self.buffer = None
def discard(self) -> None:
"""Close the file without writing."""
if self.upload_id:
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.buffer = None