Source code for carabiner.cast

"""Tweaked casting between Python types."""

from typing import Any, List, TextIO, Union

from collections.abc import Iterable

from functools import singledispatch
from itertools import chain
import gzip
from io import StringIO, TextIOWrapper

from .itertools import batched


[docs] @singledispatch def clist(x) -> List: """Cast an object to a list. Parameters ---------- x : str or iterable Object to cast. Returns ------- list Examples -------- >>> clist('Hello world') ['Hello world'] >>> clist(('Hello', 'world')) # doctest: +SKIP ['Hello', 'world'] """ try: return list(x) except TypeError: ## if x is not iterable return [x]
@clist.register def _(x: str) -> List[str]: return [x] @clist.register def _(x: int) -> List[int]: return [x] @clist.register def _(x: float) -> List[float]: return [x] @clist.register def _(x: TextIOWrapper) -> List[TextIOWrapper]: return [x]
[docs] @singledispatch def cio(x, *args, **kwargs) -> TextIOWrapper: r"""Cast an object to a file-like object. Addtional parameters are passed to `open` or `gzip.open`. If the input is a string, then it is treated as a filename. If it ends in `.gz` or `.gzip`, then it is assumed to be a GZIP-compressed file. If the input is an open file, then the file is closed and reopened with the provided parameters. This can be useful to close for writing and open for reading. If the input is a `io.StringIO` object, then the cursor is moved to the start and the object is returned. Parameters ---------- x : str or iterable Object to cast. Returns ------- TextIOWrapper Examples -------- >>> cio('Hello.txt') # doctest: +SKIP <_io.TextIOWrapper name='Hello.txt' mode='r' encoding='UTF-8'> >>> cio('Hello.txt', 'w') <_io.TextIOWrapper name='Hello.txt' mode='w' encoding='UTF-8'> >>> list(cio(StringIO("Hello\nworld"))) ['Hello\n', 'world'] """ raise TypeError(f"Can't force object of type {type(x).__name__} " "to be a file-like")
@cio.register def _(x: str, *args, **kwargs): if x.endswith('.gz') or x.endswith('.gzip'): return gzip.open(x, *args, **kwargs) else: return open(x, *args, **kwargs) @cio.register def _(x: TextIOWrapper, *args, **kwargs): if x.name in ('<stdin>', '<stdout>'): return x else: return cio(x.name, *args, **kwargs) @cio.register def _(x: StringIO, *args, **kwargs): x.seek(0) return x
[docs] @singledispatch def cstr(x) -> str: """Cast an object to a string. If the object is a file-like object, return the filename. Parameters ---------- x Object to cast. Returns ------- str Examples -------- >>> cstr(["Hello", "world"]) "['Hello', 'world']" >>> cstr(open("test/test-file.txt", "w")) 'test/test-file.txt' """ return str(x)
@cstr.register def _(x: TextIOWrapper): return x.name CASTING_FUNCTIONS = dict(str=cstr, list=clist, TextIO=cio, TextIOWrapper=cio) def _cast_base(object: Any, to: str, *args, **kwargs): try: casting_function = CASTING_FUNCTIONS[to] except KeyError: raise TypeError(f"Cannot cast {type(object).__name__} to {to}.") return casting_function(object, *args, **kwargs) @singledispatch def _cast(to, object: Any, *args, **kwargs) -> Any: return _cast_base(object, str(to), *args, **kwargs) @_cast.register def _(to: type, object: Any, *args, **kwargs) -> Any: return _cast_base(object, to.__name__, *args, **kwargs) @_cast.register def _(to: str, object: Any, *args, **kwargs) -> Any: return _cast_base(object, to, *args, **kwargs)
[docs] def cast(object: Any, to: Union[str, type], *args, **kwargs) -> Any: """Cast an object to another type. Parameters ---------- x Object to cast. to : str, type Type to cast to. Available: `str`, `list`, `TextIO`, `TextIOWrapper` Returns ------- Casted object. Raises ------ TypeError If the object cannot be casted to the type. Examples -------- >>> cast("abc", list) ['abc'] >>> cast(["abc"], 'str') "['abc']" """ return _cast(to, object=object, *args, **kwargs)
[docs] @singledispatch def flatten(x) -> Any: """Return the only item for iterables of length one. If `x` is a lazy iterator, then this returns a lazy iterator if more than one item in `x`. Parameters ---------- x : str, iterable Object to flatten. Returns ------- iterable or item Raises ------ TypeError If `x` is not a supported type. Examples -------- >>> flatten("Hello") 'Hello' >>> flatten(["Hello"]) 'Hello' >>> flatten(["Hello", "world"]) ['Hello', 'world'] >>> flatten(tuple('abc')) ('a', 'b', 'c') >>> list(flatten(range(3))) [0, 1, 2] >>> flatten(range(1)) 0 """ raise TypeError(f"Cannot flatten object of type {type(x).__name__}.")
@flatten.register def _(x: list) -> Any: if len(x) == 1: return x[0] else: return x @flatten.register def _(x: str) -> Any: return x @flatten.register def _(x: int) -> Any: return x @flatten.register def _(x: float) -> Any: return x @flatten.register def _(x: tuple) -> Any: return tuple(flatten(list(x))) @flatten.register def _(x: set) -> Any: if len(x) == 1: return list(x)[0] else: return x @flatten.register def _(x: Iterable) -> Any: iterators = [] make_list = True for i, batch in enumerate(batched(x, 1000)): if make_list: batch = list(batch) if i == 0: if len(batch) == 1: return batch[0] else: make_list = False iterators.append(batch) return chain.from_iterable(iterators)