danom documentation¶
- class danom.Either¶
Bases:
ABC,GenericEithermonad. Consists ofRightandLeftfor successful and failed operations respectively. Each monad is a frozen instance to prevent further mutation.- abstractmethod and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe another function that returns a monad. For
Leftwill return original inner.from danom import Left, Right Right(1).and_then(add_one) == Right(2) Right(1).and_then(raise_err) == Left(TypeError()) Left(TypeError()).and_then(add_one) == Left(TypeError()) Left(TypeError()).and_then(raise_value_err) == Left(TypeError())
- static either_is_ok(result: Either[T_co, E_co]) bool¶
Check whether the monad is ok. Allows for
filterorpartitionin aStreamwithout needing a lambda or custom function.from danom import Stream, Either Stream.from_iterable([Right(), Right(), Left()]).filter(Either.either_is_ok).collect() == (Right(), Right())
- static either_unwrap(result: Either[T_co, E_co]) T_co¶
Unwrap the Right or
Leftmonad to get the inner value.from danom import Stream, Either oks, errs = Stream.from_iterable([Right(1), Right(2), Left()]).partition(Either.either_is_ok) oks.map(Either.either_unwrap).collect == (1, 2)
- abstractmethod is_ok() bool¶
Returns
Trueif the result type isRight. ReturnsFalseif the result type isLeft.>>> from danom import Left, Right >>> Right().is_ok() == True True >>> Left().is_ok() == False True
- abstractmethod map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe a pure function and wrap the return value with
Right. Given anLeftwill returnself.from danom import Left, Right Right(1).map(add_one) == Right(2) Left(1).map(add_one) == Left(1)
- abstractmethod map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe a pure function and wrap the return value with
Left. Given anRightwill returnself.from danom import Left, Right Left(TypeError()).map_err(type_err_to_value_err) == Left(ValueError()) Right(1).map(type_err_to_value_err) == Right(1)
- abstractmethod or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe a function that returns a monad to recover from an
Left. ForRightwill return originalEither.from danom import Left, Right Right(1).or_else(replace_err_with_zero) == Right(1) Left(TypeError()).or_else(replace_err_with_zero) == Right(0)
- classmethod unit(inner: T_co) Right[T_co]¶
Unit method. Given an item of type
TreturnRight(T)>>> from danom import Left, Right, Either >>> Either.unit(0) == Right(inner=0) True >>> Right.unit(0) == Right(inner=0) True >>> Left.unit(0) == Right(inner=0) True
- abstractmethod unwrap() T_co¶
Unwrap the Right or
Leftmonad to get the inner value.>>> from danom import Left, Right >>> Right().unwrap() == None True >>> Right(1).unwrap() == 1 True >>> Right("ok").unwrap() == 'ok' True >>> Left(-1).unwrap() == -1 True
- class danom.Err(error: Any = None, input_args: tuple[()] | tuple[tuple[Any, ...], dict[str, Any]] | tuple[object, tuple[Any, ...], dict[str, Any]] = (), traceback: str = '')¶
Bases:
Result[Never,E_co]- and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe another function that returns a monad. For
Errwill return original error.from danom import Err, Ok Ok(1).and_then(add_one) == Ok(2) Ok(1).and_then(raise_err) == Err(error=TypeError()) Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
- details: list[dict[str, Any]]¶
- error: Any¶
- input_args: tuple[()] | tuple[tuple[Any, ...], dict[str, Any]] | tuple[object, tuple[Any, ...], dict[str, Any]]¶
- is_ok() Literal[False]¶
Returns
Trueif the result type isOk. ReturnsFalseif the result type isErr.>>> from danom import Err, Ok >>> Ok().is_ok() == True True >>> Err().is_ok() == False True
- map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a pure function and wrap the return value with
Ok. Given anErrwill return self.from danom import Err, Ok Ok(1).map(add_one) == Ok(2) Err(error=TypeError()).map(add_one) == Err(error=TypeError())
- map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Err[F_co]¶
Pipe a pure function and wrap the return value with
Err. Given anOkwill return self.from danom import Err, Ok Err(error=TypeError()).map_err(type_err_to_value_err) == Err(error=ValueError()) Ok(1).map(type_err_to_value_err) == Ok(1)
- or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe a function that returns a monad to recover from an
Err. ForOkwill return originalResult.from danom import Err, Ok Ok(1).or_else(replace_err_with_zero) == Ok(1) Err(error=TypeError()).or_else(replace_err_with_zero) == Ok(0)
- traceback: str¶
- unwrap() T_co¶
Unwrap the
Okmonad and get the inner value. Unwrap theErrmonad will raise the inner error.>>> from danom import Err, Ok >>> Ok().unwrap() == None True >>> Ok(1).unwrap() == 1 True >>> Ok("ok").unwrap() == 'ok' True >>> Err(error=TypeError()).unwrap() Traceback (most recent call last): ... TypeError:
- class danom.Left(inner: Any = None)¶
Bases:
Either[Never,E_co]- and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe another function that returns a monad. For
Leftwill return original inner.from danom import Left, Right Right(1).and_then(add_one) == Right(2) Right(1).and_then(raise_err) == Left(TypeError()) Left(TypeError()).and_then(add_one) == Left(TypeError()) Left(TypeError()).and_then(raise_value_err) == Left(TypeError())
- inner: Any¶
- is_ok() Literal[False]¶
Returns
Trueif the result type isRight. ReturnsFalseif the result type isLeft.>>> from danom import Left, Right >>> Right().is_ok() == True True >>> Left().is_ok() == False True
- map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a pure function and wrap the return value with
Right. Given anLeftwill returnself.from danom import Left, Right Right(1).map(add_one) == Right(2) Left(1).map(add_one) == Left(1)
- map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Left[F_co]¶
Pipe a pure function and wrap the return value with
Left. Given anRightwill returnself.from danom import Left, Right Left(TypeError()).map_err(type_err_to_value_err) == Left(ValueError()) Right(1).map(type_err_to_value_err) == Right(1)
- or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe a function that returns a monad to recover from an
Left. ForRightwill return originalEither.from danom import Left, Right Right(1).or_else(replace_err_with_zero) == Right(1) Left(TypeError()).or_else(replace_err_with_zero) == Right(0)
- unwrap() T_co¶
Unwrap the Right or
Leftmonad to get the inner value.>>> from danom import Left, Right >>> Right().unwrap() == None True >>> Right(1).unwrap() == 1 True >>> Right("ok").unwrap() == 'ok' True >>> Left(-1).unwrap() == -1 True
- class danom.Ok(inner: Any = None)¶
Bases:
Result[T_co,Never]- and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe another function that returns a monad. For
Errwill return original error.from danom import Err, Ok Ok(1).and_then(add_one) == Ok(2) Ok(1).and_then(raise_err) == Err(error=TypeError()) Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
- inner: Any¶
- is_ok() Literal[True]¶
Returns
Trueif the result type isOk. ReturnsFalseif the result type isErr.>>> from danom import Err, Ok >>> Ok().is_ok() == True True >>> Err().is_ok() == False True
- map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Ok[U_co]¶
Pipe a pure function and wrap the return value with
Ok. Given anErrwill return self.from danom import Err, Ok Ok(1).map(add_one) == Ok(2) Err(error=TypeError()).map(add_one) == Err(error=TypeError())
- map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a pure function and wrap the return value with
Err. Given anOkwill return self.from danom import Err, Ok Err(error=TypeError()).map_err(type_err_to_value_err) == Err(error=ValueError()) Ok(1).map(type_err_to_value_err) == Ok(1)
- or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a function that returns a monad to recover from an
Err. ForOkwill return originalResult.from danom import Err, Ok Ok(1).or_else(replace_err_with_zero) == Ok(1) Err(error=TypeError()).or_else(replace_err_with_zero) == Ok(0)
- unwrap() T_co¶
Unwrap the
Okmonad and get the inner value. Unwrap theErrmonad will raise the inner error.>>> from danom import Err, Ok >>> Ok().unwrap() == None True >>> Ok(1).unwrap() == 1 True >>> Ok("ok").unwrap() == 'ok' True >>> Err(error=TypeError()).unwrap() Traceback (most recent call last): ... TypeError:
- class danom.Result¶
Bases:
ABC,GenericResultmonad. Consists ofOkandErrfor successful and failed operations respectively. Each monad is a frozen instance to prevent further mutation.- abstractmethod and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe another function that returns a monad. For
Errwill return original error.from danom import Err, Ok Ok(1).and_then(add_one) == Ok(2) Ok(1).and_then(raise_err) == Err(error=TypeError()) Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
- abstractmethod is_ok() bool¶
Returns
Trueif the result type isOk. ReturnsFalseif the result type isErr.>>> from danom import Err, Ok >>> Ok().is_ok() == True True >>> Err().is_ok() == False True
- abstractmethod map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe a pure function and wrap the return value with
Ok. Given anErrwill return self.from danom import Err, Ok Ok(1).map(add_one) == Ok(2) Err(error=TypeError()).map(add_one) == Err(error=TypeError())
- abstractmethod map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe a pure function and wrap the return value with
Err. Given anOkwill return self.from danom import Err, Ok Err(error=TypeError()).map_err(type_err_to_value_err) == Err(error=ValueError()) Ok(1).map(type_err_to_value_err) == Ok(1)
- abstractmethod or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._result.T_co, ~P]], ~danom._result.Result[~danom._result.U_co, ~danom._result.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Result[U_co, E_co]¶
Pipe a function that returns a monad to recover from an
Err. ForOkwill return originalResult.from danom import Err, Ok Ok(1).or_else(replace_err_with_zero) == Ok(1) Err(error=TypeError()).or_else(replace_err_with_zero) == Ok(0)
- static result_is_ok(result: Result[T_co, E_co]) bool¶
Check whether the monad is ok. Allows for
filterorpartitionin aStreamwithout needing a lambda or custom function.from danom import Stream, Result Stream.from_iterable([Ok(), Ok(), Err()]).filter(Result.result_is_ok).collect() == (Ok(), Ok())
- static result_unwrap(result: Result[T_co, E_co]) T_co¶
Unwrap the
Okmonad and get the inner value. Unwrap theErrmonad will raise the inner error.from danom import Err, Ok, Stream, Result oks, errs = Stream.from_iterable([Ok(1), Ok(2), Err()]).partition(Result.result_is_ok) oks.map(Result.result_unwrap).collect == (1, 2)
- classmethod unit(inner: T_co) Ok[T_co]¶
Unit method. Given an item of type
TreturnOk(T)>>> from danom import Err, Ok, Result >>> Result.unit(0) == Ok(0) True >>> Ok.unit(0) == Ok(0) True >>> Err.unit(0) == Ok(0) True
- abstractmethod unwrap() T_co¶
Unwrap the
Okmonad and get the inner value. Unwrap theErrmonad will raise the inner error.>>> from danom import Err, Ok >>> Ok().unwrap() == None True >>> Ok(1).unwrap() == 1 True >>> Ok("ok").unwrap() == 'ok' True >>> Err(error=TypeError()).unwrap() Traceback (most recent call last): ... TypeError:
- class danom.Right(inner: Any = None)¶
Bases:
Either[T_co,Never]- and_then(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Either[U_co, E_co]¶
Pipe another function that returns a monad. For
Leftwill return original inner.from danom import Left, Right Right(1).and_then(add_one) == Right(2) Right(1).and_then(raise_err) == Left(TypeError()) Left(TypeError()).and_then(add_one) == Left(TypeError()) Left(TypeError()).and_then(raise_value_err) == Left(TypeError())
- inner: Any¶
- is_ok() Literal[True]¶
Returns
Trueif the result type isRight. ReturnsFalseif the result type isLeft.>>> from danom import Left, Right >>> Right().is_ok() == True True >>> Left().is_ok() == False True
- map(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Right[U_co]¶
Pipe a pure function and wrap the return value with
Right. Given anLeftwill returnself.from danom import Left, Right Right(1).map(add_one) == Right(2) Left(1).map(add_one) == Left(1)
- map_err(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.U_co], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a pure function and wrap the return value with
Left. Given anRightwill returnself.from danom import Left, Right Left(TypeError()).map_err(type_err_to_value_err) == Left(ValueError()) Right(1).map(type_err_to_value_err) == Right(1)
- or_else(func: ~collections.abc.Callable[[~typing.Concatenate[~danom._either.T_co, ~P]], ~danom._either.Either[~danom._either.U_co, ~danom._either.E_co]], *args: ~typing.~P, **kwargs: ~typing.~P) Self¶
Pipe a function that returns a monad to recover from an
Left. ForRightwill return originalEither.from danom import Left, Right Right(1).or_else(replace_err_with_zero) == Right(1) Left(TypeError()).or_else(replace_err_with_zero) == Right(0)
- unwrap() T_co¶
Unwrap the Right or
Leftmonad to get the inner value.>>> from danom import Left, Right >>> Right().unwrap() == None True >>> Right(1).unwrap() == 1 True >>> Right("ok").unwrap() == 'ok' True >>> Left(-1).unwrap() == -1 True
- class danom.Stream(seq: tuple, ops: tuple = ())¶
Bases:
_BaseStream,GenericAn immutable lazy iterator with functional operations.
Why bother?¶
Readability counts, abstracting common operations helps reduce cognitive complexity when reading code.
Comparison¶
Take this imperative pipeline of operations, it iterates once over the data, skipping the value if it fails one of the filter checks:
res = [] for x in range(1_000_000): item = triple(x) if not is_gt_ten(item): continue item = min_two(item) if not is_even_num(item): continue item = square(item) if not is_lt_400(item): continue res.append(item) [100, 256]
number of tokens: 90
number of keywords: 11
keyword breakdown: {‘for’: 1, ‘in’: 1, ‘if’: 3, ‘not’: 3, ‘continue’: 3}
After a bit of experience with python you might use list comprehensions, however this is arguably _less_ clear and iterates multiple times over the same data
mul_three = [triple(x) for x in range(1_000_000)] gt_ten = [x for x in mul_three if is_gt_ten(x)] sub_two = [min_two(x) for x in gt_ten] is_even = [x for x in sub_two if is_even_num(x)] squared = [square(x) for x in is_even] lt_400 = [x for x in squared if is_lt_400(x)] [100, 256]
number of tokens: 92
number of keywords: 15
keyword breakdown: {‘for’: 6, ‘in’: 6, ‘if’: 3}
This still has a lot of tokens that the developer has to read to understand the code. The extra keywords add noise that cloud the actual transformations.
Using a
Streamresults in this:from danom import Stream ( Stream.from_iterable(range(1_000_000)) .map(triple) .filter(is_gt_ten) .map(min_two) .filter(is_even_num) .map(square) .filter(is_lt_400) .collect() ) (100, 256)
number of tokens: 60
number of keywords: 0
keyword breakdown: {}
The business logic is arguably much clearer like this.
Version changes¶
0.13.0:Stream.map,Stream.filterandStream.tapnow take kwargs andpartialthem into the passed in function.- async async_collect() Awaitable[tuple[U, ...]]¶
Async version of collect. Note that all functions in the stream should be
Awaitable.from danom import Stream Stream.from_iterable(file_paths).map(async_read_files).async_collect()
If there are no operations in the
Streamthen this will act as a normal collect.from danom import Stream Stream.from_iterable(file_paths).async_collect()
- collect() tuple[U, ...]¶
Materialise the sequence from the
Stream.from danom import Stream stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) stream.collect() == (1, 2, 3, 4)
- filter(fn: ~collections.abc.Callable[[~P], bool] | ~collections.abc.Callable[[~P], ~collections.abc.Awaitable[bool]], *args: ~typing.~P, **kwargs: ~typing.~P) Stream[T]¶
Filter the stream based on a predicate. Will return a new
Streamwith the modified sequence.>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2) True
Keyword arguments can be passed into the functions:
from danom import Stream Stream.from_iterable(range(20)).filter(divisible_by, x=3).filter(divisible_by, x=5).collect() == (0, 15)
- fold(initial: T, fn: Callable[[T, U], T], *, workers: int = 1, use_threads: bool = False) T¶
Fold the results into a single value.
foldtriggers an action so will incur acollect.>>> from danom import Stream >>> Stream.from_iterable([1, 2, 3, 4]).fold(0, lambda a, b: a + b) == 10 True >>> Stream.from_iterable([[1], [2], [3], [4]]).fold([0], lambda a, b: a + b) == [0, 1, 2, 3, 4] True >>> Stream.from_iterable([1, 2, 3, 4]).fold(1, lambda a, b: a * b) == 24 True
As
foldtriggers an action, the parameters will be forwarded to thepar_collectcall if theworkersare greater than 1. This will only effect thecollectthat is used to create the iterable to reduce, not thefoldoperation itself.from danom import Stream Stream.from_iterable([1, 2, 3, 4]).map(some_expensive_fn).fold(0, add, workers=4, use_threads=False)
- classmethod from_iterable(it: Iterable) Stream[T]¶
This is the recommended way of creating a Stream object.
>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3) True
- map(fn: ~collections.abc.Callable[[~P], ~danom._stream.U] | ~collections.abc.Callable[[~P], ~collections.abc.Awaitable[~danom._stream.U]], *args: ~typing.~P, **kwargs: ~typing.~P) Stream[T]¶
Map a function to the elements in the
Stream. Will return a newStreamwith the modified sequence.from danom import Stream Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4)
This can also be mixed with
safefunctions:from danom import Stream Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(1), Ok(2), Ok(3), Ok(4)) @safe def two_div_value(x: float) -> float: return 2 / x Stream.from_iterable([0, 1, 2, 4]).map(two_div_value).collect() == (Err(error=ZeroDivisionError('division by zero')), Ok(2.0), Ok(1.0), Ok(0.5))
Keyword arguments can be passed into the functions:
from danom import Stream Stream.from_iterable(range(5)).map(mul, b=2).map(add, b=1).collect() == (1, 3, 5, 7, 9)
- par_collect(workers: int = 4, *, use_threads: bool = False) tuple[U, ...]¶
Materialise the sequence from the
Streamin parallel.from danom import Stream stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) stream.par_collect() == (1, 2, 3, 4)
Use the
workersarg to select the number of workers to use. Use-1to use all available processors (except 1). Defaults to4.from danom import Stream stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) stream.par_collect(workers=-1) == (1, 2, 3, 4)
For smaller I/O bound tasks use the
use_threadsflag asTrue. If False the processing will useProcessPoolExecutorelse it will useThreadPoolExecutor.from danom import Stream stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) stream.par_collect(use_threads=True) == (1, 2, 3, 4)
Note that all operations should be pickle-able, for that reason
Streamdoes not support lambdas or closures.
- partition(fn: Callable[[P], bool], *, workers: int = 1, use_threads: bool = False) tuple[Stream[T], Stream[U]]¶
Similar to
filterexcept splits theTrueandFalsevalues. Will return a two newStreamwith the partitioned sequences.Each partition is independently replayable.
from danom import Stream >>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0) >>> part1.collect() == (0, 2) True >>> part2.collect() == (1, 3) True
As
partitiontriggers an action, the parameters will be forwarded to thepar_collectcall if theworkersare greater than 1.from danom import Stream Stream.from_iterable(range(10)).map(add_one).map(add_one).partition(divisible_by_3, workers=4) part1.map(add_one).par_collect() == (4, 7, 10) part2.collect() == (2, 4, 5, 7, 8, 10, 11)
- sequence(*, workers: int = 1, use_threads: bool = False) Result[T, E] | Either[T, E]¶
Convert a
StreamofResultorEithermonads to a monad of Stream>>> from danom import Ok, Stream >>> Stream.from_iterable((Ok(0), Ok(1), Ok(2))).sequence() == Ok(Stream.from_iterable((0, 1, 2))) True >>> Stream.from_iterable((Right(0), Right(1), Right(2))).sequence() == Right(Stream.from_iterable((0, 1, 2))) True >>> Stream.from_iterable((Ok(0), Err(1), Ok(2))).sequence() == Err(1) True >>> Stream.from_iterable((Right(0), Left(1), Right(2))).sequence() == Left(1) True
If the
Streamis of mixed monads, the final wrapped result will be ok the last seen monad type>>> from danom import Ok, Stream >>> Stream.from_iterable((Right(0), Right(1), Ok(2))).sequence() == Ok(Stream.from_iterable((0, 1, 2))) True
- tap(fn: ~collections.abc.Callable[[~P], None] | ~collections.abc.Callable[[~P], ~collections.abc.Awaitable[None]], *args: ~typing.~P, **kwargs: ~typing.~P) Stream[T]¶
Tap the values to another process that returns None. Will return a new
Streamwith the modified sequence.The value passed to the tap function will be deep-copied to avoid any modification to the
Streamitem for downstream consumers.from danom import Stream Stream.from_iterable([0, 1, 2, 3]).tap(log_value).collect() == (0, 1, 2, 3)
Simple functions can be passed in sequence for multiple
tapoperationsfrom danom import Stream Stream.from_iterable([0, 1, 2, 3]).tap(log_value).tap(print_value).collect() == (0, 1, 2, 3)
tapis useful for logging and similar actions without effecting the individual items, in this example eligible and dormant users are logged usingtap:from danom import Stream active_users, inactive_users = ( Stream.from_iterable(users).map(parse_user_objects).partition(inactive_users) ) active_users.filter(eligible_for_promotion).tap(log_eligible_users).map(construct_promo_email).map(send_with_confirmation).collect() inactive_users.tap(log_inactive_users).map(create_dormant_user_entry).map(add_to_dormant_table).collect()
- danom.all_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]¶
Trueif all of the given functions returnTrue.from danom import all_of is_valid_user = all_of(is_subscribed, is_active, has_2fa) is_valid_user(user) == True
- danom.any_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]¶
Trueif any of the given functions returnTrue.from danom import any_of is_eligible = any_of(has_coupon, is_vip, is_staff) is_eligible(user) == True
- danom.compose(*fns: Callable[[T_co], T_co | U_co]) Callable[[T_co], T_co | U_co]¶
Compose multiple functions into one.
The functions will be called in sequence with the result of one being used as the input for the next.
from danom import compose add_two = compose(add_one, add_one) add_two(0) == 2 add_two_is_even = compose(add_one, add_one, is_even) add_two_is_even(0) == True
- danom.identity(x: T_co) T_co¶
Basic identity function.
from danom import identity identity("abc") == "abc" identity(1) == 1 identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3)
Papertrail examples:
>>> identity(1) == 1 True
>>> identity("abc") == "abc" True
>>> identity([0, 1, 2]) == [0, 1, 2] True
>>> identity(Ok(inner=1)) == Ok(inner=1) True
- danom.invert(func: Callable[[T_co], bool]) Callable[[T_co], bool]¶
Invert a boolean function so it returns
Falsewhere it would’ve returnedTrue.from danom import invert invert(has_len)("abc") == False invert(has_len)("") == True
- danom.new_type(name: str, base_type: type, validators: Callable | Sequence[Callable] | None = None, converters: Callable | Sequence[Callable] | None = None, *, frozen: bool = True)¶
Create a NewType based on another type.
>>> from danom import new_type >>> def is_positive[T](value: T) -> bool: ... return value >= 0 >>> ValidBalance = new_type("ValidBalance", float, validators=[is_positive]) >>> ValidBalance(20.0) == ValidBalance(inner=20.0) True
Unlike an inherited class, the type will not return
Truefor an isinstance check.isinstance(ValidBalance(20.0), ValidBalance) == True isinstance(ValidBalance(20.0), float) == False
The methods of the given
base_typewill be forwarded to the specialised type. Alternatively the map method can be used to return a new type instance with the transformation.from danom import new_type def has_len(email: str) -> bool: return len(email) > 0 Email = new_type("Email", str, validators=[has_len]) Email("some_email@domain.com").upper() == "SOME_EMAIL@DOMAIN.COM" Email("some_email@domain.com").map(str.upper) == Email(inner='SOME_EMAIL@DOMAIN.COM')
- danom.none_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]¶
Trueif none of the given functions returnTrue.from danom import none_of is_valid = none_of(is_empty, exceeds_size_limit, contains_unsupported_format) is_valid(submission) == True
- danom.safe(func: Callable[[P], U]) Callable[[P], Result[U, Exception]]¶
Decorator for functions that wraps the function in a try except returns Ok on success else Err.
from danom import safe @safe def add_one(a: int) -> int: return a + 1 add_one(1) == Ok(inner=2)
- danom.safe_method(func: Callable[[Concatenate[T, P]], U]) Callable[[Concatenate[T, P]], Result[U, Exception]]¶
The same as
safeexcept it forwards on theselfof the class instance to the wrapped function.from danom import safe_method class Adder: def __init__(self, result: int = 0) -> None: self.result = result @safe_method def add_one(self, a: int) -> int: return self.result + 1 Adder.add_one(1) == Ok(inner=1)