from typing import TYPE_CHECKING, Any
import json
import logging
import os
from os.path import abspath, basename, join
import shutil
from threading import Event, Timer
import time
from urllib.parse import urlparse
import zipfile

from tqdm.auto import tqdm

from rastervision.pipeline import rv_config_ as rv_config
from rastervision.pipeline.file_system import FileSystem
from rastervision.pipeline.file_system.local_file_system import (
    LocalFileSystem, make_dir)

if TYPE_CHECKING:
    from tempfile import TemporaryDirectory
# Module-level logger named after this module; the sync, cache and
# download/upload helpers below emit their progress messages through it.
log = logging.getLogger(__name__)
def get_local_path(uri: str, download_dir: str,
                   fs: FileSystem | None = None) -> str:
    """Return the path where a local copy of a URI should live.

    Local URIs are returned unchanged; for remote URIs a path inside
    ``download_dir`` is generated by the file system's ``local_path``.

    Args:
        uri: URI of the file to be copied. ``None`` is passed through.
        download_dir: Local directory under which remote files are placed.
        fs: If given, used instead of the FileSystem picked for ``uri``.

    Returns:
        A local path, or ``None`` when ``uri`` is ``None``.
    """
    if uri is None:
        return None
    file_system = fs or FileSystem.get_file_system(uri, 'r')
    return file_system.local_path(uri, download_dir)
def sync_to_dir(src_dir: str,
                dst_dir_uri: str,
                delete: bool = False,
                fs: FileSystem | None = None) -> None:  # pragma: no cover
    """Push the contents of a local directory to a destination directory.

    After the call, the destination contains every file in the source. When
    the destination file system is remote, this means uploading.

    Args:
        src_dir: Path of the local source directory.
        dst_dir_uri: URI of the destination directory.
        delete: If True, remove destination files that are absent from the
            source directory.
        fs: If given, used instead of the FileSystem picked (in write mode)
            for ``dst_dir_uri``.
    """
    file_system = fs or FileSystem.get_file_system(dst_dir_uri, 'w')
    file_system.sync_to_dir(src_dir, dst_dir_uri, delete=delete)
def sync_from_dir(src_dir_uri: str,
                  dst_dir: str,
                  delete: bool = False,
                  fs: FileSystem | None = None) -> None:
    """Synchronize a source directory to a local destination directory.

    Transfers files from source to destination directories so that the
    destination has all the source files. If the FileSystem is remote, this
    involves downloading.

    Args:
        src_dir_uri: URI of source directory.
        dst_dir: Path of local destination directory.
        delete: If True, delete files in the destination to match those in the
            source directory.
        fs: If supplied, use fs instead of automatically chosen FileSystem for
            ``src_dir_uri``.
    """
    if not fs:
        # The file system is chosen from the (possibly remote) source, which
        # is the side being read from.
        fs = FileSystem.get_file_system(src_dir_uri, 'r')
    fs.sync_from_dir(src_dir_uri, dst_dir, delete=delete)
def start_sync(src_dir: str,
               dst_dir_uri: str,
               sync_interval: int = 600,
               fs: FileSystem | None = None):  # pragma: no cover
    """Repeatedly sync a local source directory to a destination on a schedule.

    Calls :func:`sync_to_dir` (with ``delete=False``) every ``sync_interval``
    seconds on a daemon thread.

    Args:
        src_dir: Path of the local source directory.
        dst_dir_uri: URI of destination directory.
        sync_interval: Period in seconds for syncing.
        fs: If supplied, use fs instead of automatically chosen FileSystem.

    Returns:
        A context manager. Entering it yields the underlying
        ``threading.Timer``; exiting it stops further periodic syncs.
    """
    stop_event = Event()

    def _sync_dir():
        # Event.wait() sleeps like time.sleep(sync_interval), but returns True
        # as soon as a stop is requested, which ends the loop.
        while not stop_event.wait(sync_interval):
            log.info(f'Syncing {src_dir} to {dst_dir_uri}...')
            sync_to_dir(src_dir, dst_dir_uri, delete=False, fs=fs)

    class SyncThread:
        """Context manager owning the background sync thread."""

        def __init__(self):
            # The Timer runs _sync_dir once after a short (0.68 s) delay;
            # _sync_dir then loops on its own.
            thread = Timer(0.68, _sync_dir)
            thread.daemon = True
            thread.start()
            self.thread = thread

        def __enter__(self):
            return self.thread

        def __exit__(self, exc_type, exc_value, traceback):
            # cancel() only has an effect if the Timer has not fired yet.
            # Once _sync_dir is running, the stop event is what ends the
            # loop; an in-progress sync is allowed to finish.
            self.thread.cancel()
            stop_event.set()

    return SyncThread()
def download_if_needed(uri: str,
                       download_dir: str | None = None,
                       fs: FileSystem | None = None,
                       use_cache: bool = True) -> str:
    """Fetch a remote file into a local directory and return its local path.

    The local path inside ``download_dir`` comes from :func:`.get_local_path`.
    A download happens only if no file exists at that path yet (or if
    ``use_cache`` is False).

    Args:
        uri: URI of the file. A local path is returned unchanged.
        download_dir: Directory to download into. If ``None``, the cache dir
            from ``RVConfig`` is used. Defaults to ``None``.
        fs: If given, used instead of the :class:`.FileSystem` picked for
            ``uri``. Defaults to ``None``.
        use_cache: If ``False`` and the file is remote, download it even when
            a cached copy exists. Defaults to ``True``.

    Returns:
        Local path to the file.

    Raises:
        NotReadableError: If the URI cannot be read from.
    """
    if download_dir is None:
        download_dir = rv_config.get_cache_dir()
    fs = fs or FileSystem.get_file_system(uri, 'r')
    local_path = get_local_path(uri, download_dir, fs=fs)

    # A local URI maps onto itself, so there is nothing to fetch.
    if local_path == uri:
        return local_path

    already_cached = use_cache and file_exists(local_path, include_dir=False)
    if already_cached:
        log.info(f'Using cached file {local_path}.')
    else:
        log.info(f'Downloading {uri} to {local_path}...')
        make_dir(local_path, use_dirname=True)
        fs.copy_from(uri, local_path)
    return local_path
def download_or_copy(uri: str, target_dir: str,
                     fs: FileSystem | None = None) -> str:
    """Download or copy a file to a directory and return the local file path.

    If the file already exists in ``target_dir``, nothing is done. If the file
    is elsewhere but still local, it is copied to ``target_dir``. If it is
    remote, it is downloaded to the cache dir and then moved to ``target_dir``.
    ``target_dir`` is created if it does not exist.

    Args:
        uri: URI of file.
        target_dir: Local directory to download or copy file to.
        fs: If supplied, use fs instead of automatically chosen
            :class:`.FileSystem` for ``uri``.

    Returns:
        Local path to file (``target_dir/<basename of uri>``).
    """
    target_path = join(target_dir, basename(uri))
    if file_exists(target_path, fs=LocalFileSystem, include_dir=False):
        return target_path

    # Both shutil.copy and shutil.move fail if the destination directory is
    # missing, so make sure it exists first.
    make_dir(target_dir)
    if is_local(uri):
        shutil.copy(uri, target_path)
    else:
        download_path = download_if_needed(uri, fs=fs)
        shutil.move(download_path, target_path)
    return target_path
def file_exists(uri: str,
                fs: FileSystem | None = None,
                include_dir: bool = True) -> bool:
    """Report whether something exists at a URI.

    Args:
        uri: URI of the file.
        fs: If given, used instead of the :class:`.FileSystem` picked for
            ``uri``.
        include_dir: If the file system supports directory reads, also count
            directories as existing. When False, only a single file at the
            URI counts. Defaults to ``True``.
    """
    file_system = fs or FileSystem.get_file_system(uri, 'r')
    return file_system.file_exists(uri, include_dir)
def list_paths(uri: str, ext: str = '', fs: FileSystem | None = None,
               **kwargs) -> list[str]:
    """List the paths found under a URI, optionally filtered by extension.

    Args:
        uri: URI of a directory. ``None`` is passed through.
        ext: Optional file extension to filter by.
        fs: If given, used instead of the :class:`.FileSystem` picked for
            ``uri``.
        **kwargs: Extra keyword arguments forwarded to ``fs.list_paths()``.

    Returns:
        The paths reported by the file system, or ``None`` if ``uri`` is
        ``None``.
    """
    if uri is None:
        return None
    file_system = fs or FileSystem.get_file_system(uri, 'r')
    return file_system.list_paths(uri, ext=ext, **kwargs)
def upload_or_copy(src_path: str, dst_uri: str,
                   fs: FileSystem | None = None) -> None:
    """Upload a local file to a URI, or copy it if the URI is local.

    Does nothing when ``dst_uri`` is local and resolves to ``src_path``
    itself.

    Args:
        src_path: Path to the local source file.
        dst_uri: Destination URI for the file.
        fs: If given, used instead of the :class:`.FileSystem` picked (in
            write mode) for ``dst_uri``.

    Raises:
        FileNotFoundError: If ``src_path`` is not an existing local file.
        NotWritableError: If ``dst_uri`` cannot be written to.
    """
    src_is_file = file_exists(src_path, fs=LocalFileSystem, include_dir=False)
    if not src_is_file:
        raise FileNotFoundError(f'{src_path} does not exist.')

    # Copying a file onto itself is a no-op.
    same_local_file = (is_local(dst_uri)
                       and abspath(src_path) == abspath(dst_uri))
    if same_local_file:
        return

    log.info(f'Uploading {src_path} to {dst_uri}')
    file_system = fs or FileSystem.get_file_system(dst_uri, 'w')
    file_system.copy_to(src_path, dst_uri)
def file_to_str(uri: str, fs: FileSystem | None = None) -> str:
    """Read a text file into a string.

    Args:
        uri: URI of the file.
        fs: If given, used instead of the automatically chosen FileSystem.

    Returns:
        The text content of the file.

    Raises:
        NotReadableError: If the URI cannot be read.
    """
    file_system = fs or FileSystem.get_file_system(uri, 'r')
    return file_system.read_str(uri)
def str_to_file(content_str: str, uri: str,
                fs: FileSystem | None = None) -> None:
    """Write a string to a text file.

    Args:
        content_str: String to write.
        uri: URI of file to write.
        fs: If supplied, use fs instead of automatically chosen FileSystem
            for ``uri``.

    Raises:
        NotWritableError: If ``uri`` cannot be written to.
    """
    if not fs:
        # Select the file system in write mode, as sync_to_dir and
        # upload_or_copy do for their destinations.
        fs = FileSystem.get_file_system(uri, 'w')
    fs.write_str(uri, content_str)
def file_to_json(uri: str) -> Any:
    """Read the file at ``uri`` and parse its contents as JSON."""
    text = file_to_str(uri)
    return json.loads(text)
def json_to_file(obj: Any, uri: str) -> None:
    """Serialize ``obj`` as JSON and write the text to ``uri``."""
    serialized = json.dumps(obj)
    str_to_file(serialized, uri)
def zipdir(dir: str, zip_path: str) -> None:
    """Create a deflate-compressed zip archive of a directory.

    Files are stored relative to ``dir``, so the directory's contents sit at
    the root of the archive. A tqdm progress bar is updated once per file.

    Args:
        dir: Directory to zip.
        zip_path: Path of the zip file to create; its parent directory is
            created if needed.
    """
    make_dir(zip_path, use_dirname=True)
    prefix_len = len(dir)
    with (zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive,
          tqdm(desc='Zipping', delay=5) as progress):
        for root, _subdirs, names in os.walk(dir):
            # Strip the ``dir`` prefix to get the in-archive directory.
            rel_root = root[prefix_len:]
            for name in names:
                progress.set_postfix_str(name)
                archive.write(join(root, name), join(rel_root, name))
                progress.update(1)
def unzip(zip_path: str, target_dir: str) -> None:
    """Extract every member of the zip file at ``zip_path`` into ``target_dir``.

    ``target_dir`` is created first if it does not already exist.
    """
    make_dir(target_dir)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target_dir)
def is_local(uri: str) -> bool:
    """Return True if ``uri`` is handled by :class:`LocalFileSystem`.

    The file system is looked up with ``FileSystem.get_file_system``'s default
    mode.
    """
    resolved_fs = FileSystem.get_file_system(uri)
    return resolved_fs == LocalFileSystem
def is_archive(uri: str) -> bool:
    """Return True if ``uri`` ends with an extension ``shutil`` can unpack.

    The extensions come from ``shutil.get_unpack_formats()`` (e.g. ``.zip``,
    ``.tar``).
    """
    known_exts = tuple(
        ext for _name, exts, _desc in shutil.get_unpack_formats()
        for ext in exts)
    # str.endswith accepts a tuple and matches if any suffix matches.
    return uri.endswith(known_exts)
def get_tmp_dir() -> 'TemporaryDirectory':
    """Get the temporary directory configured by :class:`RVConfig`.

    Returns:
        TemporaryDirectory: A context manager, as returned by
        ``rv_config.get_tmp_dir()``.
    """
    tmp_dir = rv_config.get_tmp_dir()
    return tmp_dir
def uri_to_vsi_path(uri: str) -> str:
    """Convert a URI to a GDAL virtual file system (VSI) path string.

    - ``http``, ``https`` and ``ftp`` URIs become ``/vsicurl/<full URI>``.
      GDAL's ``/vsicurl/`` handler expects the complete URL after the prefix,
      including the scheme and any query string (e.g. signed-URL tokens).
    - ``s3://bucket/key`` becomes ``/vsis3/bucket/key``.
    - ``gs://bucket/key`` becomes ``/vsigs/bucket/key``.
    - Anything else is treated as a local (``file``) path and made absolute.

    Args:
        uri: URI of the file. Acceptable URI schemes are file, s3, gs, http,
            https, and ftp.

    Returns:
        The VSI path (or absolute local path) for ``uri``.
    """
    CURL_SCHEMES = ('http', 'https', 'ftp')
    BUCKET_SCHEME_TO_VSI = {
        's3': 'vsis3',
        'gs': 'vsigs',
    }

    parsed = urlparse(uri)
    scheme, netloc, path = parsed.scheme, parsed.netloc, parsed.path
    if scheme in CURL_SCHEMES:
        # Keep the URI intact: stripping the scheme or query yields a path
        # /vsicurl/ cannot resolve.
        return f'/vsicurl/{uri}'
    if scheme in BUCKET_SCHEME_TO_VSI:
        return join('/', BUCKET_SCHEME_TO_VSI[scheme], f'{netloc}{path}')
    # assume file schema
    return abspath(join(netloc, path))