# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
# Source: Adapted from earthai-tech/gofast/utils/io_utils (https://github.com/earthai-tech/gofast)
# This module is included in the FusionLab package, with modifications
# to fit FusionLab’s I/O conventions.
"""
Input/Output utilities for managing file paths, directories, and
loading serialized data within FusionLab. Provides error-checked
deserialization, directory management, and archive handling
(e.g., .tgz, .zip), streamlining file operations and data recovery.
Adapted for FusionLab from the original fusionlab.utils.io_utils.
"""
import os
import re
import shutil
import pickle
import joblib
import datetime
import warnings
import copy
import csv
import json
import yaml
import h5py
import tarfile
from pathlib import Path
from pprint import pformat
from typing import Optional, Union, Any, Tuple, List, Dict, Text
from six.moves import urllib
from zipfile import ZipFile
from tqdm import tqdm
import numpy as np
import pandas as pd
from .._fusionlog import fusionlog
from ..api.property import BaseClass
from ..compat.sklearn import validate_params
from ..core.array_manager import to_numeric_dtypes
from ..core.checks import (
exist_features, check_files, is_in_if, str2columns
)
from ..core.io import EnsureFileExists
from ..core.utils import is_iterable, smart_format
from ..decorators import RunReturn, smartFitRun
from .validator import check_is_runned, is_frame
from ._dependency import import_optional_dependency
logger = fusionlog.get_fusionlab_logger(__name__)
__all__ = [
'FileManager',
'cpath',
'deserialize_data',
'extract_tar_with_progress',
'fetch_tgz_from_url',
'fetch_tgz_locally',
'dummy_csv_translator',
'fetch_json_data_from_url',
'get_config_fname_from_varname',
'get_valid_key',
'key_checker',
'key_search',
'load_serialized_data',
'load_csv',
'move_cfile',
'parse_csv',
'parse_json',
'parse_md',
'parse_yaml',
'print_cmsg',
'rename_files',
'sanitize_unicode_string',
'save_job',
'save_path',
'serialize_data',
'serialize_data_in',
'spath',
'store_or_write_hdf5',
'to_hdf5',
'zip_extractor',
'fetch_joblib_data',
'to_txt',
]
@smartFitRun
class FileManager(BaseClass):
r"""
A class for managing and organizing files within a directory
structure. This class provides methods to filter, organize,
and rename files in bulk based on file extensions and name
patterns. All operations are executed via the ``run`` method to
ensure proper initialization and state management.
Mathematically, if :math:`\mathcal{F}` represents the set of files
in the root directory and :math:`\phi(f)` is a filtering function
that selects files based on file type and name pattern, then the
FileManager produces a subset
.. math::
\mathcal{F}' = \{ f \in \mathcal{F} \mid \phi(f) \}
and performs operations such as moving or copying to reorganize
these files into a target directory.
Parameters
----------
root_dir : str
The root directory containing the files to be managed.
This directory must exist and contain the files subject to
filtering.
target_dir : str
The directory where the organized files will be placed.
If necessary, this directory can be created when
``create_dirs`` is True.
file_types : list of str, optional
A list of file extensions (e.g., ``['.csv', '.json']``) used to
filter the files. If ``None``, no file type filtering is applied.
name_patterns : list of str, optional
A list of substrings (e.g., ``['2023', 'report']``) to filter
file names. If ``None``, all file names are included.
move : bool, optional
If True, files are moved from the source to the target directory;
otherwise, they are copied. Default is False.
overwrite : bool, optional
If True, existing files in the target directory will be overwritten.
If False, existing files are skipped. Default is False.
create_dirs : bool, optional
If True, missing directories in the target path are created.
Default is False.
Attributes
----------
root_dir_ : str
The validated root directory from which files are managed.
target_dir_ : str
The directory where the processed files are stored.
Methods
-------
run(pattern, replacement)
Executes the file organization process. It filters files using
the criteria provided at initialization and, if a `pattern` and
corresponding `replacement` are given, performs bulk renaming.
get_processed_files()
Returns a list of file paths that have been processed and
organized into the target directory.
Examples
--------
>>> from fusionlab.utils.io_utils import FileManager
>>> manager = FileManager(
... root_dir='data/raw',
... target_dir='data/processed',
... file_types=['.csv', '.json'],
... name_patterns=['2023', 'report'],
... move=True,
... overwrite=True,
... create_dirs=True
... )
>>> manager.run(pattern='old', replacement='new')
>>> processed = manager.get_processed_files()
>>> print(processed)
Notes
-----
The public method ``run`` orchestrates the file management
operations by first calling the internal method
:meth:`_organize_files` to filter and move or copy files from the
source directory to the target directory. If renaming is needed,
:meth:`_rename_files` is invoked with the specified `pattern` and
`replacement`. The method :meth:`get_processed_files` compiles a
list of all files that have been organized, based on a walk of the
target directory.
See Also
--------
shutil.move : To move files between directories.
shutil.copy2: To copy files while preserving file metadata.
References
----------
.. [1] Python Software Foundation. "os.walk — Directory tree
generator". Python Documentation.
.. [2] Python Software Foundation. "shutil — High-level file
operations". Python Documentation.
"""
@validate_params({
'root_dir' : [str],
'target_dir' : [str],
'file_types' : [list, None],
'name_patterns': [list, None],
'move' : [bool],
'overwrite' : [bool],
'create_dirs' : [bool]
})
def __init__(
self,
root_dir: str,
target_dir: str,
file_types: Optional[List[str]] = None,
name_patterns: Optional[List[str]] = None,
move: bool = False,
overwrite: bool = False,
create_dirs: bool = False
):
# Assign parameters to instance attributes.
self.root_dir = root_dir
self.target_dir = target_dir
self.file_types = file_types
self.name_patterns = name_patterns
self.move = move
self.overwrite = overwrite
self.create_dirs = create_dirs
# Validate that the root directory exists.
if not os.path.isdir(self.root_dir):
raise ValueError(
f"Root directory '{self.root_dir}' does not exist."
)
# If requested, create the target directory.
if self.create_dirs and not os.path.exists(self.target_dir):
os.makedirs(self.target_dir, exist_ok=True)
logger.debug(
f"Initialized FileManager with root_dir: {self.root_dir}, "
f"target_dir: {self.target_dir}"
)
@RunReturn
def run(
self,
pattern: Optional[str] = None,
replacement: Optional[str] = None
):
r"""
Executes file organization operations.
This method filters files based on the specified file types
and name patterns, then organizes them by moving or copying
into the target directory. Additionally, if a `pattern` is
provided, file names containing that pattern are renamed by
replacing the pattern with the specified `replacement`.
Parameters
----------
pattern : str, optional
The substring to search for in file names. If provided,
file names containing this pattern will be renamed.
replacement : str, optional
The string to replace `pattern` with in file names.
Required if `pattern` is specified.
Returns
-------
self : FileManager
The instance itself after executing operations.
Examples
--------
>>> manager = FileManager(...)
>>> manager.run(pattern='old', replacement='new')
"""
# Organize (move/copy) filtered files from root to target.
self._organize_files()
# If a renaming pattern is provided, execute renaming.
if pattern is not None:
if replacement is None:
raise ValueError(
"Replacement string must be provided if pattern is specified."
)
self._rename_files(pattern, replacement)
# Mark the manager as having run its operations.
self._is_runned = True
def get_processed_files(self) -> List[str]:
r"""
Retrieves a list of processed files in the target directory.
Returns
-------
files : list of str
A list containing the full paths of the files that have been
organized into the target directory.
Examples
--------
>>> manager = FileManager(...)
>>> manager.run()
>>> files = manager.get_processed_files()
>>> print(files)
"""
# Ensure the run method has been executed.
check_is_runned(self, attributes=['_is_runned'])
processed_files = []
# Walk the target directory and accumulate file paths.
for dirpath, _, filenames in os.walk(self.target_dir):
for filename in filenames:
processed_files.append(
os.path.join(dirpath, filename)
)
return processed_files
def _organize_files(self):
r"""Private method to filter and process files from root_dir."""
try:
# Retrieve the list of files that match filtering criteria.
files = self._get_filtered_files()
# Process each file individually.
for file_path in files:
self._handle_file(file_path)
except Exception as e:
logger.error(f"Failed to organize files: {str(e)}")
raise RuntimeError(
f"Organizing files failed: {str(e)}"
) from e
def _rename_files(self, pattern: str, replacement: str):
r"""Private method to rename files in target_dir by pattern."""
try:
# Traverse target_dir and rename files matching the pattern.
for dirpath, _, filenames in os.walk(self.target_dir):
for filename in filenames:
if pattern in filename:
old_path = os.path.join(dirpath, filename)
new_filename = filename.replace(pattern, replacement)
new_path = os.path.join(dirpath, new_filename)
# Skip renaming if file exists and overwrite is False.
if os.path.exists(new_path) and not self.overwrite:
logger.info(
f"File {new_path} already exists; skipping."
)
continue
os.rename(old_path, new_path)
logger.debug(
f"Renamed {old_path} to {new_path}"
)
except Exception as e:
logger.error(f"Failed to rename files: {str(e)}")
raise RuntimeError(
f"Renaming files failed: {str(e)}"
) from e
def _get_filtered_files(self):
r"""Retrieve files from root_dir filtered by file_types
and name_patterns."""
matched_files = []
# Walk through root_dir recursively.
for dirpath, _, filenames in os.walk(self.root_dir):
for filename in filenames:
# Filter by file extension if file_types specified.
if self.file_types and not any(
filename.endswith(ext) for ext in self.file_types
):
continue
# Filter by name patterns if provided.
if self.name_patterns and not any(
pat in filename for pat in self.name_patterns
):
continue
matched_files.append(
os.path.join(dirpath, filename)
)
return matched_files
def _handle_file(self, file_path):
r"""Handle moving or copying a single file from root_dir
to target_dir."""
# Compute the relative path from the root directory.
relative_path = os.path.relpath(file_path, self.root_dir)
target_path = os.path.join(self.target_dir, relative_path)
# Ensure that the target directory exists.
target_dir = os.path.dirname(target_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)
# If the file exists in the target and overwrite is disabled, skip it.
if os.path.exists(target_path) and not self.overwrite:
logger.info(
f"File {target_path} already exists; skipping."
)
return
# Depending on the 'move' flag, either move or copy the file.
if self.move:
shutil.move(file_path, target_path)
logger.debug(
f"Moved {file_path} to {target_path}"
)
else:
shutil.copy2(file_path, target_path)
logger.debug(
f"Copied {file_path} to {target_path}"
)
def zip_extractor(
zip_file,
samples: Union[int, str] = '*',
ftype: Optional[str] = None,
savepath: Optional[str] = None,
pwd: Optional[Union[str, bytes]] = None
) -> list:
r"""
Extracts files from a ZIP archive based on various filtering
criteria and saves them to a specified directory.
The extraction process can be controlled by the ``samples``
parameter to limit the number of files extracted, or by the
``ftype`` parameter to filter by a specific file extension.
The resulting file names are returned as a list.
.. math::
\text{Extracted Files} = \{ f \in \mathcal{A} \mid \phi(f) \}
where :math:`\mathcal{A}` is the set of all files in the archive,
and :math:`\phi(f)` is a predicate that checks if a file matches
the desired extension and is within the specified sample count.
Parameters
----------
zip_file : str
Full path to the ZIP archive file.
samples : int or str, optional
Number of files to extract. If set to ``'*'``, all files are
extracted. Default is ``'*'``.
ftype : str, optional
File extension filter (e.g., ``'.csv'``). Only files with this
extension are extracted. If no matching files are found,
a ValueError is raised.
savepath : str, optional
Directory where the extracted files will be stored. If not
provided, files are extracted to the current working directory.
pwd : str or bytes, optional
Password for encrypted ZIP files. If provided as a string,
it will be used as is (or can be encoded to bytes as needed).
Returns
-------
list of str
A list of extracted file names (with paths).
Examples
--------
>>> from fusionlab.utils.io_utils import zip_extractor
>>> extracted_files = zip_extractor(
... 'data/archive.zip',
... samples='*',
... ftype='.csv',
... savepath='data/extracted',
... pwd='secret'
... )
>>> print(extracted_files)
['folder1/file1.csv', 'folder2/file2.csv', ...]
Notes
-----
The function first validates the input ZIP file using
``check_files`` (assumed to be defined in the package). It then
determines the sample count and filters files by extension if
``ftype`` is provided. Extraction is done via the standard
``ZipFile.extract`` or ``ZipFile.extractall`` methods.
See Also
--------
zipfile.ZipFile.extract : Extract a single file from a ZIP archive.
zipfile.ZipFile.extractall : Extract all files from a ZIP archive.
"""
def raise_msg_when(objn: list, ft: str) -> list:
"""
Filters the list of file names to only include those that end
with the given file extension. Raises a ValueError if no such
file is found.
Parameters
----------
objn : list of str
List of file names from the ZIP archive.
ft : str
The file extension filter (e.g., ``'.csv'``).
Returns
-------
list of str
Filtered list of file names that match the file extension.
"""
# Filter file names based on the extension.
filtered = [o for o in objn if o.endswith(ft)]
if len(filtered) == 0:
# Determine available extensions.
available_ext = [s.split('.')[-1] for s in objn if '.' in s]
available_str = (
f"{available_ext}" if available_ext else "None"
)
raise ValueError(
f"No files in the archive match {ft!r}. "
f"Available file types: {available_str}"
)
return filtered
# Validate the ZIP file using check_files (assumed to be defined).
zip_file = check_files(zip_file, formats='.zip', return_valid=True)
# Convert samples parameter to an integer if not '*'.
if isinstance(samples, str):
if samples != '*':
try:
samples = int(samples)
except Exception:
raise ValueError(
"Parameter `samples` must be an integer or '*'"
)
elif not isinstance(samples, int):
raise TypeError(
"Parameter `samples` must be int or str,"
f" not {type(samples).__name__!r}"
)
# Open the ZIP file for extraction.
with ZipFile(zip_file, 'r') as zip_obj:
# Get list of all file names in the archive.
objnames = zip_obj.namelist()
# Determine the sample count.
if samples == '*':
samples = len(objnames)
# If file type filter is specified, filter file names.
if ftype is not None:
objnames = raise_msg_when(objn=objnames, ft=ftype)
# Extract all files if samples exceed available files and
# no file type filter is provided.
if samples >= len(objnames) and ftype is None:
zip_obj.extractall(path=savepath, pwd=pwd)
else:
# Extract only the first `samples` files.
for zf in objnames[:samples]:
zip_obj.extract(zf, path=savepath, pwd=pwd)
return objnames
def to_hdf5(
data,
fn: str,
objname: Optional[str] = None,
close: bool = True,
**hdf5_kws
) -> Any:
r"""
Store a data object in Hierarchical Data Format 5 (HDF5).
This function serializes the input ``data`` into an HDF5 file. It
supports both pandas DataFrames and NumPy arrays. If ``data`` is a
DataFrame, it uses ``pd.HDFStore`` (which requires the ``pytables``
package) to store the data. If ``data`` is a NumPy array, it uses
``h5py.File`` to create a dataset.
The file path is constructed by concatenating the specified
``savepath`` (or the current working directory if ``savepath`` is
not provided) with the provided filename (``fn``). The function
automatically appends the appropriate file extension: ``.h5`` for
DataFrames and ``.hdf5`` for arrays.
.. math::
\text{filepath} = \text{savepath} \oplus \text{filename} \oplus
\text{extension}
where :math:`\oplus` denotes string concatenation.
Parameters
----------
data : Any
The data object to be stored. Must be either a NumPy array or a
pandas DataFrame.
fn : str
The file path (without extension) where the HDF5 file will be
saved.
objname : str, optional
The name under which to store the data within the HDF5 file.
Defaults to ``'data'`` if not provided.
close : bool, default=True
If ``True``, the file is closed after writing. If ``False``,
the file remains open for additional modifications.
**hdf5_kws : dict, optional
Additional keyword arguments to pass to the HDFStore
constructor (for DataFrames) or to customize dataset creation
(for arrays). Examples include:
- ``mode``: File mode (e.g., ``'a'``, ``'w'``)
- ``complevel``: Compression level (0-9)
- ``complib``: Compression library (e.g., ``'zlib'``)
- ``fletcher32``: Enable Fletcher32 checksum (bool)
In more details:
* mode : {'a', 'w', 'r', 'r+'}, default 'a'
``'r'``
Read-only; no data can be modified.
``'w'``
Write; a new file is created (an existing file with the same
name would be deleted).
``'a'``
Append; an existing file is opened for reading and writing,
and if the file does not exist it is created.
``'r+'``
It is similar to ``'a'``, but the file must already exist.
* complevel : int, 0-9, default None
Specifies a compression level for data.
A value of 0 or None disables compression.
* complib : {'zlib', 'lzo', 'bzip2', 'blosc'}, default 'zlib'
Specifies the compression library to be used.
As of v0.20.2 these additional compressors for Blosc are supported
(default if no compressor specified: 'blosc:blosclz'):
{'blosc:blosclz', 'blosc:lz4', 'blosc:lz4hc', 'blosc:snappy',
'blosc:zlib', 'blosc:zstd'}.
Specifying a compression library which is not available issues
a ValueError.
* fletcher32 : bool, default False
If applying compression use the fletcher32 checksum.
Returns
-------
store : object
An IO interface for the stored data. For DataFrames, this is a
``pd.HDFStore`` object; for arrays, an ``h5py.File`` object.
Examples
--------
>>> import os
>>> import pandas as pd
>>> from fusionlab.utils.io_utils import to_hdf5
>>> data = pd.DataFrame({
... 'a': [1, 2, 3],
... 'b': [4, 5, 6]
... })
>>> save_path = os.path.join('output', 'datafile')
>>> store = to_hdf5(data, fn=save_path, objname='mydata', verbose=1)
>>> # Access stored data:
>>> retrieved = store['mydata']
>>> print(retrieved.head())
Notes
-----
- Ensure the dependency ``pytables`` is installed when serializing a
DataFrame. If not, an error will be raised.
- When serializing NumPy arrays, the dataset is created with the name
``"dataset_01"``.
- If ``close`` is set to ``False``, the caller is responsible for
closing the store.
See Also
--------
joblib.dump : For serializing objects using Joblib.
pickle.dump : For serializing objects using Pickle.
h5py.File : For working with HDF5 files in Python.
References
----------
.. [1] McKinney, W. (2010). "Data Structures for Statistical Computing
in Python". Proceedings of the 9th Python in Science Conference.
.. [2] Van der Walt, S., Colbert, S. C., & Varoquaux, G. (2011).
"The NumPy Array: A Structure for Efficient Numerical
Computation". Computing in Science & Engineering, 13(2), 22-30.
"""
# Validate that data is either a NumPy array or a pandas DataFrame.
if not (isinstance(data, np.ndarray) or isinstance(data, pd.DataFrame)):
raise TypeError(
f"Expect a numpy array or pandas DataFrame, not "
f"{type(data).__name__!r}"
)
# Remove any existing HDF5 file extension from the provided filename.
fn = str(fn).replace('.h5', "").replace('.hdf5', "")
store = None
if isinstance(data, pd.DataFrame):
# Ensure the dependency 'pytables' is installed.
import_optional_dependency('tables')
# Create an HDFStore with a .h5 extension.
# remove extension if exist.
fn = str(fn).replace ('.h5', "").replace(".hdf5", "")
store = pd.HDFStore(fn + '.h5', **hdf5_kws)
# Use the provided objname or default to 'data'.
objname = objname or 'data'
store[str(objname)] = data
else:
# Convert data to a NumPy array (if not already).
data = np.asarray(data)
# Create an HDF5 file with a .hdf5 extension.
store = h5py.File(f"{fn}.hdf5", "w")
# Create a dataset named "dataset_01" with the array data.
store.create_dataset(
"dataset_01",
data.shape,
dtype=data.dtype,
data=data
)
# Optionally close the store.
if close:
store.close()
return store
def store_or_write_hdf5 (
df,
key:str= None,
mode:str='a',
kind: str=None,
path_or_buf:str= None,
encoding:str="utf8",
csv_sep: str=",",
index: bool=...,
columns:Union [str, List[Any]]=None,
sanitize_columns:bool=False,
func: Optional[callable]= None,
args: tuple=(),
applyto: Union [str, List[Any]]=None,
**func_kwds,
)->Union [None, pd.DataFrame]:
""" Store data to hdf5 or write data to csv file.
Note that by default, the data is not store nor write and
return data if frame or transform the Path-Like object to data frame.
Parameters
-----------
d: Dataframe, shape (m_samples, n_features)
data to store or write or sanitize.
key:str
Identifier for the group in the store.
mode: {'a', 'w', 'r+'}, default 'a'
Mode to open file:
- 'w': write, a new file is created (an existing file with the
same name would be deleted).
- 'a': append, an existing file is opened for reading and writing,
and if the file does not exist it is created.
- 'r+': similar to 'a', but the file must already exist.
kind: str, {'store', 'write', None} , default=None
Type of task to perform:
- 'store': Store data to hdf5
- 'write': export data to csv file.
- None: construct a dataframe if array is passed or sanitize it.
path_or_buf: str or pandas.HDFStore, or str, path object, file-like \
object, or None, default=None
File path or HDFStore object. String, path object
(implementing os.PathLike[str]), or file-like object implementing
a write() function. If ``write=True`` and None, the result is returned
as a string. If a non-binary file object is passed, it should be
opened with newline=" ", disabling universal newlines. If a binary
file object is passed, mode might need to contain a 'b'.
encoding: str, default='utf8'
A string representing the encoding to use in the output file,
Encoding is not supported if path_or_buf is a non-binary file object.
csv_sep: str, default=',',
String of length 1. Field delimiter for the output file.
index: bool, index =False,
Write data to csv with index or not.
columns: list of str, optional
Usefull to create a dataframe when array is passed. Be aware to fit
the number of array columns (shape[1])
sanitize_columns: bool, default=False,
remove undesirable character in the data columns using the default
argument of `regex` parameters and fill pattern to underscore '_'.
The default regex implementation is::
>>> import re
>>> re.compile (r'[_#&.)(*@!,;\s-]\s*', flags=re.IGNORECASE)
func: callable, Optional
A custom sanitizing function and apply to each columns of the dataframe.
If provide, the expected columns must be listed to `applyto` parameter.
args: tuple, optional
Positional arguments of the sanitizing columns
applyto: str or list of str, Optional
The list of columns to apply the function ``func``. To apply the
function to all columns, use the ``*`` instead.
func_kwds: dict,
Keywords arguments of the sanitizing function ``func``.
Return
-------
None or d: None of dataframe.
returns None if `kind` is set to ``write`` or ``store`` otherwise
return the dataframe.
Examples
--------
>>> from fusionlab.utils.io_utils import store_or_write_hdf5
>>> from fusionlab.datasets import load_bagoue
>>> data = load_bagoue().frame
>>> data.geol[:5]
0 VOLCANO-SEDIM. SCHISTS
1 GRANITES
2 GRANITES
3 GRANITES
4 GEOSYN. GRANITES
Name: geol, dtype: object
>>> data = store_or_write_hdf5 ( data, sanitize_columns = True)
>>> data[['type', 'geol', 'shape']] # put all to lowercase
type geol shape
0 cp volcano-sedim. schists w
1 ec granites v
2 ec granites v
>>> # compute using func
>>> def test_func ( a, times , to_percent=False ):
return ( a * times / 100) if to_percent else ( a *times )
>>> data.sfi[:5]
0 0.388909
1 1.340127
2 0.446594
3 0.763676
4 0.068501
Name: sfi, dtype: float64
>>> d = store_or_write_hdf5 ( data, func = test_func, args =(7,), applyto='sfi')
>>> d.sfi[:5]
0 2.722360
1 9.380889
2 3.126156
3 5.345733
4 0.479507
Name: sfi, dtype: float64
>>> store_or_write_hdf5 ( data, func = test_func, args =(7,),
applyto='sfi', to_percent=True).sfi[:5]
0 0.027224
1 0.093809
2 0.031262
3 0.053457
4 0.004795
Name: sfi, dtype: float64
>>> # write data to hdf5 and outputs to current directory
>>> store_or_write_hdf5 ( d, key='test0', path_or_buf= 'test_data.h5',
kind ='store')
>>> # export data to csv
>>> store_or_write_hdf5 ( d, key='test0', path_or_buf= 'test_data',
kind ='export')
"""
kind= key_search (str(kind), default_keys=(
"none", "store", "write", "export", "tocsv"),
raise_exception=True , deep=True)[0]
kind = "export" if kind in ('write', 'tocsv') else kind
is_frame(df, df_only =True, raise_exception=True, objname="Data")
d = to_numeric_dtypes(
df, columns=columns,sanitize_columns=sanitize_columns,
fill_pattern='_')
# get categorical variables
if ( sanitize_columns
or func is not None
):
d, _, cf = to_numeric_dtypes(d, return_feature_types= True )
#( strip then pass to lower case all non-numerical data)
# for minimum sanitization
for cat in cf :
d[cat]= d[cat].str.lower()
d[cat]= d[cat].str.strip()
if func is not None:
if not callable(func):
raise TypeError(
f"Expect a callable for `func`. Got {type(func).__name__!r}")
if applyto is None:
raise ValueError("Need to specify the data column to apply"
f"{func.__name__!r} to.")
applyto = is_iterable(
applyto, exclude_string=True, transform =True
) if applyto !="*" else d.columns
# check whether the applyto columns are in data columns
exist_features(d, applyto)
# map each colum
for col in applyto:
d [col]=d[col].apply( func, args=args, **func_kwds )
# store in h5 file.
if kind=='store':
if path_or_buf is None:
print("Destination file is missing. Use 'data.h5' instead outputs"
f" in the current directory {os.getcwd()}")
path_or_buf= 'data.h5'
d.to_hdf ( path_or_buf , key =key, mode =mode )
# export to csv file
if kind=="export":
d.to_csv(path_or_buf, encoding = encoding , sep=csv_sep ,
index =False if index is ... else index )
return d if kind not in ("store", "export") else None
def key_checker (
keys: str ,
valid_keys:List[str],
regex:re = None,
pattern:str = None ,
deep_search:bool =False
):
"""check whether a give key exists in valid_keys and return a list if
many keys are found.
Parameters
-----------
keys: str, list of str
Key value to find in the valid_keys
valid_keys: list
List of valid keys by default.
regex: `re` object,
Regular expresion object. the default is::
>>> import re
>>> re.compile (r'[_#&*@!_,;\s-]\s*', flags=re.IGNORECASE)
pattern: str, default = '[_#&*@!_,;\s-]\s*'
The base pattern to split the text into a columns
deep_search: bool, default=False
If deep-search, the key finder is no sensistive to lower/upper case
or whether a numeric data is included.
Returns
--------
keys: str, list ,
List of keys that exists in the `valid_keys`.
Examples
--------
>>> from fusionlab.utils.io_utils import key_checker
>>> key_checker('h502', valid_keys= ['h502', 'h253','h2601'])
Out[68]: 'h502'
>>> key_checker('h502+h2601', valid_keys= ['h502', 'h253','h2601'])
Out[69]: ['h502', 'h2601']
>>> key_checker('h502 h2601', valid_keys= ['h502', 'h253','h2601'])
Out[70]: ['h502', 'h2601']
>>> key_checker(['h502', 'h2601'], valid_keys= ['h502', 'h253','h2601'])
Out[73]: ['h502', 'h2601']
>>> key_checker(['h502', 'h2602'], valid_keys= ['h502', 'h253','h2601'])
UserWarning: key 'h2602' is missing in ['h502', 'h2602']
Out[82]: 'h502'
>>> key_checker(['502', 'H2601'], valid_keys= ['h502', 'h253','h2601'],
deep_search=True )
Out[57]: ['h502', 'h2601']
"""
_keys = copy.deepcopy(keys)
valid_keys = is_iterable(valid_keys , exclude_string =True, transform =True )
if isinstance ( keys, str):
pattern = pattern or '[_#&@!_+,;\s-]\s*'
keys = str2columns (keys, regex = regex , pattern=pattern )
# If iterbale object , save obj
# to improve error
kkeys = copy.deepcopy(keys)
if deep_search:
keys = key_search(
keys,
default_keys= valid_keys,
deep=True,
raise_exception= True,
regex =regex,
pattern=pattern
)
return keys[0] if len(keys)==1 else keys
# for consistency
keys = [ k for k in keys if ''.join(
[ str(i) for i in valid_keys] ).find(k)>=0 ]
# assertion error if key does not exist.
if len(keys)==0:
verb1, verb2 = ('', 'es') if len(kkeys)==1 else ('s', '')
msg = (f"key{verb1} {_keys!r} do{verb2} not exist."
f" Expect {' ,'.join(valid_keys)}") # dont use smartformat here
raise KeyError ( msg )
if len(keys) != len(kkeys):
# dont use is_in_if
miss_keys = is_in_if ( kkeys, keys , return_diff= True , error ='ignore')
miss_keys, verb = (miss_keys[0], 'is') if len( miss_keys) ==1 else (
miss_keys, 'are')
warnings.warn(f"key{'' if verb=='is' else 's'} {miss_keys!r} {verb}"
f" missing in {_keys}")
keys = keys[0] if len(keys)==1 else keys
return keys
def key_search (
keys: str,
default_keys: Union [Text , List[str]],
parse_keys: bool=True,
regex :re=None,
pattern :str=None,
deep: bool =...,
raise_exception:bool=...,
):
"""Find key in a list of default keys and select the best match.
Parameters
-----------
keys: str or list
The string or a list of key. When multiple keys is passed as a string,
use the space for key separating.
default_keys: str or list
The likehood key to find. Can be a litteral text. When a litteral text
is passed, it is better to provide the regex in order to skip some
character to parse the text properly.
parse_keys: bool, default=True
Parse litteral string using default `pattern` and `regex`.
.. versionadded:: 0.2.7
regex: `re` object,
Regular expresion object. Regex is important to specify the kind
of data to parse. the default is::
>>> import re
>>> re.compile (r'[_#&*@!_,;\s-]\s*', flags=re.IGNORECASE)
pattern: str, default = '[_#&*@!_,;\s-]\s*'
The base pattern to split the text into a columns. Pattern is
important especially when some character are considers as a part of
word but they are not a separator. For example a data columns with
a name `'DH_Azimuth'`, if a pattern is not explicitely provided,
the default pattern will parse as two separated word which is far
from the expected results.
deep: bool, default=False
Not sensistive to uppercase.
raise_exception: bool, default=False
raise error when key is not find.
Return
-------
list: list of valid keys or None if not find ( default)
Examples
---------
>>> from fusionlab.utils.io_utils import key_search
>>> key_search('h502-hh2601', default_keys= ['h502', 'h253','HH2601'])
Out[44]: ['h502']
>>> key_search('h502-hh2601', default_keys= ['h502', 'h253','HH2601'],
deep=True)
Out[46]: ['h502', 'HH2601']
>>> key_search('253', default_keys= ("I m here to find key among h502,
h253 and HH2601"))
Out[53]: ['h253']
>>> key_search ('east', default_keys= ['DH_East', 'DH_North'] , deep =True,)
Out[37]: ['East']
key_search ('east', default_keys= ['DH_East', 'DH_North'],
deep =True,parse_keys= False)
Out[39]: ['DH_East']
"""
def _ellipsis2false ( param):
if param ==...:
return False
return True
deep, raise_exception, parse_keys = [_ellipsis2false(
param) for param in [deep, raise_exception, parse_keys] ]
# make a copy of original keys
kinit = copy.deepcopy(keys)
if parse_keys:
if is_iterable(keys , exclude_string= True ):
keys = ' '.join ( [str(k) for k in keys ])
# for consisteny checker
pattern = pattern or '[#&@!_+,;\s-]\s*'
keys = str2columns ( keys , regex = regex , pattern = pattern )
if is_iterable ( default_keys , exclude_string=True ):
default_keys = ' '. join ( [ str(k) for k in default_keys ])
# make a copy
default_keys = str2columns(
default_keys, regex =regex , pattern = pattern )
else :
keys = is_iterable(
keys, exclude_string = True, transform =True )
default_keys = is_iterable (
default_keys, exclude_string=True, transform =True )
dk_init = copy.deepcopy(default_keys )
# if deep convert all keys to lower
if deep:
keys= [str(it).lower() for it in keys ]
default_keys = [str(it).lower() for it in default_keys ]
valid_keys =[]
for key in keys :
for ii, dkey in enumerate (default_keys) :
vk = re.findall(rf'\w*{key}\w*', dkey)
# rather than rf'\b\w*{key}\w*\b'
# if deep take the real values in defaults keys.
if len(vk) !=0:
if deep: valid_keys.append( dk_init[ii] )
else:valid_keys.extend( vk)
break
if ( raise_exception
and len(valid_keys)==0
):
kverb ='s' if len(kinit)> 1 else ''
raise KeyError (f"key{kverb} {kinit!r} not found."
f" Expect {','.join(dk_init)}")
return None if len(valid_keys)==0 else valid_keys
def serialize_data_in(
data,
filename: str = None,
force: bool = True,
savepath: str = None,
verbose: int = 0
) -> str:
r"""
Serializes a Python object to a binary file using either joblib
or pickle.
This function attempts to serialize the input `data` using the
``joblib.dump`` method. If this attempt fails, it falls back
to using ``pickle.dump``. The final file path is constructed
by concatenating the directory specified by ``savepath`` (or the
current working directory if ``savepath`` is None) with the
given ``filename``. Mathematically, the file path is given by:
.. math::
\text{filepath} = \text{savepath} \oplus \text{filename}
where :math:`\oplus` denotes string concatenation.
Parameters
----------
data: Any
The Python object to serialize. It must be compatible with
either ``joblib`` or ``pickle`` serialization.
filename : str, optional
The target filename for the serialized data. If ``None``, a
filename is generated using the current timestamp formatted as
``"%Y%m%d%H%M%S"`` (e.g., ``"serialized_20230315123045.pkl"``).
force : bool, default=True
Determines whether to overwrite an existing file with the same
filename. If ``False``, a timestamp is appended to the filename
to ensure uniqueness.
savepath : str, optional
The directory in which to save the serialized file. If not
specified, the file is saved to the current working directory
(``os.getcwd()``).
verbose : int, default=0
Controls the verbosity of output messages. Higher values
produce more detailed logging during the serialization process.
Returns
-------
str
The complete file path to which the data has been serialized.
Examples
--------
>>> from fusionlab.utils.io_utils import serialize_data_in
>>> data = {"a": 1, "b": 2}
>>> filepath = serialize_data_in(data, filename='data.pkl',
... force=True, verbose=1)
>>> print(filepath)
/path/to/current/directory/data.pkl
Notes
-----
The function first tries to serialize the input `data` using
``joblib.dump``. In case of any exception during this attempt, it
falls back to using ``pickle.dump``. This dual approach improves
robustness in diverse runtime environments where one serialization
method might be unsupported or encounter issues with the given data
type.
See Also
--------
joblib.dump : Serialize objects to disk using Joblib.
pickle.dump : Serialize objects to disk using Pickle.
os.getcwd : Retrieve the current working directory.
References
----------
.. [1] McKinney, W. (2010). "Data Structures for Statistical Computing
in Python". Proceedings of the 9th Python in Science Conference.
.. [2] Van der Walt, S., Colbert, S. C., & Varoquaux, G. (2011).
"The NumPy Array: A Structure for Efficient Numerical
Computation". Computing in Science & Engineering, 13(2), 22-30.
"""
# Determine the filename: if not provided, generate one using a timestamp.
if filename is None:
filename = (
f"serialized_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.pkl"
)
# Determine the save directory (default is current working directory)
directory = savepath if savepath is not None else os.getcwd()
filepath = os.path.join(directory, filename)
# If the file exists and force is False, modify the filename to ensure uniqueness.
if os.path.exists(filepath) and not force:
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
if filename.endswith('.pkl'):
filename = filename.replace('.pkl', f"_{timestamp}.pkl")
else:
filename = f"{filename}_{timestamp}.pkl"
filepath = os.path.join(directory, filename)
# Attempt to serialize the data using joblib.dump.
try:
joblib.dump(data, filepath)
if verbose > 0:
print(f"[INFO] Data serialized to {filepath} using joblib.")
except Exception as e:
if verbose > 0:
print(
f"[WARN] joblib.dump failed with error: {e}. "
"Falling back to pickle.dump..."
)
try:
with open(filepath, 'wb') as f:
pickle.dump(data, f)
if verbose > 0:
print(f"[INFO] Data serialized to {filepath} using pickle.")
except Exception as e2:
raise RuntimeError(
f"Serialization failed using both joblib and pickle: {e2}"
) from e2
return filepath
if filename is None:
filename = f"serialized_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.pkl"
filepath = os.path.join(savepath or os.getcwd(), filename)
if os.path.exists(filepath) and not force:
filename = filename.replace(
'.pkl', f"_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.pkl")
filepath = os.path.join(savepath or os.getcwd(), filename)
try:
joblib.dump(data, filepath)
if verbose > 0:
print(f"Data serialized to {filepath}")
except Exception as e: # noqa
with open(filepath, 'wb') as f:
pickle.dump(data, f)
if verbose > 0:
print(f"Data serialized using pickle to {filepath}")
return filepath
def save_path(nameOfPath: str) -> str:
"""
Creates a directory if it does not exist.
Parameters
----------
nameOfPath : str
Name or path of the directory to create.
Returns
-------
str
The path of the created directory. If it exists, returns the existing path.
Examples
--------
>>> save_path("test_directory")
'path/to/test_directory'
"""
path = os.path.join(os.getcwd(), nameOfPath)
os.makedirs(path, exist_ok=True)
return path
def sanitize_unicode_string(str_: str) -> str:
"""
Removes spaces and replaces accented characters in a string.
Parameters
----------
str_ : str
The string to sanitize.
Returns
-------
str
The sanitized string with removed spaces and replaced accents.
Examples
--------
>>> from fusionlab.utils.io_utils import sanitize_unicode_string
>>> sentence ='Nos clients sont extrêmement satisfaits '
'de la qualité du service fourni. En outre Nos clients '
'rachètent frequemment nos "services".'
>>> sanitize_unicode_string (sentence)
... 'nosclientssontextrmementsatisfaitsdelaqualitduservice'
'fournienoutrenosclientsrachtentfrequemmentnosservices'
>>> sanitize_unicode_string("Élève à l'école")
'elevealecole'
"""
accents_replacements = {'éèê': 'e', 'à â': 'a'}
str_ = re.sub(r'\s+', '', str_.lower())
for chars, repl in accents_replacements.items():
str_ = re.sub(f"[{chars}]", repl, str_)
return str_
def parse_md(pf: str, delimiter: str = ':'):
"""
Parse a markdown-style file with key-value pairs separated by
a delimiter.
Parameters
----------
pf : str
Path to the markdown file containing key-value pairs.
delimiter : str, default=':'
Delimiter used to separate key-value pairs.
Yields
------
Tuple[str, str]
A tuple containing the key and processed value.
Raises
------
IOError
If the provided path does not lead to a valid file.
Notes
-----
- This function yields key-value pairs by reading the file line-by-line.
- It applies `sanitize_unicode_string` to keys to ensure data consistency.
Examples
--------
>>> list(parse_md_data('parser_file.md', delimiter=':'))
[('key1', 'Value1'), ('key2', 'Value2')]
"""
if not os.path.isfile(pf):
raise IOError("Unable to detect the parser file. Need a Path-like object.")
with open(pf, 'r', encoding='utf8') as f:
pdata = f.readlines()
for row in pdata:
if row in ('\n', ' '):
continue
fr, en = row.strip().split(delimiter)
fr = sanitize_unicode_string(fr) # Clean up the key
en = en.strip()
# Capitalize the first letter of the value
en = en[0].upper() + en[1:]
yield fr, en
def dummy_csv_translator(
csv_fn: str, pf: str, delimiter: str = ':',
destfile: str = 'pme.en.csv'):
"""
Translate a CSV file using a dictionary created from a markdown-style parser file.
Parameters
----------
csv_fn : str
Path to the source CSV file.
pf : str
Path to the markdown-style file used to create the translation dictionary.
delimiter : str, default=':'
Delimiter used in the parser file to separate key-value pairs.
destfile : str, default='pme.en.csv'
Name of the destination file for the translated CSV.
Returns
-------
DataFrame
Translated CSV data as a DataFrame.
list
List of untranslated terms found in the source CSV.
Notes
-----
- This function uses `parse_md_data` to read the parser file and apply
translations to the CSV content.
- Missing translations are collected and returned for review.
Examples
--------
>>> df, missing = dummy_csv_translator(
"data.csv", "parser_file.md", delimiter=":", destfile="output.csv")
>>> print(df.head())
>>> print(missing)
"""
parser_data = dict(parse_md(pf, delimiter))
# Read CSV data
with open(csv_fn, 'r', encoding='utf8') as csv_f:
csv_reader = csv.reader(csv_f)
csv_data = [row for row in csv_reader]
# Locate 'Industry_type' column and split data blocks
industry_index = csv_data[0].index('Industry_type')
csv_1b = [row[:industry_index + 1] for row in csv_data]
csv_2b = [row[industry_index + 1:] for row in csv_data]
# Clean data in `csv_1b` and collect missing translations
csv_1b_cleaned = copy.deepcopy(csv_1b)
untranslated_terms = set()
for row in csv_1b_cleaned[3:]:
for i, value in enumerate(row):
value = sanitize_unicode_string(value.strip())
if value not in parser_data:
untranslated_terms.add(value)
else:
row[i] = parser_data.get(value, value)
# Combine cleaned blocks and convert to DataFrame
combined_data = [r1 + r2 for r1, r2 in zip(csv_1b_cleaned, csv_2b)]
df = pd.DataFrame(np.array(combined_data[1:]), columns=combined_data[0])
# Apply parser dictionary and save to destination file
df.replace(parser_data, inplace=True)
df.to_csv(destfile, index=False)
return df, list(untranslated_terms)
def rename_files(
src_files: Union[str, List[str]],
dst_files: Union[str, List[str]],
basename: Optional[str] = None,
extension: Optional[str] = None,
how: str = 'py',
prefix: bool = True,
keep_copy: bool = True,
trailer: str = '_',
sortby: Union[re.Pattern, callable] = None,
**kws
) -> None:
"""Rename files in directory.
Parameters
-----------
src_files: str, Path-like object
Source files to rename
dst_files: str of PathLike object
Destination files renamed.
extension: str, optional
If a path is given in `src_files`, specifying the `extension` will just
collect only files with this typical extensions.
basename: str, optional
If `dst_files` is passed as Path-object, name should be needed
for a change, otherwise, the number is incremented using the Python
index counting defined by the parameter ``how=py`
how: str, default='py'
The way to increment files when `dst_files` is given as a Path object.
For instance, for a ``name=E_survey`` and ``prefix==True``, the first
file should be ``E_survey_00`` if ``how='py'`` otherwise it should be
``E_survey_01``.
prefix: bool, default=True
Prefix is used to position the name before the number incrementation.
If ``False`` and `name` is given, the number is positionning before the
name. If ``True`` and not `prefix` for a ``name=E_survey``, it should be
``00_E_survey`` and ``01_E_survey``.
keep_copy: bool, default=True
Keep a copy of the source files.
trailer: str, default='_',
Item used to separate the basename for counter.
sortby: Regex or Callable,
Key to sort the collection of the items when `src_files` is passed as
a path-like object. This is usefull to keep order as the origin files
especially when files includes a specific character. Furthermore
[int| float |'num'|'digit'] sorted the files according to the
number included in the filename if exists.
kws: dict
keyword arguments passed to `os.rename`.
"""
dest_dir =None ; trailer = str(trailer)
extension = str(extension).lower()
if os.path.isfile (src_files ):
src_files = [src_files ]
elif os.path.isdir (src_files):
src_path = src_files
ldir = os.listdir(src_path)
src_files = ldir if extension =='none' else [
f for f in ldir if f.endswith (extension) ]
if sortby:
if sortby in ( int, float, 'num', 'number', 'digit'):
src_files = sorted(ldir, key=lambda s:int( re.search(
'\d+', s).group()) if re.search('\d+', s) else 0 )
else:
src_files = sorted(ldir, key=sortby)
src_files = [ os.path.join(src_path, f ) for f in src_files ]
# get only the files
src_files = [ f for f in src_files if os.path.isfile (f ) ]
else : raise FileNotFoundError(f"{src_files!r} not found.")
# Create the directory if it doesn't exist
if ( dst_files is not None
and not os.path.exists (dst_files)
):
os.makedirs(dst_files)
if os.path.isdir(dst_files):
dest_dir = dst_files
if isinstance (dst_files, str):
dst_files = [dst_files]
#XXX revise
# dst_files = is_iterable(dst_files , exclude_string= True, transform =True )
# get_extension of the source_files
_, ex = os.path.splitext (src_files[0])
if dest_dir:
if basename is None:
warnings.warn(
"Missing basename for renaming file. Should use `None` instead.")
basename =''; trailer =''
basename= str(basename)
if prefix:
dst_files =[ f"{str(basename)}{trailer}" + (
f"{i:03}" if how=='py' else f"{i+1:03}") + f"{ex}"
for i in range (len(src_files))]
elif not prefix:
dst_files =[ (f"{i:03}" if how=='py' else f"{i+1:03}"
) +f"{trailer}{str(basename)}" +f"{ex}"
for i in range (len(src_files))]
dst_files = [os.path.join(dest_dir , f) for f in dst_files ]
for f, nf in zip (src_files , dst_files):
try:
if keep_copy : shutil.copy (f, nf , **kws )
else : os.rename (f, nf , **kws )
except FileExistsError:
os.remove(nf)
if keep_copy : shutil.copy (f, nf , **kws )
else : os.rename (f, nf , **kws )
[docs]
@EnsureFileExists
def fetch_joblib_data(
job_file: str,
*keys: str,
error_mode: str = 'raise',
verbose: int = 0
) -> Union[Dict[str, Any], Tuple[Any, ...]]:
"""Dynamically load data from a joblib-saved dictionary with
flexible key access.
Parameters
----------
job_file : str
Path to the joblib file containing a dictionary
*keys : str
Variable-length list of dictionary keys to retrieve
error_mode : {'raise', 'warn', 'ignore'}, default='raise'
Handling of missing keys:
- 'raise': Immediately raise KeyError
- 'warn': Issue warning and skip missing keys
- 'ignore': Silently skip missing keys
verbose : int, default=0
Verbosity level:
- 0: No output
- 1: Basic loading information
- 2: Detailed debugging output
Returns
-------
Union[Dict, Tuple]
- Full dictionary if no keys specified
- Tuple of values for requested keys (maintaining order)
Raises
------
FileNotFoundError
If specified job_file doesn't exist
TypeError
If loaded data isn't a dictionary
KeyError
If requested key not found and error_mode='raise'
Examples
--------
>>> from fusionlab.utils.io_utils import fetch_joblib_data
>>> data = fetch_joblib_data('data.joblib', 'X_train', 'y_train')
>>> X, y = fetch_joblib_data('data.joblib', 'X_val', 'y_val', verbose=1)
>>> full_dict = fetch_joblib_data('data.joblib')
Notes
-----
- Maintains original insertion order for Python 3.7+ dictionaries
- Missing keys in 'warn'/'ignore' modes result in shorter return tuple
- Joblib files must contain dictionary objects
"""
try:
if verbose >= 1:
print(f"Loading data from {job_file}")
data = joblib.load(job_file)
except FileNotFoundError:
raise FileNotFoundError(f"Joblib file {job_file} not found") from None
except Exception as e:
raise ValueError(f"Error loading {job_file}: {str(e)}") from e
if not isinstance(data, dict):
raise TypeError(f"Loaded data from {job_file} is not a dictionary")
if not keys:
if verbose >= 1:
print("No keys requested - returning full dictionary")
return data
results = []
available_keys = list(data.keys ())
for key in keys:
if key in data:
results.append(data[key])
if verbose >= 2:
print(f"Successfully retrieved key: {key}")
else:
msg = ( f"Key '{key}' not found in {job_file}."
f" Available keys are: {smart_format(available_keys)}"
)
if error_mode == 'raise':
raise KeyError(msg)
elif error_mode == 'warn':
warnings.warn(msg, UserWarning)
if verbose >= 1:
print(f"Warning: {msg}")
# No action needed for 'ignore' mode
if verbose >= 1:
print(f"Retrieved {len(results)}/{len(keys)} requested items")
return tuple(results) if len(results) > 1 else results[0] if results else ()
def cpath(savepath: str = None, dpath: str = '_default_path_') -> str:
"""
Ensures a directory exists for saving files, creating it if necessary.
Parameters
----------
savepath : str, optional
The target directory to validate or create. If None, `dpath` is used
as the directory.
dpath : str, default='_default_path_'
Default directory created in the current working directory if
`savepath` is None.
Returns
-------
str
The absolute path to the validated or created directory.
Examples
--------
>>> from fusionlab.utils.io_utils import cpath
>>> default_path = cpath()
>>> print(f"Files will be saved to: {default_path}")
>>> custom_path = cpath('/path/to/save')
>>> print(f"Files will be saved to: {custom_path}")
Notes
-----
`cpath` validates the directory path and, if necessary, creates the
directory tree. If a problem occurs during creation, an error message
is printed.
See Also
--------
pathlib.Path.mkdir : Utility for directory creation.
"""
if savepath is None:
# Use default directory path if none provided
savepath = Path.cwd() / dpath
else:
savepath = Path(savepath)
try:
# Create the directory and parents if they do not exist
savepath.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating directory {savepath}: {e}")
return str(savepath.resolve())
def spath(name_of_path: str) -> str:
"""
Create a directory if it does not already exist.
Parameters
----------
name_of_path : str
Path-like object to create if it doesn't exist.
Returns
-------
str
The absolute path to the created or existing directory.
Examples
--------
>>> from fusionlab.utils.io_utils import spath
>>> path = spath('data/saved_models')
>>> print(f"Directory available at: {path}")
Notes
-----
`spath` is useful for quickly ensuring that a specific directory is
available for storing files. It provides feedback if the directory
already exists.
"""
savepath = os.path.join(os.getcwd(), name_of_path)
try:
if not os.path.isdir(savepath):
os.mkdir(name_of_path)
except:
warnings.warn("The path already exists.")
return savepath
def load_serialized_data(
filename: str, verbose: int = 0
):
"""
Load data from a serialized file (e.g., pickle or joblib format).
Parameters
----------
filename : str
Name of the file to load data from.
verbose : int, default=0
Verbosity level. Controls the amount of output information:
- 0: No output
- >2: Detailed loading process messages.
Returns
-------
Any
Data loaded from the file, or None if deserialization fails.
Raises
------
TypeError
If `filename` is not a string.
FileExistsError
If the specified file does not exist.
Examples
--------
>>> from fusionlab.utils.io_utils import load_serialized_data
>>> data = load_serialized_data('data/my_data.pkl', verbose=3)
Notes
-----
This function attempts to load serialized data using joblib and
fallbacks to pickle if needed. Verbose output provides feedback on
the loading process and success or failure of each step.
See Also
--------
joblib.load : High-performance loading utility.
pickle.load : General-purpose Python serialization library.
"""
filename = check_files(filename, return_valid = True )
_filename = os.path.basename(filename)
data = None
try:
# Attempt to load with joblib
data = joblib.load(filename)
if verbose > 2:
print(f"Data from {_filename!r} successfully reloaded using joblib.")
except:
if verbose > 2:
print(f"Fallback: {_filename!r} not loaded with joblib; trying pickle.")
with open(filename, 'rb') as tod:
data = pickle.load(tod)
if verbose > 2:
print(f"Data from {_filename!r} reloaded using pickle.")
if verbose > 0:
if data is None:
print("Unable to deserialize data. Please check your file.")
else:
print(f"Data from {_filename} has been successfully reloaded.")
return data
[docs]
def save_job(
job,
savefile ,* ,
protocol =None,
append_versions=True,
append_date=True,
fix_imports= True,
buffer_callback = None,
**job_kws
):
""" Quick save your job using 'joblib' or persistent Python pickle module.
Parameters
-----------
job: Any
Anything to save, preferabaly a models in dict
savefile: str, or path-like object
name of file to store the model.
The *file* argument must have a write() method that accepts a
single bytes argument. It can thus be a file object opened for
binary writing, an io.BytesIO instance, or any other custom
object that meets this interface.
append_versions: bool, default =True
Append the version of Joblib module or Python Pickle module following
by the scikit-learn, numpy and also pandas versions. This is useful
to have idea about previous versions for loading file when system or
modules have been upgraded. This could avoid bottleneck when data
have been stored for long times and user has forgotten the date and
versions at the time the file was saved.
append_date: bool, default=True,
Append the date of the day to the filename.
protocol: int, optional
The optional *protocol* argument tells the pickler to use the
given protocol; supported protocols are 0, 1, 2, 3, 4 and 5.
The default protocol is 4. It was introduced in Python 3.4, and
is incompatible with previous versions.
Specifying a negative protocol version selects the highest
protocol version supported. The higher the protocol used, the
more recent the version of Python needed to read the pickle
produced.
fix_imports: bool, default=True,
If *fix_imports* is True and *protocol* is less than 3, pickle
will try to map the new Python 3 names to the old module names
used in Python 2, so that the pickle data stream is readable
with Python 2.
buffer_call_back: int, optional
If *buffer_callback* is None (the default), buffer views are
serialized into *file* as part of the pickle stream.
If *buffer_callback* is not None, then it can be called any number
of times with a buffer view. If the callback returns a false value
(such as None), the given buffer is out-of-band; otherwise the
buffer is serialized in-band, i.e. inside the pickle stream.
It is an error if *buffer_callback* is not None and *protocol*
is None or smaller than 5.
job_kws: dict,
Additional keywords arguments passed to :func:`joblib.dump`.
Returns
-------
str
The final filename where the job was saved.
Notes
-----
This function appends system-specific metadata like versions and date to
the filename, which can aid in tracking compatibility over time.
Examples
--------
>>> from fusionlab.utils.io_utils import save_job
>>> model = {"key": "value"} # Replace with actual model object
>>> savefile = save_job(model, "my_model", append_date=True, append_versions=True)
>>> print(savefile)
'my_model.20240101.sklearn_v1.0.numpy_v1.21.joblib'
"""
def remove_extension(filename: str, extension: str) -> str:
return filename.replace(extension, '')
import sklearn
# check_files(savefile)
# Generate versioning metadata
versions = 'sklearn_v{0}.numpy_v{1}.pandas_v{2}'.format(
sklearn.__version__, np.__version__, pd.__version__)
date_str = datetime.datetime.now().strftime("%Y%m%d")
# Handle file extensions
savefile = str(savefile)
extension = '.joblib' if '.joblib' in savefile else '.pkl'
savefile = remove_extension(savefile, extension)
# Append date and versions if requested
if append_date:
savefile += f".{date_str}"
if append_versions:
savefile += f".{versions}"
try:
joblib.dump(job, f"{savefile}.joblib", **job_kws)
except Exception:
with open(f"{savefile}.pkl", 'wb') as wfile:
pickle.dump(job, wfile, protocol=protocol,
fix_imports=fix_imports,
buffer_callback=buffer_callback)
return savefile
def _cparser_manager(
cfile: str,
savepath: Optional[str] = None,
todo: str = 'load',
dpath: Optional[str] = None,
verbose: int = 0,
**pkws
) -> None:
"""
Manages configuration file saving and output messages based on action type.
Parameters
----------
cfile : str
Name of the configuration file.
savepath : str, optional
Directory path to save the configuration file.
todo : str, default='load'
Action to perform with the config file. Options are 'load' or 'dump'.
dpath : str, optional
Default path to use if savepath is not specified.
verbose : int, default=0
Controls verbosity level of output messages.
Notes
-----
This function uses `move_cfile` to ensure the configuration file is stored
in the correct location, and calls `print_cmsg` to provide user feedback.
"""
check_files(cfile)
if savepath == 'default':
savepath = None
yml_fn, _ = move_cfile(cfile, savepath, dpath=dpath)
if verbose > 0:
print(print_cmsg(yml_fn, todo, **pkws))
def move_cfile(
cfile: str,
savepath: Optional[str] = None,
**ckws
) -> Tuple[str, str]:
"""
Moves a file to the specified path. If moving fails, copies and
deletes the original.
Parameters
----------
cfile : str
Name of the file to move.
savepath : str, optional
Target directory. If not specified, uses default path via `cpath`.
Returns
-------
Tuple[str, str]
The new file path and a confirmation message.
Examples
--------
>>> from fusionlab.utils.io_utils import move_cfile
>>> new_path, msg = move_cfile('myfile.txt', 'new_directory')
>>> print(new_path, msg)
"""
check_files(cfile)
savepath = cpath(savepath or '_default_path_', **ckws)
destination_file_path = os.path.join(savepath, os.path.basename(cfile))
try:
shutil.move(cfile, destination_file_path)
except shutil.Error:
shutil.copy2(cfile, destination_file_path)
os.remove(cfile)
msg = (f"--> '{os.path.basename(destination_file_path)}'successfully"
f" saved to '{os.path.realpath(destination_file_path)}'."
)
return destination_file_path, msg
def print_cmsg(
cfile: str, todo: str = 'load', config: str = 'YAML') -> str:
"""
Generates output message for configuration file operations.
Parameters
----------
cfile : str
Name of the configuration file.
todo : str, default='load'
Operation performed ('load' or 'dump').
config : str, default='YAML'
Type of configuration file (e.g., 'YAML', 'CSV', 'JSON').
Returns
-------
str
Confirmation message for the configuration operation.
Examples
--------
>>> from fusionlab.utils.io_utils import print_cmsg
>>> msg = print_cmsg('config.yml', 'dump')
>>> print(msg)
--> YAML 'config.yml' data was successfully saved.
"""
if todo == 'load':
msg = f"--> Data successfully loaded from '{os.path.realpath(cfile)}'."
elif todo == 'dump':
msg =(
f"--> {config.upper()} '{os.path.basename(cfile)}'"
" data was successfully saved."
)
return msg
def parse_csv(
csv_fn: str = None,
data: Optional[Union[List[Dict], List[List[str]]]] = None,
todo: str = 'reader',
fieldnames: Optional[List[str]] = None,
savepath: Optional[str] = None,
header: bool = False,
verbose: int = 0,
**csvkws
) -> Union[List[Dict], List[List[str]], None]:
"""
Parses a CSV file or serializes data to a CSV file.
This function allows loading (reading) from or dumping (writing) to a CSV
file. It supports standard CSV and dictionary-based CSV formats.
Parameters
----------
csv_fn : str, optional
The CSV filename for reading or writing. For writing operations, if
`data` is provided and `todo` is set to 'write' or 'dictwriter', this
specifies the output CSV filename.
data : list, optional
Data to write in the form of a list of lists or dictionaries.
todo : str, default='reader'
Specifies the operation type:
- 'reader' or 'dictreader': Reads data from a CSV file.
- 'writer' or 'dictwriter': Writes data to a CSV file.
fieldnames : list of str, optional
List of keys for dictionary-based writing to specify the field order.
savepath : str, optional
Directory to save the CSV file when writing. Defaults to '_savecsv_'
if not provided and the path does not exist.
header : bool, default=False
If True, includes headers when writing with DictWriter.
verbose : int, default=0
Controls the verbosity level for output messages.
csvkws : dict, optional
Additional arguments passed to `csv.writer` or `csv.DictWriter`.
Returns
-------
Union[List[Dict], List[List[str]], None]
Parsed data from the CSV file, as a list of lists or a list of
dictionaries, based on the operation. Returns `None` when writing.
Notes
-----
For writing data, the method uses either `csv.writer` for regular CSV or
`csv.DictWriter` for dictionary-based CSV depending on the value of `todo`.
Examples
--------
>>> from fusionlab.utils.io_utils import parse_csv
>>> data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
>>> parse_csv(csv_fn='output.csv', data=data, todo='dictwriter', fieldnames=['name', 'age'])
>>> loaded_data = parse_csv(csv_fn='output.csv', todo='dictreader', fieldnames=['name', 'age'])
>>> print(loaded_data)
[{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
"""
csv_fn = check_files(csv_fn, formats ='.csv', return_valid=True )
todo, domsg = _return_ctask(todo)
if 'write' in todo:
csv_fn = get_config_fname_from_varname(
data, config_fname=csv_fn, config='.csv')
try:
if todo == 'reader':
with open(csv_fn, 'r') as csv_f:
csv_reader = csv.reader(csv_f)
data = [row for row in csv_reader]
elif todo == 'writer':
with open(f"{csv_fn}.csv", 'w', newline='', encoding='utf8') as new_csvf:
csv_writer = csv.writer(new_csvf, **csvkws)
csv_writer.writerows(data) if len(data) > 1 else csv_writer.writerow(data)
elif todo == 'dictreader':
with open(csv_fn, 'r', encoding='utf8') as csv_f:
csv_reader = csv.DictReader(csv_f, fieldnames=fieldnames)
data = list(csv_reader)
elif todo == 'dictwriter':
with open(f"{csv_fn}.csv", 'w') as new_csvf:
csv_writer = csv.DictWriter(new_csvf, fieldnames=fieldnames, **csvkws)
if header:
csv_writer.writeheader()
if isinstance(data, dict):
csv_writer.writerow(data)
else:
csv_writer.writerows(data)
except csv.Error as e:
raise csv.Error(f"Unable {domsg} CSV {csv_fn!r}. {e}")
except Exception as e:
msg = "Unrecognizable file" if 'read' in todo else "Unable to write"
raise TypeError(f"{msg} {csv_fn!r}. Check your"
f" {'file' if 'read' in todo else 'data'}. {e}")
_cparser_manager(f"{csv_fn}.csv", savepath, todo=todo, dpath='_savecsv_',
verbose=verbose, config='CSV')
return data
def _return_ctask(todo: Optional[str] = None) -> Tuple[str, str]:
"""
Determine the action to perform based on the `todo` input.
Parameters
----------
todo : str, optional
Specifies the action. Options:
- 'load': Load data from a config file (YAML|CSV|JSON).
- 'dump': Serialize data to a config file (YAML|CSV|JSON).
Returns
-------
Tuple[str, str]
`todo`: Corrected action string.
`domsg`: Message for user based on action.
Notes
-----
This function normalizes user input for `todo` to avoid misinterpretations.
"""
def p_csv(v, cond='dict', base='reader'):
return f"{cond}{base}" if cond in v else base
ltags = ('load', 'recover', True, 'fetch')
dtags = ('serialized', 'dump', 'save', 'write', 'serialize')
if todo is None:
raise ValueError(
"NoneType action cannot be performed. Specify 'load' or 'dump'.")
todo = str(todo).lower()
ltags += ('loads',) if todo == 'loads' else ()
dtags += ('dumps',) if todo == 'dumps' else ()
if todo in ltags:
todo, domsg = 'load', 'to parse'
elif todo in dtags:
todo, domsg = 'dump', 'to serialize'
elif 'read' in todo:
todo, domsg = p_csv(todo), 'to read'
elif 'write' in todo:
todo, domsg = p_csv(todo, base='writer'), 'to write'
else:
raise ValueError(
f"Invalid action '{todo}'. Use 'load' or 'dump' (YAML|CSV|JSON).")
return todo, domsg
def parse_yaml(
yml_fn: str = None,
data=None,
todo: str = 'load',
savepath: Optional[str] = None,
verbose: int = 0,
**ymlkws
):
"""
Parse and handle YAML configuration files for loading or saving data.
Parameters
----------
yml_fn : str, optional
The YAML filename. If `data` is provided and `todo` is set to 'dump',
`yml_fn` will be used as the output filename. If `todo` is set to
'load', `yml_fn` is the input filename to read from.
data : Any, optional
Data in a Python object format that will be serialized and saved as a
YAML file if `todo` is 'dump'.
todo : {'load', 'dump'}, default='load'
Action to perform with the YAML file:
- 'load': Load data from the YAML file specified by `yml_fn`.
- 'dump': Serialize `data` into a YAML format and save to `yml_fn`.
savepath : str, optional
Path where the YAML file will be saved if `todo` is 'dump'. If not
provided, a default path will be used. The function will ensure that
the path exists.
verbose : int, default=0
Controls verbosity of output messages.
**ymlkws : dict
Additional keyword arguments passed to `yaml.dump` when saving data.
Returns
-------
Any
The data loaded from the YAML file if `todo` is 'load', or `data`
after saving if `todo` is 'dump'.
Raises
------
yaml.YAMLError
If there is an issue with reading or writing the YAML file.
Notes
-----
This function uses `safe_load` and `safe_dump` methods from PyYAML for
secure handling of YAML files.
See Also
--------
`get_config_fname_from_varname` : Utility for generating YAML configuration
filenames based on variable names.
"""
yml_fn = check_files(yml_fn, formats =['.yml', '.yam'], return_valid=True )
# Determine task for loading or dumping YAML
todo = todo.lower()
if todo.startswith('dump'):
yml_fn = get_config_fname_from_varname(data, yml_fn)
try:
with open(f"{yml_fn}.yml", "w") as fw:
yaml.safe_dump(data, fw, **ymlkws)
except yaml.YAMLError:
raise yaml.YAMLError(
f"Unable to save data to {yml_fn}. Check file permissions.")
elif todo.startswith('load'):
try:
with open(yml_fn, "r") as fy:
data = yaml.safe_load(fy)
except yaml.YAMLError:
raise yaml.YAMLError(
f"Unable to load data from {yml_fn}. Check the YAML format.")
else:
raise ValueError(f"Invalid value for 'todo': {todo}. Use 'load' or 'dump'.")
# Manage paths and configurations
_cparser_manager(f"{yml_fn}.yml", savepath, todo=todo, dpath='_saveyaml_',
verbose=verbose, config='YAML')
return data
def get_config_fname_from_varname(
data, config_fname: Optional[str] = None, config: str = '.yml') -> str:
"""
Generate a filename based on a variable name for YAML configuration.
Parameters
----------
data : Any
The data object from which the variable name will be derived to
create a YAML configuration filename.
config_fname : str, optional
Custom configuration filename. If `None`, the name of `data` will
be used as the filename.
config : str, default='.yml'
The file extension/type for the configuration file. Can be '.yml',
'.json', or '.csv'.
Returns
-------
str
A suitable filename for saving the configuration data.
Raises
------
ValueError
If `config_fname` cannot be derived or an invalid file type is provided.
Notes
-----
This function supports dynamic filename generation based on variable names,
which aids in maintaining a clear configuration structure for serialized
data. Files are saved with appropriate extensions based on the `config` type.
"""
# Clean up file extension and validate config type
config = config.lstrip('.')
if config_fname is None:
try:
config_fname = f"{data}".split('=')[0].strip()
except Exception as e:
raise ValueError(f"Unable to determine configuration filename: {str(e)}")
else:
config_fname = config_fname.replace(f".{config}", "").replace(".yaml", "")
# Append correct file extension
return f"{config_fname}.{config}"
def parse_json(
json_fn: str = None,
data=None,
todo: str = 'load',
savepath: Optional[str] = None,
verbose: int = 0,
**jsonkws
):
"""
Parse and manage JSON configuration files, either loading data from
or saving data to a JSON file.
Parameters
----------
json_fn : str, optional
JSON filename or URL. If `data` is provided and `todo` is 'dump',
`json_fn` will be used as the output filename. If `todo` is 'load',
`json_fn` is the input filename or URL.
data : Any, optional
Data in Python object format to serialize and save if `todo` is 'dump'.
todo : {'load', 'loads', 'dump', 'dumps'}, default='load'
Action to perform with JSON:
- 'load': Load data from a JSON file.
- 'loads': Parse a JSON string.
- 'dump': Serialize `data` to a JSON file.
- 'dumps': Serialize `data` to a JSON string.
savepath : str, optional
Path where the JSON file will be saved if `todo` is 'dump'. If
`savepath` does not exist, it will save to the default path '_savejson_'.
verbose : int, default=0
Controls verbosity of output messages.
**jsonkws : dict
Additional keyword arguments passed to `json.dump` or `json.dumps`
when saving data.
Returns
-------
Any
The data loaded from the JSON file or URL if `todo` is 'load', or `data`
after saving if `todo` is 'dump'.
Raises
------
json.JSONDecodeError
If there is an issue with reading or writing the JSON file.
TypeError
If the JSON file or data cannot be processed.
Notes
-----
This function uses `json.load`, `json.loads`, `json.dump`, and `json.dumps`
for efficient handling of JSON files and strings.
See Also
--------
`fetch_json_data_from_url` : Fetches JSON data from a given URL.
`get_config_fname_from_varname` : Utility for generating JSON configuration
filenames based on variable names.
"""
json_fn = check_files(json_fn, formats ='.json', return_valid=True )
# Set task for loading or dumping JSON
if json_fn and "http" in json_fn:
todo, json_fn, data = fetch_json_data_from_url(json_fn, todo)
if 'dump' in todo:
json_fn = get_config_fname_from_varname(data, json_fn, config='.json')
JSON = {
"load": json.load,
"loads": json.loads,
"dump": json.dump,
"dumps": json.dumps
}
try:
if todo == 'load':
with open(json_fn, "r") as fj:
data = JSON[todo](fj)
elif todo == 'loads':
data = JSON[todo](json_fn)
elif todo == 'dump':
with open(f"{json_fn}.json", "w") as fw:
JSON[todo](data, fw, **jsonkws)
elif todo == 'dumps':
data = JSON[todo](data, **jsonkws)
except json.JSONDecodeError:
raise json.JSONDecodeError(
f"Unable to {todo} JSON file {json_fn}. Please verify your file.", f'{json_fn!r}', 1)
except Exception:
raise TypeError(
f"Error with {json_fn!r}. Verify your {'file' if 'load' in todo else 'data'}.")
_cparser_manager(
f"{json_fn}.json", savepath, todo=todo,
dpath='_savejson_', verbose=verbose, config='JSON'
)
return data
def fetch_json_data_from_url(url: str, todo: str = 'load'):
"""
Retrieve and parse JSON data from a URL.
Parameters
----------
url : str
Universal Resource Locator (URL) from which JSON data is fetched.
todo : {'load', 'dump'}, default='load'
Action to perform with JSON:
- 'load': Load JSON data from the URL.
- 'dump': Parse and prepare data from the URL for saving in a JSON file.
Returns
-------
tuple
A tuple of `todo` action, filename (or data source), and parsed data.
Raises
------
urllib.error.URLError
If there is an issue accessing the URL.
Notes
-----
The function uses `json.loads` to parse data directly from a URL response,
supporting convenient access to web-hosted JSON content.
"""
with urllib.request.urlopen(url) as jresponse:
source = jresponse.read()
data = json.loads(source)
if 'load' in todo:
todo, json_fn = 'loads', source
elif 'dump' in todo:
todo, json_fn = 'dumps', '_urlsourcejsonf.json'
return todo, json_fn, data
def deserialize_data(filename: str, verbose: int = 0) -> Any:
"""
Deserialize and load data from a serialized file using `joblib` or `pickle`.
The function attempts to load the serialized data from the provided file
`filename` using `joblib` first. If `joblib` fails, it tries to load the
data using `pickle`. An error is raised if both methods fail.
Parameters
----------
filename : str
The name or path of the file containing the serialized data.
This file is expected to be in a compatible format with either
`joblib` or `pickle`.
verbose : int, optional
Verbosity level. Messages indicating loading progress will be displayed
if `verbose` is greater than 0.
Returns
-------
Any
The data loaded from the serialized file, or `None` if loading fails.
Raises
------
TypeError
If `filename` is not a string, as file paths must be provided as strings.
FileNotFoundError
If the specified `filename` does not exist or cannot be located.
IOError
If both `joblib` and `pickle` fail to deserialize the data from the file.
ValueError
If the file was successfully read but yielded no data (i.e., `None`).
Examples
--------
>>> from fusionlab.utils.io_utils import deserialize_data
>>> data = deserialize_data('path/to/serialized_data.pkl', verbose=1)
Data loaded successfully from 'path/to/serialized_data.pkl' using joblib.
Notes
-----
The function first attempts deserialization with `joblib` to leverage
efficient file handling for large datasets. If `joblib` encounters an error,
it falls back to `pickle`, which provides broader compatibility with Python
objects but may be less optimized for large datasets.
See Also
--------
joblib.load : Joblib's load function for fast I/O operations on large data.
pickle.load : Pickle's load function for serializing and deserializing
Python objects.
References
----------
.. [1] Joblib Documentation - https://joblib.readthedocs.io
.. [2] Python Pickle Module - https://docs.python.org/3/library/pickle.html
"""
filename = check_files ( filename, return_valid =True )
# Attempt to load data using joblib
try:
data = joblib.load(filename)
if verbose:
print(f"Data loaded successfully from {filename!r} using joblib.")
except Exception as joblib_error:
# Fallback to pickle if joblib fails
try:
with open(filename, 'rb') as file:
data = pickle.load(file)
if verbose:
print(f"Data loaded successfully from {filename!r} using pickle.")
except Exception as pickle_error:
raise IOError(
f"Failed to load data from {filename!r}. "
f"Joblib error: {joblib_error}, Pickle error: {pickle_error}"
)
# Verify that the data is not None after successful deserialization
if data is None:
raise ValueError(
f"Data in {filename!r} could not be deserialized. "
"The file may be corrupted or contain no data."
)
return data
def serialize_data(
data: Any,
filename: Optional[str] = None,
savepath: Optional[str] = None,
to: Optional[str] = None,
force: bool = True,
compress: Optional[Union[int, str]] = None,
pickle_protocol: int = pickle.HIGHEST_PROTOCOL,
verbose: int = 0
) -> str:
r"""
Serialize and save a Python object to a binary file using either
``joblib`` or ``pickle``. This function is designed to be robust
and versatile, handling multiple cases including file naming,
overwriting behavior, and compression options.
The final file path is computed as:
.. math::
\text{filepath} = \text{savepath} \oplus \text{filename}
where :math:`\oplus` denotes string concatenation.
Parameters
----------
data : Any
The Python object to serialize. The object must be compatible
with either ``joblib.dump`` or ``pickle.dump``.
filename : str, optional
The target filename for the serialized data. If ``None``, a
filename is generated using the current timestamp, e.g.,
``"__mydumpedfile_20230315_123045.pkl"``.
savepath : str, optional
The directory in which to save the file. If not specified, the
current working directory (``os.getcwd()``) is used. The directory
is created if it does not exist.
to : str, optional
The serialization method to use. Acceptable values are
``'joblib'`` and ``'pickle'``. If ``None``, the default is
``'joblib'``.
force : bool, default=True
If ``True``, any existing file with the same name is overwritten.
If ``False``, a timestamp is appended to the filename to ensure
uniqueness.
compress : int or str, optional
Compression level or method for ``joblib.dump``. If ``None``,
no compression is applied.
pickle_protocol : int, default=pickle.HIGHEST_PROTOCOL
The pickle protocol to use when serializing with ``pickle.dump``.
verbose : int, default=0
Controls the verbosity of output messages. Higher values produce
more detailed logging during the serialization process.
Returns
-------
str
The full path to the saved serialized file.
Examples
--------
>>> from fusionlab.utils.io_utils import serialize_data
>>> import numpy as np
>>> data = {"a": np.arange(10), "b": np.random.rand(10)}
>>> filepath = serialize_data(
... data, filename="mydata.pkl", savepath="output",
... to="pickle", force=False, verbose=1
... )
>>> print(filepath)
/current/working/directory/output/mydata_<timestamp>.pkl
Notes
-----
The function first constructs the file path from ``savepath`` and
``filename``. If a file already exists and ``force`` is False, a
timestamp is appended to ensure uniqueness. Then, depending on the
value of ``to``, the function attempts to serialize the data using
either ``joblib.dump`` (with optional compression via the ``compress``
parameter) or ``pickle.dump`` (using the specified ``pickle_protocol``).
If an error occurs during serialization, an ``IOError`` is raised.
See Also
--------
joblib.dump : Serialize objects to disk using Joblib.
pickle.dump : Serialize objects to disk using Pickle.
os.getcwd : Retrieve the current working directory.
References
----------
.. [1] McKinney, W. (2010). "Data Structures for Statistical Computing
in Python". Proceedings of the 9th Python in Science Conference.
.. [2] Van der Walt, S., Colbert, S. C., & Varoquaux, G. (2011).
"The NumPy Array: A Structure for Efficient Numerical
Computation". Computing in Science & Engineering, 13(2), 22-30.
"""
# Set default serialization method to joblib if not provided.
if to is None:
to = "joblib"
else:
if not isinstance(to, str):
raise TypeError(
f"Serialization method `to` must be a string, not "
f"{type(to)}."
)
to = to.lower()
if to not in ("joblib", "pickle"):
raise ValueError(
"Unknown serialization method `to`. Must be "
"'joblib' or 'pickle'."
)
# Generate filename using timestamp if not provided.
if filename is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"__mydumpedfile_{timestamp}.pkl"
# Ensure filename ends with .pkl
if not filename.endswith(".pkl"):
filename += ".pkl"
# Determine full save directory.
directory = savepath if savepath is not None else os.getcwd()
if not os.path.exists(directory):
try:
os.makedirs(directory)
if verbose:
print(f"[INFO] Created directory: {directory}")
except Exception as e:
raise IOError(
f"Failed to create directory '{directory}': {e}"
) from e
full_path = os.path.join(directory, filename)
# If file exists and force is False, append a timestamp.
if os.path.exists(full_path) and not force:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
if filename.endswith(".pkl"):
filename = filename.replace(
".pkl", f"_{timestamp}.pkl"
)
else:
filename = f"{filename}_{timestamp}.pkl"
full_path = os.path.join(directory, filename)
if verbose:
print(
f"[INFO] File exists. New filename generated: {filename}"
)
# Attempt serialization using the specified method.
try:
if to == "joblib":
joblib.dump(data, full_path, compress=compress)
if verbose:
print(
f"[INFO] Data serialized using joblib and saved to "
f"{full_path!r}."
)
elif to == "pickle":
with open(full_path, "wb") as file:
pickle.dump(data, file, protocol=pickle_protocol)
if verbose:
print(
f"[INFO] Data serialized using pickle and saved to "
f"{full_path!r}."
)
except Exception as e:
raise IOError(
f"An error occurred during data serialization: {e}"
) from e
return full_path
def fetch_tgz_from_url(
data_url: str,
tgz_filename: str,
data_path: Optional[Union[str, Path]] = None,
file_to_retrieve: Optional[str] = None,
**kwargs
) -> Optional[Path]:
"""
Downloads a .tgz file from a specified URL, saves it to a directory,
and optionally extracts a specific file from the archive.
This function retrieves a .tgz file from the provided `data_url` and saves
it to the specified `data_path` directory. If `file_to_retrieve` is specified,
the function will extract only that file from the archive; otherwise, the
entire archive will be extracted.
Parameters
----------
data_url : str
The URL to download the .tgz file from.
tgz_filename : str
The name to assign to the downloaded .tgz file.
data_path : Union[str, Path], optional
Directory where the downloaded file will be saved. Defaults to a 'tgz_data'
directory in the current working directory if not specified.
file_to_retrieve : str, optional
Specific filename to extract from the .tgz archive. If not provided,
the entire archive is extracted.
**kwargs : dict
Additional keyword arguments to pass to the extraction method.
Returns
-------
Optional[Path]
Path to the extracted file if a specific file was requested; otherwise,
returns None.
Raises
------
FileNotFoundError
If the specified `file_to_retrieve` is not found in the archive.
Examples
--------
>>> from fusionlab.utils.io_utils import fetch_tgz_from_url
>>> data_url = 'https://example.com/data.tar.gz'
>>> extracted_file = fetch_tgz_from_url(
... data_url, 'data.tar.gz', data_path='data_dir', file_to_retrieve='file.csv')
>>> print(extracted_file)
Notes
-----
Uses the `tqdm` progress bar for tracking download progress.
"""
import urllib.request
data_path = Path(data_path or os.path.join(os.getcwd(), 'tgz_data'))
data_path.mkdir(parents=True, exist_ok=True)
tgz_path = data_path / tgz_filename
# Download with progress bar
with tqdm(unit='B', unit_scale=True, miniters=1, desc=tgz_filename, ncols=100) as t:
urllib.request.urlretrieve(data_url, tgz_path, reporthook=_download_progress_hook(t))
try:
with tarfile.open(tgz_path, "r:gz") as tar:
if file_to_retrieve:
tar.extract(file_to_retrieve, path=data_path, **kwargs)
return data_path / file_to_retrieve
tar.extractall(path=data_path)
except (tarfile.TarError, KeyError) as e:
print(f"Error extracting {file_to_retrieve or 'archive'}: {e}")
return None
return None
def fetch_tgz_locally(
tgz_file: str,
filename: str,
savefile: str = 'tgz',
rename_outfile: Optional[str] = None
) -> str:
"""
Extracts a specific file from a local .tgz archive and optionally renames it.
This function fetches a specific file `filename` from a local tar archive
located at `tgz_file`, and saves it to `savefile`. If `rename_outfile` is
specified, the file is renamed after extraction.
Parameters
----------
tgz_file : str
Full path to the tar file.
filename : str
Name of the target file to extract from the archive.
savefile : str, optional
Destination directory for the extracted file, defaulting to 'tgz'.
rename_outfile : str, optional
New name for the fetched file. If not provided, retains the original name.
Returns
-------
str
Full path to the fetched and possibly renamed file.
Raises
------
FileNotFoundError
If the `tgz_file` or the specified `filename` is not found.
Examples
--------
>>> from fusionlab.utils.io_utils import fetch_tgz_locally
>>> fetched_file = fetch_tgz_locally(
... 'path/to/archive.tgz', 'file.csv', savefile='extracted', rename_outfile='renamed.csv')
>>> print(fetched_file)
"""
tgz_path = Path(tgz_file)
save_path = Path(savefile)
save_path.mkdir(parents=True, exist_ok=True)
if not tgz_path.is_file():
raise FileNotFoundError(f"Source {tgz_file!r} is not a valid file.")
with tarfile.open(tgz_path) as tar:
member = next((m for m in tar.getmembers() if m.name.endswith(filename)), None)
if member:
tar.extract(member, path=save_path)
extracted_file_path = save_path / member.name
final_file_path = save_path / (rename_outfile if rename_outfile else filename)
if extracted_file_path != final_file_path:
extracted_file_path.rename(final_file_path)
if extracted_file_path.parent != save_path:
shutil.rmtree(extracted_file_path.parent, ignore_errors=True)
else:
raise FileNotFoundError(f"File {filename} not found in {tgz_file}.")
print(f"--> '{final_file_path}' was successfully extracted from '{tgz_path.name}' "
f"and saved to '{save_path}'.")
return str(final_file_path)
def extract_tar_with_progress(
tar: tarfile.TarFile,
member: tarfile.TarInfo,
path: Path
):
"""
Extracts a single file from a tar archive with a progress bar.
Parameters
----------
tar : tarfile.TarFile
Opened tar file object.
member : tarfile.TarInfo
Tar member (file) to be extracted.
path : Path
Directory path where the file will be extracted.
Examples
--------
>>> from fusionlab.utils.io_utils import extract_tar_with_progress
>>> with tarfile.open('data.tar.gz', 'r:gz') as tar:
... member = tar.getmember('file.csv')
... extract_tar_with_progress(tar, member, Path('output_dir'))
Notes
-----
Uses `tqdm` for progress tracking of the file extraction process.
"""
with tqdm(total=member.size, desc=f"Extracting {member.name}",
unit='B', unit_scale=True) as progress_bar:
with tar.extractfile(member) as member_file:
with open(path / member.name, 'wb') as out_file:
shutil.copyfileobj(member_file, out_file, length=1024 * 1024,
callback=lambda x: progress_bar.update(1024 * 1024))
def _download_progress_hook(t):
"""Progress hook for urlretrieve to update tqdm progress bar."""
last_block = [0]
def inner(block_count=1, block_size=1, total_size=None):
if total_size is not None:
t.total = total_size
t.update((block_count - last_block[0]) * block_size)
last_block[0] = block_count
return inner
def load_csv(
data_path: str, delimiter: Optional[str] = ',', **kwargs
) ->pd.DataFrame:
"""
Loads a CSV file into a pandas DataFrame.
This function reads a comma-separated values (CSV) file into a `pandas`
DataFrame, with the ability to specify a custom delimiter. It provides
support for additional options passed to `pandas.read_csv` for more
granular control over the data loading process.
Parameters
----------
data_path : str
The file path to the CSV file that is to be loaded. The file path must
lead to a `.csv` file. If the file does not exist at the specified path,
a `FileNotFoundError` is raised.
delimiter : str, optional
The character used to separate values in the CSV file. The default is
`,` for standard CSVs. If a different delimiter is used in the file
(e.g., `;`), it can be specified here.
**kwargs : dict
Additional keyword arguments that will be passed directly to
`pandas.read_csv`. For instance, users can specify `header`, `index_col`,
`dtype`, and other options supported by `read_csv` for more customized
data handling.
Returns
-------
DataFrame
A pandas DataFrame containing the loaded data, with the specified
options applied.
Raises
------
FileNotFoundError
If the specified file does not exist at the provided `data_path`.
ValueError
If the file specified by `data_path` is not a CSV file (i.e., does not
have a `.csv` extension), a `ValueError` is raised to ensure correct
file type.
Notes
-----
This function simplifies the process of loading CSV data into a DataFrame,
with a straightforward parameter for delimiter customization and full access
to `pandas.read_csv` options. It is ideal for basic CSV loading tasks, as well
as more complex ones requiring specific column handling, type casting, and
missing value handling, which can be passed via `**kwargs`.
Examples
--------
Suppose you have a CSV file `example.csv` with the following content:
```
name,age,city
Alice,30,New York
Bob,25,Los Angeles
```
To load this file into a DataFrame:
>>> from fusionlab.utils.io_utils import load_csv
>>> df = load_csv('example.csv')
>>> print(df)
name age city
0 Alice 30 New York
1 Bob 25 Los Angeles
If the file uses a semicolon (`;`) as the delimiter:
>>> df = load_csv('example.csv', delimiter=';')
Additionally, you can pass custom `read_csv` parameters through `**kwargs`,
such as specifying a column as the index:
>>> df = load_csv('example.csv', index_col='name')
>>> print(df)
age city
name
Alice 30 New York
Bob 25 Los Angeles
See Also
--------
pandas.read_csv : Full documentation for loading CSV files into a DataFrame
with detailed parameter options.
References
----------
.. [1] Wes McKinney, "Python for Data Analysis," 2nd Edition, O'Reilly Media, 2017.
"""
if not os.path.isfile(data_path):
raise FileNotFoundError(f"The file '{data_path}' does not exist.")
if not data_path.lower().endswith('.csv'):
raise ValueError(
"The specified file is not a CSV file. Please provide a valid CSV file.")
# Load the CSV data into a DataFrame with the specified delimiter and additional kwargs
return pd.read_csv(data_path, delimiter=delimiter, **kwargs)
def get_valid_key(input_key, default_key, substitute_key_dict=None,
regex_pattern = "[#&*@!,;\s]\s*", deep_search=True):
"""
Validates an input key and substitutes it with a valid key if necessary,
based on a mapping of valid keys to their possible substitutes. If the input
key is not provided or is invalid, a default key is used.
Parameters
----------
input_key : str
The key to validate and possibly substitute.
default_key : str
The default key to use if input_key is None, empty, or not found in
the substitute mapping.
substitute_key_dict : dict, optional
A mapping of valid keys to lists of their possible substitutes. This
allows for flexible key substitution and validation.
regex_pattern: str, default = '[#&*@!,;\s-]\s*'
The base pattern to split the text into a columns
deep_search: bool, default=False
If deep-search, the key finder is no sensistive to lower/upper case
or whether a numeric data is included.
Returns
-------
str
A valid key, which is either the original input_key if valid, a substituted
key if the original was found in the substitute mappings, or the default_key.
Notes
-----
This function also leverages an external validation through `key_checker` for
a deep search validation, ensuring the returned key is within the set of valid keys.
Example
-------
>>> from fusionlab.utils.io_utils import get_valid_key
>>> substitute_key_dict = {'valid_key1': ['vk1', 'key1'], 'valid_key2': ['vk2', 'key2']}
>>> get_valid_key('vk1', 'default_key', substitute_key_dict)
'valid_key1'
>>> get_valid_key('unknown_key', 'default_key', substitute_key_dict)
'KeyError...'
"""
# Ensure substitute_mapping is a dictionary if not provided
substitute_key_dict = substitute_key_dict or {}
# Fallback to default_key if input_key is None or empty
input_key = input_key or default_key
# Attempt to find a valid substitute for the input_key
for valid_key, substitutes in substitute_key_dict.items():
# Case-insensitive comparison for substitutes
normalized_substitutes = [str(sub).lower() for sub in substitutes]
if str(input_key).lower() in normalized_substitutes:
input_key = valid_key
break
regex = re.compile (fr'{regex_pattern}', flags=re.IGNORECASE)
# use valid keys only if substitute_key_dict not provided.
valid_keys = substitute_key_dict.keys() if substitute_key_dict else is_iterable(
default_key, exclude_string=True, transform=True)
valid_keys = set (list(valid_keys) + [default_key])
# Further validate the (possibly substituted) input_key
input_key = key_checker(input_key, valid_keys=valid_keys,
deep_search=deep_search,regex = regex )
return input_key
def to_txt(
d,
filename=None,
format='txt',
indent=2,
width=80,
depth=None,
compat=False,
include_header=True,
mode='w',
encoding='utf-8',
overwrite=True,
header=None,
footer=None,
serializer=None,
savepath = None,
verbose=1,
**kwargs
):
"""
Export data objects to a text or JSON file with
optional custom formatting.
The function, `<to_txt>`, handles writing `<d>`
(a string, dict, list, or general object) to a
file named `<filename>`. When no filename is
given, it automatically generates one based on
the current date/time. If `<format>` is `"json"`
and `<d>` is valid for JSON serialization,
it attempts a JSON export. Otherwise, it falls
back to text mode, leveraging Python's built-in
`pformat` and an optional `<serializer>` for
advanced transformations.
.. math::
\\text{FileName}_{timestamp} \\rightarrow \\text{output}
where :math:`\\text{FileName}_{timestamp}` is an
auto-generated name like `output_20230101_123456.txt`
if `<filename>` is not provided.
Parameters
----------
d : object
Data to write. Can be any Python object
supported by `pformat`, or a dict if
`<format>` is `'json'`.
filename : str, optional
Full path (or name) of the output file. If
None, a time-stamped name is produced,
prefixed with `'output_'`.
format : str, default='txt'
File format, either ``"txt"`` or ``"json"``.
If it fails to serialize as JSON, the
process reverts to text.
indent : int, default=2
Indentation level for pretty-printing text
or JSON.
width : int, default=80
Wrap width for formatted text lines.
depth : int, optional
Maximum depth to which nested structures
are expanded. If None, no limit is applied.
compat : bool, default=False
If True, instructs `pformat` to produce
more compact text. Not used when exporting
JSON.
include_header : bool, default=True
Whether to include a decorative header
(with timestamp) at the top of the file in
text mode.
mode : str, default='w'
File writing mode. Typically `'w'` for
overwrite, `'a'` for append.
encoding : str, default='utf-8'
Text encoding used when opening the file.
overwrite : bool, default=True
If False, raises an error if the file
already exists.
header : str, optional
Custom header text (if `<include_header>`
is True). Overwrites the default header
if given.
footer : str, optional
Custom footer text appended at the end of
the file, if `<include_header>` is True.
serializer : callable, optional
A function that transforms `<d>` before
printing. If it fails, `<d>` remains
unchanged.
verbose : int, default=1
Verbosity level for logging. Higher
values yield more console messages
(e.g., file stats at `<verbose>`>=3).
**kwargs
Additional parameters passed to the JSON
serializer (`json.dump`) or `pformat`.
Returns
-------
str
The final filename used to store the
output (potentially auto-generated).
Notes
-----
If `<format>` is `"json"`, the function tries
`json.dump` with a few standard parameters.
If an exception occurs, it reverts to text
export. The `<serializer>` argument allows
custom transformations, such as flattening
nested dicts or converting objects to JSON-
serializable representations [1]_.
Examples
--------
>>> from fusionlab.utils.io_utils import to_txt
>>> my_data = {"name":"Alice","age":30}
>>> # Basic text export
>>> txt_file = to_txt(my_data, verbose=2)
>>> # Enforce JSON format
>>> json_file = to_txt(my_data, format='json', indent=4)
See Also
--------
pformat : Pretty-print complex Python data
structures.
References
----------
.. [1] van Rossum, Guido, *Python's standard
library "json" module*, Python Docs.
"""
# Generate filename with timestamp if not provided
if filename is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
ext = 'json' if format == 'json' else 'txt'
filename = f"output_{timestamp}.{ext}"
# Use the savepath if provided
if savepath:
# Ensure directory exists
if not os.path.exists(savepath):
os.makedirs(savepath)
# Join the savepath and filename
filename = os.path.join(savepath, filename)
# Check file existence if overwrite disabled
if not overwrite and os.path.exists(filename):
raise FileExistsError(
f"File '{filename}' exists. Set overwrite=True to override.")
original_format = format.lower()
success = False
file_created = False
# Attempt JSON export if requested
if original_format == 'json':
try:
json_kwargs = {'indent': indent, 'ensure_ascii': False}
json_kwargs.update(kwargs)
with open(filename, 'w', encoding=encoding) as f:
json.dump(d, f, **json_kwargs)
success = True
file_created = True
if verbose >= 1:
print(f"JSON export successful: {filename}")
except Exception as e:
if verbose >= 1:
print(f"JSON export failed ({e}), falling back to TXT")
filename = os.path.splitext(filename)[0] + '.txt'
format = 'txt'
# TXT export fallback
if not success:
# Apply serializer if provided
processed_data = d
if serializer:
try:
processed_data = serializer(d)
except Exception as e:
if verbose >= 2:
print(f"Serializer error: {e}, using original data")
# Generate formatted content
if isinstance(processed_data, str):
content = processed_data
else:
content = pformat(
processed_data,
indent=indent,
width=width,
depth=depth,
compact=compat
)
# Add header/footer
header_section = ""
if include_header:
header_section = header if header else (
"\n" + "=" * 60 + "\n"
f"DATA EXPORT | {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
"=" * 60 + "\n\n"
)
footer_section = footer if footer else (
"\n\n" + "=" * 60 + "\n" if include_header else ""
)
content = f"{header_section}{content}{footer_section}"
# Write to file
with open(filename, mode, encoding=encoding) as f:
f.write(content)
file_created = True
# Verbose reporting
if verbose >= 1 and file_created:
print(f"Created '{filename}' ({format.upper()})")
if verbose >= 3:
try:
size = os.path.getsize(filename)
print(f"Dimensions: {size} bytes | {len(content.splitlines())} lines")
print(f"Encoding: {encoding} | Mode: {mode}")
except Exception as e:
if verbose >= 4:
print(f"Metadata error: {e}")
return filename