# -*- coding: utf-8 -*-
"""This module implements `namedzip` and `namedzip_longest`, which
extend `zip` and `itertools.zip_longest` respectively to generate
named tuples.
copyright: (c) 2019 by Erik R Berlin.
license: MIT, see LICENSE for more details.
"""
from collections import namedtuple
from functools import wraps
from inspect import isclass
import warnings
# Unique marker object, always compared by identity (`is`). The
# `namedzip_longest` code paths pass it as the zip fill value when
# per-field defaults are in effect, so that `_namedzip_generator` can
# recognise missing positions and substitute the matching default.
sentinel = object()
def _deprecation_warning(func):
    """Route calls using the old keyword-based API to the v1 functions.

    The returned wrapper inspects the keyword arguments of each call.
    If neither `typename` nor `field_names` is present, the decorated
    function is called unchanged. Otherwise a `DeprecationWarning` is
    issued and the call is forwarded to `_namedzip_v1` (when the
    decorated function is named ``"namedzip"``) or to
    `_namedzip_longest_v1` (for any other name).

    Parameters
    ----------
    func : function object
        Public entry point to decorate.

    Returns
    -------
    function object
        Wrapper carrying the metadata of `func`.
    """
    deprecation_message = (
        "The typename and field_names parameters will be removed in "
        "namedzip v2.0.0. Please use the named_tuple parameter instead."
    )

    @wraps(func)
    def wrapper(*args, **kwargs):
        uses_old_api = "typename" in kwargs or "field_names" in kwargs
        if not uses_old_api:
            return func(*args, **kwargs)

        # Register an "always" filter for this message before warning, so
        # the warning is reported on every legacy call.
        warnings.filterwarnings("always", message=deprecation_message)
        # stacklevel=2 attributes the warning to the caller of the wrapper.
        warnings.warn(
            message=deprecation_message, category=DeprecationWarning, stacklevel=2
        )
        legacy = _namedzip_v1 if func.__name__ == "namedzip" else _namedzip_longest_v1
        return legacy(*args, **kwargs)

    return wrapper
@_deprecation_warning
def namedzip(named_tuple, *iterables):
    """Extends :func:`zip` to generate named tuples.

    Returns a generator if `*iterables` are supplied, otherwise returns
    a function for creating generators.

    Calls that use the deprecated `typename` / `field_names` keyword
    arguments are intercepted by the `_deprecation_warning` decorator
    and never reach this body.

    Parameters
    ----------
    named_tuple : tuple subclass
        tuple subclass from `collections.namedtuple` factory function,
        or subclass of `typing.NamedTuple`.
    *iterables : iterable, optional
        Iterable objects to zip.

    Returns
    -------
    generator object
        If `*iterables` are supplied.
    function object
        If `*iterables` are not supplied. The function accepts the
        iterables to zip and returns the generator.

    Raises
    ------
    TypeError
        If `named_tuple` does not appear to be a named tuple class.
    ValueError
        If the number of iterables differs from the number of fields
        in `named_tuple` (checked when the iterables are supplied).
    """
    _verify_named_tuple(named_tuple)

    def _namedzip_factory(*iterables):
        # Validate eagerly so a mismatch surfaces at call time rather than
        # on first iteration of the returned generator.
        _compare_iterables_to_fields(len(iterables), len(named_tuple._fields))
        zipped = _create_zip(*iterables)
        return _namedzip_generator(zipped, named_tuple)

    if iterables:
        return _namedzip_factory(*iterables)
    return _namedzip_factory
@_deprecation_warning
def namedzip_longest(named_tuple, *iterables, fillvalue=None, defaults=None):
    """Extends :func:`itertools.zip_longest` to generate named tuples.

    Returns a generator if `*iterables` are supplied, otherwise returns
    a function for creating generators.

    Calls that use the deprecated `typename` / `field_names` keyword
    arguments are intercepted by the `_deprecation_warning` decorator
    and never reach this body.

    Parameters
    ----------
    named_tuple : tuple subclass
        tuple subclass from `collections.namedtuple` factory function,
        or subclass of `typing.NamedTuple`.
    *iterables : iterable, optional
        Iterable objects to zip.
    fillvalue : optional
        Use for setting all missing values to the same default value.
        (default is None).
    defaults : iterable, optional
        Individual default values for each iterable to zip. Overrides
        `fillvalue` for last n iterables, and any defaults specified in
        `named_tuple`. Length can be less than or equal to the number of
        named tuple field names.

    Returns
    -------
    generator object
        If `*iterables` are supplied.
    function object
        If `*iterables` are not supplied. The function accepts the
        iterables to zip and returns the generator.

    Raises
    ------
    TypeError
        If `named_tuple` does not appear to be a named tuple class.
    ValueError
        If more `defaults` than field names are given, or if the number
        of iterables differs from the number of fields in `named_tuple`.
    """
    # Same up-front validation as `namedzip`, so an invalid `named_tuple`
    # fails with the same TypeError instead of a later attribute error.
    _verify_named_tuple(named_tuple)
    defaults = _set_defaults(defaults, fillvalue, named_tuple)
    if defaults is not None:
        # Override fillvalue when individual defaults are set: missing
        # positions are filled with the sentinel and replaced per field
        # by `_namedzip_generator`.
        fillvalue = sentinel

    def _namedzip_longest_factory(*iterables):
        _compare_iterables_to_fields(len(iterables), len(named_tuple._fields))
        zipped = _create_zip(*iterables, fillvalue=fillvalue, type_longest=True)
        return _namedzip_generator(zipped, named_tuple, defaults)

    if iterables:
        return _namedzip_longest_factory(*iterables)
    return _namedzip_longest_factory
def _namedzip_v1(*iterables, typename, field_names, **kwargs):
    """Legacy implementation of `namedzip` using the v1 call signature.

    Builds the named tuple class itself from `typename` and
    `field_names`, then behaves like :func:`zip` yielding instances of
    that class. Returns a generator if `*iterables` are supplied,
    otherwise returns a function for creating generators.

    Parameters
    ----------
    *iterables : iterable, optional
        Iterable objects passed as positional arguments.
    typename : string
        Type name for generated named tuple objects. Passed on to
        `collections.namedtuple` factory function.
    field_names : iterable
        Field names for generated named tuple objects. Passed on to
        `collections.namedtuple` factory function.
    **kwargs :
        Any additional keyword arguments will also be passed on to the
        `collections.namedtuple` factory function.

    Returns
    -------
    generator object
        If `*iterables` are supplied.
    function object
        If `*iterables` are not supplied.
    """
    named_tuple = namedtuple(typename, field_names, **kwargs)

    def _namedzip_factory(*iterables):
        field_count = len(named_tuple._fields)
        _compare_iterables_to_fields(len(iterables), field_count)
        return _namedzip_generator(_create_zip(*iterables), named_tuple)

    return _namedzip_factory(*iterables) if iterables else _namedzip_factory
def _namedzip_longest_v1(*iterables, typename, field_names, **kwargs):
    """Extends :func:`itertools.zip_longest` to generate named tuples.

    Legacy (v1 signature) implementation. Returns a generator if
    `*iterables` are supplied, otherwise returns a function for
    creating generators.

    Parameters
    ----------
    *iterables : iterable, optional
        Iterable objects passed as positional arguments.
    typename : string
        Type name for generated named tuple objects. Passed on to
        `collections.namedtuple` factory function.
    field_names : iterable
        Field names for generated named tuple objects. Passed on to
        `collections.namedtuple` factory function.
    fillvalue : optional
        Read from `**kwargs`. Use for setting all missing values to the
        same default value (default is None).
    defaults : iterable, optional
        Read from `**kwargs`. Individual default values for each
        iterable to zip. Overrides `fillvalue` if non-empty, in which
        case its length must match the number of field names. Any
        iterable is accepted; it is materialised into a tuple. An empty
        iterable is treated the same as not specifying `defaults`.
    **kwargs
        Any additional keyword arguments will be passed on to the
        `collections.namedtuple` factory function.

    Returns
    -------
    generator object
        If `*iterables` are supplied.
    function object
        If `*iterables` are not supplied.

    Raises
    ------
    ValueError
        If non-empty `defaults` are specified but do not match the
        number of `field_names`, or if the number of iterables differs
        from the number of field names.

    Notes
    -----
    Does not utilize the functionality of `collections.namedtuple` for
    setting default values.
    """
    fillvalue = kwargs.pop("fillvalue", None)
    defaults = kwargs.pop("defaults", None)
    named_tuple = namedtuple(typename, field_names, **kwargs)

    if defaults is not None:
        # Materialise once: iterators/generators have no len() and cannot
        # be indexed later when sentinel positions are replaced.
        defaults = tuple(defaults)
        if not defaults:
            # Nothing to substitute. Keep the plain fillvalue; switching to
            # the sentinel here would leak it into the generated tuples,
            # because replacement is skipped for empty defaults.
            defaults = None
        elif len(defaults) != len(named_tuple._fields):
            raise ValueError(
                "Unequal number of field names ({}) and default values ({}).".format(
                    len(named_tuple._fields), len(defaults)
                )
            )
        else:
            # Override fillvalue if individual defaults are specified.
            fillvalue = sentinel

    def _namedzip_longest_factory(*iterables):
        _compare_iterables_to_fields(len(iterables), len(named_tuple._fields))
        zipped = _create_zip(*iterables, fillvalue=fillvalue, type_longest=True)
        return _namedzip_generator(zipped, named_tuple, defaults)

    if iterables:
        return _namedzip_longest_factory(*iterables)
    return _namedzip_longest_factory
def _compare_iterables_to_fields(iterable_count, field_count):
    """Ensure there is exactly one iterable per named tuple field.

    Parameters
    ----------
    iterable_count : int
        Number of iterable objects.
    field_count : int
        Number of named tuple field names.

    Raises
    ------
    ValueError
        If `iterable_count` is not equal to `field_count`.
    """
    if iterable_count == field_count:
        return
    template = "Unequal number of iterable objects ({}) and field names ({})."
    raise ValueError(template.format(iterable_count, field_count))
def _create_zip(*iterables, fillvalue=None, type_longest=False):
    """Aggregate `*iterables` with `zip` or `itertools.zip_longest`.

    Parameters
    ----------
    *iterables : iterable
        Iterable objects passed as positional arguments. Passed on to
        `zip` or `zip_longest` depending on the `type_longest` flag.
    fillvalue : optional
        Value used for missing positions. Only forwarded when
        `type_longest` is true; ignored otherwise. (default is None).
    type_longest : bool, optional
        Specifies whether to use `zip_longest` over `zip`. Used by
        `namedzip_longest`. (default is False).

    Returns
    -------
    iterator
        The `zip` or `zip_longest` object over `*iterables`.
    """
    if not type_longest:
        return zip(*iterables)

    # Imported lazily: only the "longest" code path needs itertools.
    from itertools import zip_longest

    return zip_longest(*iterables, fillvalue=fillvalue)
def _namedzip_generator(zipped, named_tuple, defaults=None):
    """Yield one `named_tuple` instance per tuple in `zipped`.

    When `defaults` is non-empty, every item that is the module-level
    `sentinel` object is replaced by the entry of `defaults` at the same
    position before the named tuple is built.

    Parameters
    ----------
    zipped : iterable
        Should be generator produced by `zip` or `zip_longest`.
    named_tuple : tuple subclass
        tuple subclass from `collections.namedtuple` factory function,
        or subclass of `typing.NamedTuple`.
    defaults : sequence or None, optional
        Default values for each index of tuples generated by `zipped`;
        indexed by position. (default is None).

    Yields
    ------
    named tuple object
    """
    for row in zipped:
        if not defaults:
            yield named_tuple(*row)
            continue
        # Identity check: only the sentinel marks a missing value, so
        # legitimate None/falsy items pass through untouched.
        patched = [
            defaults[pos] if item is sentinel else item
            for pos, item in enumerate(row)
        ]
        yield named_tuple(*patched)
def _set_defaults(defaults, fillvalue, named_tuple):
    """Resolve the per-field default values for `_namedzip_generator`.

    Explicit `defaults` take priority. They are aligned to the last n
    fields, similar to the `defaults` parameter of
    `collections.namedtuple`; any leading fields without a default get
    `fillvalue`. If `defaults` is None, defaults declared on the
    `named_tuple` class are used instead, with `fillvalue` for fields
    that declare none.

    Parameters
    ----------
    defaults : iterable or None
        Specified as keyword argument to `namedzip_longest` interface.
    fillvalue :
        Specified as keyword argument to `namedzip_longest` interface.
    named_tuple : tuple subclass
        tuple subclass from `collections.namedtuple` factory function,
        or subclass of `typing.NamedTuple`.

    Returns
    -------
    tuple or None
        One value per field, or None when neither `defaults` nor the
        class provides any default values.

    Raises
    ------
    ValueError
        If the number of default values is larger than the number of
        named tuple field names.
    """
    if defaults is None:
        # Defaults attribute can be called `_field_defaults` or `_fields_defaults`.
        class_defaults = getattr(named_tuple, "_fields_defaults", None) or getattr(
            named_tuple, "_field_defaults", None
        )
        if not class_defaults:  # Missing attribute or empty dict.
            return None
        return tuple(class_defaults.get(name, fillvalue) for name in named_tuple._fields)

    # Default values specified in interface kwarg take priority.
    explicit = tuple(defaults)
    field_total = len(named_tuple._fields)
    if len(explicit) > field_total:
        raise ValueError(
            "Received more default values ({}) than field names ({}).".format(
                len(explicit), field_total
            )
        )
    # Left-pad so the explicit values line up with the trailing fields.
    return (fillvalue,) * (field_total - len(explicit)) + explicit
def _verify_named_tuple(named_tuple):
    """Check that `named_tuple` looks like a named tuple class.

    The object must be a class, a subclass of `tuple`, callable, and
    expose a `_fields` attribute.

    Parameters
    ----------
    named_tuple : named tuple class
        tuple subclass from `collections.namedtuple` factory function,
        or subclass of `typing.NamedTuple`.

    Raises
    ------
    TypeError
        If `named_tuple` does not appear to be a named tuple class.
    """
    # Checks are ordered so `issubclass` is only reached for real classes.
    if (
        not isclass(named_tuple)
        or not issubclass(named_tuple, tuple)
        or not callable(named_tuple)
        or not hasattr(named_tuple, "_fields")
    ):
        raise TypeError(
            "named_tuple parameter should be a tuple subclass created "
            "by the collections.namedtuple factory function, or a "
            "subclass of typing.NamedTuple."
        )