Skip to content

File

File is a special DataModel, which is automatically generated when a DataChain is created from files, such as in dc.read_storage:

import datachain as dc

chain = dc.read_storage("gs://datachain-demo/dogs-and-cats")
chain.print_schema()

Output:

file: File@v1
    source: str
    path: str
    size: int
    version: str
    etag: str
    is_latest: bool
    last_modified: datetime
    location: Union[dict, list[dict], NoneType]

File classes include various metadata fields describing the underlying file, along with methods to read and manipulate file contents.

File

File(**kwargs)

Bases: DataModel

DataModel for reading binary files.

Attributes:

  • source (str) –

    The source of the file (e.g., 's3://bucket-name/').

  • path (str) –

    The path to the file (e.g., 'path/to/file.txt').

  • size (int) –

    The size of the file in bytes. Defaults to 0.

  • version (str) –

    The version of the file. Defaults to an empty string.

  • etag (str) –

    The ETag of the file. Defaults to an empty string.

  • is_latest (bool) –

    Whether the file is the latest version. Defaults to True.

  • last_modified (datetime) –

    The last modified timestamp of the file. Defaults to Unix epoch (1970-01-01T00:00:00).

  • location (dict | list[dict] | None) –

    The location of the file. Defaults to None.

Source code in datachain/lib/file.py
def __init__(self, **kwargs):
    """Create a File and initialize its private streaming state.

    Args:
        **kwargs: Field values forwarded to the DataModel constructor
            (``source``, ``path``, ``size``, ``etag``, ...).
    """
    super().__init__(**kwargs)
    # Runtime-only state, not part of the serialized model: the catalog
    # used for I/O, whether downloads go through the local cache, and the
    # progress callback used while downloading.
    self._catalog = None
    self._caching_enabled: bool = False
    self._download_cb: Callback = DEFAULT_CALLBACK

name property

name: str

The file name extracted from the path.

Example
file = File(source="s3://bucket", path="data/subdir/image.jpg")
file.name  # 'image.jpg'

parent property

parent: str

The parent directory of the file, extracted from the path.

Example
file = File(source="s3://bucket", path="data/subdir/image.jpg")
file.parent  # 'data/subdir'

as_audio_file

as_audio_file() -> AudioFile

Convert the file to an AudioFile object.

Source code in datachain/lib/file.py
def as_audio_file(self) -> "AudioFile":
    """Return this file as an `AudioFile`.

    If it already is an `AudioFile`, it is returned unchanged; otherwise
    a new `AudioFile` is built from this file's fields and attached to
    the same catalog and caching settings.
    """
    if isinstance(self, AudioFile):
        return self
    converted = AudioFile(**self.model_dump())
    converted._set_stream(self._catalog, caching_enabled=self._caching_enabled)
    return converted

as_image_file

as_image_file() -> ImageFile

Convert the file to an ImageFile object.

Source code in datachain/lib/file.py
def as_image_file(self) -> "ImageFile":
    """Return this file as an `ImageFile`.

    If it already is an `ImageFile`, it is returned unchanged; otherwise
    a new `ImageFile` is built from this file's fields and attached to
    the same catalog and caching settings.
    """
    if isinstance(self, ImageFile):
        return self
    image = ImageFile(**self.model_dump())
    image._set_stream(self._catalog, caching_enabled=self._caching_enabled)
    return image

as_text_file

as_text_file() -> TextFile

Convert the file to a TextFile object.

Source code in datachain/lib/file.py
def as_text_file(self) -> "TextFile":
    """Return this file as a `TextFile`.

    If it already is a `TextFile`, it is returned unchanged; otherwise
    a new `TextFile` is built from this file's fields and attached to
    the same catalog and caching settings.
    """
    if isinstance(self, TextFile):
        return self
    text = TextFile(**self.model_dump())
    text._set_stream(self._catalog, caching_enabled=self._caching_enabled)
    return text

as_video_file

as_video_file() -> VideoFile

Convert the file to a VideoFile object.

Source code in datachain/lib/file.py
def as_video_file(self) -> "VideoFile":
    """Return this file as a `VideoFile`.

    If it already is a `VideoFile`, it is returned unchanged; otherwise
    a new `VideoFile` is built from this file's fields and attached to
    the same catalog and caching settings.
    """
    if isinstance(self, VideoFile):
        return self
    video = VideoFile(**self.model_dump())
    video._set_stream(self._catalog, caching_enabled=self._caching_enabled)
    return video

at classmethod

at(
    uri: str | PathLike[str], session: Session | None = None
) -> Self

Construct a File from a full URI in one call.

Parameters:

  • uri (str | PathLike[str]) –

    Full URI or path to the file (e.g. s3://bucket/path/to/file.png or /local/path).

  • session (Session | None, default: None ) –

    Optional session instance. If None, the current session is used.

Returns:

  • File ( Self ) –

    A new File object pointing to the given URI.

Example
file = File.at("s3://bucket/path/to/output.png")
with file.open("wb") as f: ...
Source code in datachain/lib/file.py
@classmethod
def at(
    cls, uri: str | os.PathLike[str], session: "Session | None" = None
) -> "Self":
    """Construct a File from a full URI in one call.

    Args:
        uri: Full URI or path to the file
            (e.g. ``s3://bucket/path/to/file.png`` or ``/local/path``).
        session: Optional session instance. If None, the current session is used.

    Returns:
        File: A new File object pointing to the given URI.

    Example:
        ```py
        file = File.at("s3://bucket/path/to/output.png")
        with file.open("wb") as f: ...
        ```
    """
    from datachain.client.fsspec import Client
    from datachain.query.session import Session

    active_session = Session.get() if session is None else session
    location = stringify_path(uri)
    # A trailing separator denotes a directory, which a File cannot represent.
    if location.endswith(("/", os.sep)):
        raise ValueError(
            f"File.at directory URL/path given (trailing slash), got: {location}"
        )
    impl = Client.get_implementation(location)
    source, rel_path = impl.split_url(location)
    result = cls(source=impl.storage_uri(source), path=rel_path)
    result._set_stream(active_session.catalog)
    return result

ensure_cached

ensure_cached() -> None

Download the file to the local cache.

get_local_path can be used to return the path to the cached copy on disk. This is useful when you need to pass the file to code that expects a local filesystem path (e.g. ffmpeg, opencv, pandas, etc).

Example
file.ensure_cached()
local_path = file.get_local_path()
df = pandas.read_csv(local_path)
Source code in datachain/lib/file.py
def ensure_cached(self) -> None:
    """Fetch this file into the local cache.

    Once cached, `get_local_path` returns the on-disk path of the copy —
    useful for tools that require a real filesystem path (``ffmpeg``,
    ``opencv``, ``pandas``, ...).

    Example:
        ```py
        file.ensure_cached()
        local_path = file.get_local_path()
        df = pandas.read_csv(local_path)
        ```
    """
    if self._catalog is None:
        raise RuntimeError(
            "cannot download file to cache because catalog is not setup"
        )
    # Delegate the actual transfer to the source's storage client.
    source_client = self._catalog.get_client(self.source)
    source_client.download(self, callback=self._download_cb)

export

export(
    output: str | PathLike[str],
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
    client_config: dict | None = None,
) -> None

Copy or link this file into an output directory.

Parameters:

  • output (str | PathLike[str]) –

    Destination directory. Accepts a local OS path, a cloud prefix fsspec URI (s3://…, gs://…, az://…).

  • placement (ExportPlacement, default: 'fullpath' ) –

    How to build the path under output:

    • "fullpath" (default) β€” output/bucket/dir/file.txt
    • "filepath" β€” output/dir/file.txt
    • "filename" β€” output/file.txt
    • "etag" β€” output/<etag>.txt
  • use_cache (bool, default: True ) –

    If True, download to local cache first. Also required for symlinking remote files.

  • link_type (Literal['copy', 'symlink'], default: 'copy' ) –

    "copy" (default) or "symlink". Symlink falls back to copy for virtual files and for remote files when use_cache is False.

  • client_config (dict | None, default: None ) –

    Extra kwargs forwarded to the storage client.

Example
# flat export by filename
f.export("./export", placement="filename")

# export to a cloud prefix
f.export("s3://output-bucket/results", placement="filepath")

# pass storage credentials via client_config
f.export("s3://bucket/out", client_config={"aws_access_key_id": "…"})

# symlink from local cache (avoids re-downloading)
f.export("./local_out", use_cache=True, link_type="symlink")
Source code in datachain/lib/file.py
def export(
    self,
    output: str | os.PathLike[str],
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
    client_config: dict | None = None,
) -> None:
    """Copy or link this file into an output directory.

    Args:
        output: Destination directory.  Accepts a local OS path, a cloud
            prefix fsspec URI (``s3://…``, ``gs://…``, ``az://…``).
        placement: How to build the path under *output*:

            - ``"fullpath"`` (default) — ``output/bucket/dir/file.txt``
            - ``"filepath"`` — ``output/dir/file.txt``
            - ``"filename"`` — ``output/file.txt``
            - ``"etag"`` — ``output/<etag>.txt``
        use_cache: If True, download to local cache first.  Also
            required for symlinking remote files.
        link_type: ``"copy"`` (default) or ``"symlink"``.
            Symlink falls back to copy for virtual files and for
            remote files when *use_cache* is False.
        client_config: Extra kwargs forwarded to the storage client.

    Example:
        ```py
        # flat export by filename
        f.export("./export", placement="filename")

        # export to a cloud prefix
        f.export("s3://output-bucket/results", placement="filepath")

        # pass storage credentials via client_config
        f.export("s3://bucket/out", client_config={"aws_access_key_id": "…"})

        # symlink from local cache (avoids re-downloading)
        f.export("./local_out", use_cache=True, link_type="symlink")
        ```
    """
    if self._catalog is None:
        raise RuntimeError("Cannot export file: catalog is not set")

    # Remember the caching preference; the read performed by save() below
    # honors this flag.
    self._caching_enabled = use_cache

    # Relative destination path under `output`, shaped by `placement`.
    suffix = self._get_destination_suffix(placement)
    output_str = stringify_path(output)
    client = self._catalog.get_client(output_str, **(client_config or {}))

    # Normalization and traversal safety: for local exports, resolve to absolute
    # and validate the suffix. Cloud exports skip this — the cloud client already
    # rejects path-traversal characters in object keys.
    if client.PREFIX == "file://":
        from datachain.fs.utils import is_subpath

        output_os = client.fs._strip_protocol(output_str)
        # On Windows, normalize backslash separators to forward slashes
        # so posixpath.join produces consistent paths.  On Linux/macOS
        # backslash is a legal filename character and must not be replaced.
        output_abs = os.path.abspath(output_os)
        if os.name == "nt":
            output_abs = output_abs.replace("\\", "/")
        dst = posixpath.join(output_abs, suffix)

        try:
            client.validate_file_path(suffix)
        except ValueError as exc:
            raise FileError(str(exc), stringify_path(output), suffix) from None

        # Defense in depth: even after validation, refuse destinations that
        # escape the output directory.
        if not is_subpath(output_abs, dst):
            raise FileError(
                "destination is not within output directory",
                stringify_path(output),
                dst,
            )
    else:
        # For cloud exports, simply join the output prefix and suffix.
        # Relying on cloud clients to do their own validation.
        dst = posixpath.join(output_str, suffix)

    client.fs.makedirs(posixpath.dirname(dst), exist_ok=True)

    if link_type == "symlink":
        try:
            return self._symlink_to(dst)
        except OSError as exc:
            # Fall back to a plain copy when the OS/filesystem cannot
            # symlink (unsupported, cross-device, or not implemented).
            if exc.errno not in (errno.ENOTSUP, errno.EXDEV, errno.ENOSYS):
                raise

    self.save(dst, client_config=client_config)

get_fs_path

get_fs_path() -> str

Combine source and path into the full location string.

For cloud backends the result is a full URI-like (s3://…, gs://…). For local files the result is a bare OS path (no file:// prefix).

Examples:

# Cloud (S3, GCS, Azure, …)
f = File(source="s3://my-bucket", path="data/image.jpg")
f.get_fs_path()  # 's3://my-bucket/data/image.jpg'

# Local files
f = File(source="file:///home/user/project", path="out/result.csv")
f.get_fs_path()  # '/home/user/project/out/result.csv'

# No source β€” returns the relative path as-is
f = File(source="", path="dir/file.txt")
f.get_fs_path()  # 'dir/file.txt'

Raises:

  • FileError –

    If path is empty, ends with /, or contains . / .. segments. For local files, also rejects absolute paths, drive letters, and empty segments.

Source code in datachain/lib/file.py
def get_fs_path(self) -> str:
    """Combine ``source`` and ``path`` into the full location string.

    For cloud backends the result is a full URI-like (``s3://…``, ``gs://…``).
    For local files the result is a **bare OS path** (no ``file://``
    prefix).

    Examples:
        ```py
        # Cloud (S3, GCS, Azure, …)
        f = File(source="s3://my-bucket", path="data/image.jpg")
        f.get_fs_path()  # 's3://my-bucket/data/image.jpg'

        # Local files
        f = File(source="file:///home/user/project", path="out/result.csv")
        f.get_fs_path()  # '/home/user/project/out/result.csv'

        # No source — returns the relative path as-is
        f = File(source="", path="dir/file.txt")
        f.get_fs_path()  # 'dir/file.txt'
        ```

    Raises:
        FileError: If ``path`` is empty, ends with ``/``, or contains
            ``.`` / ``..`` segments.  For local files, also rejects
            absolute paths, drive letters, and empty segments.
    """
    from datachain.client.fsspec import Client

    # Validate both halves before assembling anything; ValueError from the
    # client is re-raised as the richer FileError carrying source/path.
    client_cls = Client.get_implementation(self.source or "")
    try:
        client_cls.validate_file_path(self.path)
        client_cls.validate_source(self.source)
    except ValueError as exc:
        raise FileError(str(exc), self.source, self.path) from exc
    path = self.path

    if not self.source:
        return path

    if self.source.startswith("file://"):
        base_path = client_cls.FS_CLASS._strip_protocol(self.source)
        # NOTE(review): validate_file_path is documented above to reject an
        # empty path, so this branch may be unreachable — confirm.
        if not path:
            return base_path
        # Re-join posix-style segments through Path so the result uses the
        # local filesystem's conventions, then emit as posix.
        return Path(base_path, *PurePosixPath(path).parts).as_posix()

    # Cloud: build a full URI.
    name = client_cls.FS_CLASS._strip_protocol(self.source)
    base = str(client_cls.storage_uri(name))
    if base.endswith("/"):
        return f"{base}{path}"
    return f"{base}/{path}"

get_local_path

get_local_path() -> str | None

Return path to a file in a local cache.

Returns None if file is not cached. Raises an exception if cache is not set up.

Source code in datachain/lib/file.py
def get_local_path(self) -> str | None:
    """Return the path of this file's cached copy, or None if not cached.

    Raises:
        RuntimeError: If the catalog (and hence the cache) is not set up.
    """
    catalog = self._catalog
    if catalog is None:
        raise RuntimeError(
            "cannot resolve local file path because catalog is not setup"
        )
    return catalog.cache.get_path(self)

get_uri

get_uri() -> str

Deprecated: Return a URI-like string for this file.

Source code in datachain/lib/file.py
def get_uri(self) -> str:
    """Deprecated: Return a URI-like string for this file."""
    warnings.warn(
        (
            "File.get_uri() is deprecated and will be removed in a future version. "
            "Use file.source + file.path or "
            "file.get_fs_path() for I/O locators."
        ),
        DeprecationWarning,
        stacklevel=2,
    )

    full_path = self.get_fs_path()
    # Without a source, get_fs_path already yields the bare relative path.
    if not self.source:
        return full_path
    # Cloud results are already full URIs; local bare paths gain a scheme.
    return path_to_fsspec_uri(full_path)

open

open(
    mode: str = "rb",
    *,
    client_config: dict[str, Any] | None = None,
    **open_kwargs
) -> Iterator[Any]

Open the file and return a file-like object.

Supports both read ("rb", "r") and write modes (e.g. "wb", "w", "ab"). When opened in a write mode, metadata is refreshed after closing.

Source code in datachain/lib/file.py
@contextmanager
def open(
    self,
    mode: str = "rb",
    *,
    client_config: dict[str, Any] | None = None,
    **open_kwargs,
) -> Iterator[Any]:
    """Open the file and return a file-like object.

    Supports both read ("rb", "r") and write modes (e.g. "wb", "w", "ab").
    When opened in a write mode, metadata is refreshed after closing.

    Args:
        mode: File mode, as in the builtin ``open``.
        client_config: Extra kwargs merged over the catalog's client config.
        **open_kwargs: Forwarded to the underlying filesystem/text wrapper.
    """
    # Any of "w", "a", "x", "+" in the mode means the handle may mutate.
    writing = any(ch in mode for ch in "wax+")
    if self.location and writing:
        raise VFileError(
            "Writing to virtual file is not supported",
            self.source,
            self.path,
        )

    if self._catalog is None:
        raise RuntimeError("Cannot open file: catalog is not set")

    # Per-call config overrides the catalog-level client config.
    base_cfg = getattr(self._catalog, "client_config", {}) or {}
    merged_cfg = {**base_cfg, **(client_config or {})}
    client: Client = self._catalog.get_client(self.source, **merged_cfg)

    if not writing:
        # Virtual files (location set) are streamed through the registry.
        if self.location:
            with VFileRegistry.open(self, self.location) as f:  # type: ignore[arg-type]
                with self._wrap_text(f, mode, open_kwargs=open_kwargs) as wrapped:
                    yield wrapped
            return
        if self._caching_enabled:
            self.ensure_cached()
        with client.open_object(
            self, use_cache=self._caching_enabled, cb=self._download_cb
        ) as f:
            with self._wrap_text(f, mode, open_kwargs=open_kwargs) as wrapped:
                yield wrapped
        return

    # write path
    full_path = self.get_fs_path()
    fs_mode = mode if "b" in mode else f"{mode}b"
    with client.fs.open(full_path, fs_mode, **open_kwargs) as raw_handle:
        with self._wrap_text(
            raw_handle,
            mode,
            open_kwargs=open_kwargs,
        ) as wrapped:
            yield wrapped

    # NOTE(review): raw_handle is deliberately inspected AFTER its `with`
    # block closes it — presumably the written version is only known once
    # the upload finishes; confirm against _extract_write_version.
    version_hint = self._extract_write_version(raw_handle)

    # refresh metadata pinned to the version that was just written
    refreshed = client.get_file_info(self.path, version_id=version_hint)
    for k, v in refreshed.model_dump().items():
        setattr(self, k, v)

read

read(length: int = -1)

Returns file contents.

Source code in datachain/lib/file.py
def read(self, length: int = -1):
    """Return up to `length` bytes of file contents (alias of `read_bytes`)."""
    return self.read_bytes(length)

read_bytes

read_bytes(length: int = -1)

Returns file contents as bytes.

Source code in datachain/lib/file.py
def read_bytes(self, length: int = -1):
    """Read and return up to `length` bytes of the file (-1 means all)."""
    with self.open() as stream:
        data = stream.read(length)
    return data

read_text

read_text(**open_kwargs)

Return file contents decoded as text.

**open_kwargs : Any Extra keyword arguments forwarded to open(mode="r", ...) (e.g. encoding="utf-8", errors="ignore")

Source code in datachain/lib/file.py
def read_text(self, **open_kwargs):
    """Return the file contents decoded as text.

    Args:
        **open_kwargs: Extra keyword arguments forwarded to
            ``open(mode="r", ...)`` (e.g. ``encoding="utf-8"``,
            ``errors="ignore"``).
    """
    # Virtual files (with `location` set) are rejected here.
    if self.location:
        raise VFileError(
            "Reading text from virtual file is not supported",
            self.source,
            self.path,
        )

    with self.open(mode="r", **open_kwargs) as stream:
        return stream.read()

rebase

rebase(
    old_base: str,
    new_base: str,
    suffix: str = "",
    extension: str = "",
) -> str

Rebase the file's URI from one base directory to another.

Parameters:

  • old_base (str) –

    Base directory to remove from the file's URI

  • new_base (str) –

    New base directory to prepend

  • suffix (str, default: '' ) –

    Optional suffix to add before file extension

  • extension (str, default: '' ) –

    Optional new file extension (without dot)

Returns:

  • str ( str ) –

    Rebased URI with new base directory

Raises:

  • ValueError –

    If old_base is not found in the file's URI

Examples:

file = File(source="s3://bucket", path="data/2025-05-27/file.wav")
file.rebase("s3://bucket/data", "s3://output-bucket/processed",
            extension="mp3")
# 's3://output-bucket/processed/2025-05-27/file.mp3'

file.rebase("data/audio", "/local/output", suffix="_ch1",
            extension="npy")
# '/local/output/file_ch1.npy'
Source code in datachain/lib/file.py
def rebase(
    self,
    old_base: str,
    new_base: str,
    suffix: str = "",
    extension: str = "",
) -> str:
    """
    Rebase the file's URI from one base directory to another.

    Args:
        old_base: Base directory to remove from the file's URI
        new_base: New base directory to prepend
        suffix: Optional suffix to add before file extension
        extension: Optional new file extension (without dot)

    Returns:
        str: Rebased URI with new base directory

    Raises:
        ValueError: If old_base is not found in the file's URI

    Examples:
        ```py
        file = File(source="s3://bucket", path="data/2025-05-27/file.wav")
        file.rebase("s3://bucket/data", "s3://output-bucket/processed",
                    extension="mp3")
        # 's3://output-bucket/processed/2025-05-27/file.mp3'

        file.rebase("data/audio", "/local/output", suffix="_ch1",
                    extension="npy")
        # '/local/output/file_ch1.npy'
        ```
    """
    # Resolve the current full location first, then delegate the
    # string surgery to the shared rebase_path helper.
    full_uri = self.get_fs_path()
    return rebase_path(full_uri, old_base, new_base, suffix, extension)

resolve

resolve() -> Self

Resolve a File object by checking its existence and updating its metadata.

Returns:

  • File ( Self ) –

    The resolved File object with updated metadata.

Source code in datachain/lib/file.py
def resolve(self) -> "Self":
    """
    Resolve a File object by checking its existence and updating its metadata.

    Returns:
        File: The resolved File object with updated metadata.  If the file
        cannot be read (missing, permissions, I/O error), a stub object with
        zeroed metadata (size=0, empty etag/version) is returned instead.

    Raises:
        RuntimeError: If the catalog is not set or the source protocol is
            unsupported.
        VFileError: If this is a virtual file (``location`` is set).
    """
    if self._catalog is None:
        raise RuntimeError("Cannot resolve file: catalog is not set")

    if self.location:
        raise VFileError(
            "Resolving a virtual file is not supported",
            self.source,
            self.path,
        )

    try:
        client = self._catalog.get_client(self.source)
    except NotImplementedError as e:
        raise RuntimeError(
            f"Unsupported protocol for file source: {self.source}"
        ) from e

    try:
        converted_info = client.get_file_info(self.path)
        # get_file_info always returns a base File; re-wrap as type(self)
        # so resolve() preserves the subclass (TextFile, ImageFile, …).
        res = type(self)(**converted_info.model_dump())
        res._set_stream(self._catalog)
        return res
    except (FileNotFoundError, PermissionError, OSError) as e:
        # Deliberate best-effort: log and fall through to a stub result
        # rather than propagating storage errors to the caller.
        logger.warning(
            "Error when resolving %s/%s: %s",
            self.source,
            self.path,
            str(e),
        )

    # Fallback stub: same identity (source/path) but zeroed metadata.
    res = type(self)(
        path=self.path,
        source=self.source,
        size=0,
        etag="",
        version="",
        is_latest=True,
        last_modified=TIME_ZERO,
    )
    res._set_stream(self._catalog)
    return res

save

save(destination: str, client_config: dict | None = None)

Write file contents to destination.

Parameters:

  • destination (str) –

    Target path or URI. Accepts a local OS path, a cloud URI (s3://…, gs://…, az://…), or an unencoded file:// URI as produced by :func:~datachain.fs.utils.path_to_fsspec_uri. Do not pass Path.as_uri() output β€” that produces RFC percent-encoded URIs (e.g. file:///my%20dir) which fsspec does not decode, causing FileNotFoundError for paths containing spaces, #, or %.

  • client_config (dict | None, default: None ) –

    Optional extra kwargs forwarded to the storage client (e.g. credentials, endpoint URL).

Example
file.save("/local/output/result.bin")
file.save("s3://my-bucket/output/result.bin")
file.save("~/output/result.bin")
Source code in datachain/lib/file.py
def save(self, destination: str, client_config: dict | None = None):
    """Write file contents to *destination*.

    Args:
        destination: Target path or URI: a local OS path, a cloud URI
            (``s3://…``, ``gs://…``, ``az://…``), or an unencoded
            ``file://`` URI as produced by
            :func:`~datachain.fs.utils.path_to_fsspec_uri`.  Avoid
            ``Path.as_uri()`` output — it percent-encodes (e.g.
            ``file:///my%20dir``), fsspec does not decode it, and paths
            containing spaces, ``#``, or ``%`` then fail with
            ``FileNotFoundError``.
        client_config: Optional extra kwargs forwarded to the storage
            client (e.g. credentials, endpoint URL).

    Example:
        ```py
        file.save("/local/output/result.bin")
        file.save("s3://my-bucket/output/result.bin")
        file.save("~/output/result.bin")
        ```
    """
    if self._catalog is None:
        raise RuntimeError("Cannot save file: catalog is not set")

    dest = stringify_path(destination)
    config = client_config or {}
    target_client: Client = self._catalog.get_client(dest, **config)
    target_client.upload(self.read(), path_to_fsspec_uri(dest))

upload classmethod

upload(
    data: bytes,
    path: str | PathLike[str],
    catalog: Catalog | None = None,
) -> Self

Upload bytes to a storage path and return a File pointing to it.

Parameters:

  • data (bytes) –

    The raw bytes to upload.

  • path (str | PathLike[str]) –

    Destination path (local or remote, e.g. s3://bucket/file.txt).

  • catalog (Catalog | None, default: None ) –

    Optional catalog instance. If None, the current session catalog is used.

Returns:

  • File ( Self ) –

    A new File object with metadata populated from the upload.

Example
file = File.upload(b"hello world", "s3://bucket/hello.txt")
Note

To write data as a stream, use File.at with open instead:

file = File.at("s3://bucket/output.txt")
with file.open("wb") as f:
    f.write(b"hello world")

Source code in datachain/lib/file.py
@classmethod
def upload(
    cls,
    data: bytes,
    path: str | os.PathLike[str],
    catalog: "Catalog | None" = None,
) -> "Self":
    """Upload bytes to a storage path and return a File pointing to it.

    Args:
        data: The raw bytes to upload.
        path: Destination path (local or remote, e.g. ``s3://bucket/file.txt``).
        catalog: Optional catalog instance. If None, the current session
            catalog is used.

    Returns:
        File: A new File object with metadata populated from the upload.

    Example:
        ```py
        file = File.upload(b"hello world", "s3://bucket/hello.txt")
        ```

    Note:
        To write data as a stream, use `File.at` with `open` instead:
        ```py
        file = File.at("s3://bucket/output.txt")
        with file.open("wb") as f:
            f.write(b"hello world")
        ```
    """
    from datachain.client.fsspec import Client

    if catalog is None:
        from datachain.query.session import Session

        catalog = Session.get().catalog

    dest = stringify_path(path)
    impl = Client.get_implementation(dest)
    source, rel_path = impl.split_url(dest)

    client = catalog.get_client(impl.storage_uri(source))
    uploaded = client.upload(data, rel_path)
    # Storage clients may return a base File; re-wrap to preserve subclass.
    if not isinstance(uploaded, cls):
        uploaded = cls(**uploaded.model_dump())
    uploaded._set_stream(catalog)
    return uploaded

FileError

FileError(message: str, source: str, path: str)

Bases: DataChainError

Source code in datachain/lib/file.py
def __init__(self, message: str, source: str, path: str):
    """Error tied to a specific file, identified by its source and path."""
    self.message, self.source, self.path = message, source, path
    super().__init__(f"Error in file '{source}/{path}': {message}")

TarVFile

Bases: VFile

Virtual file model for files extracted from tar archives.

open classmethod

open(file: File, location: list[dict])

Stream file from tar archive based on location in archive.

Source code in datachain/lib/file.py
@classmethod
def open(cls, file: "File", location: list[dict]):
    """Stream file from tar archive based on location in archive."""
    tar_file = cls.parent(file, location)

    entry = location[0]

    # Both byte offset and size are mandatory to slice the member out
    # of the enclosing archive.
    offset = entry.get("offset", None)
    if offset is None:
        raise VFileError("'offset' is not specified", file.source, file.path)

    size = entry.get("size", None)
    if size is None:
        raise VFileError("'size' is not specified", file.source, file.path)

    client = file._catalog.get_client(tar_file.source)
    fd = client.open_object(tar_file, use_cache=file._caching_enabled)
    return FileSlice(fd, offset, size, file.name)