danom documentation

class danom.Err(error: E_co | Exception = None, input_args: tuple[()] | tuple[tuple[Any, ...], dict[str, Any]] | tuple[object, tuple[Any, ...], dict[str, Any]] = ())

Bases: Result, Generic

and_then(func: Callable[[P], ResultReturnType], **kwargs: P.kwargs) Err

Pipe another function that returns a monad. For Err, the original error is returned unchanged.

from danom import Err, Ok

Ok(1).and_then(add_one) == Ok(2)
Ok(1).and_then(raise_err) == Err(error=TypeError())
Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
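The helpers above (add_one, raise_err, raise_value_err) are not defined on this page. The short-circuiting behaviour can be sketched without danom using a minimal Ok/Err pair; this is an illustrative stand-in, not danom's implementation:

```python
from dataclasses import dataclass
from typing import Any, Callable

# Minimal sketch of the and_then short-circuiting semantics -- not danom itself.
@dataclass(frozen=True)
class SketchOk:
    inner: Any = None

    def and_then(self, func: Callable) -> "SketchOk | SketchErr":
        try:
            return func(self.inner)
        except Exception as exc:  # a failing step becomes an Err
            return SketchErr(exc)

@dataclass(frozen=True)
class SketchErr:
    error: Exception

    def and_then(self, func: Callable) -> "SketchErr":
        return self  # Err short-circuits: the original error is kept

def add_one(x: int) -> SketchOk:
    return SketchOk(x + 1)

assert SketchOk(1).and_then(add_one) == SketchOk(2)
assert isinstance(SketchErr(TypeError()).and_then(add_one), SketchErr)
```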
details: list[dict[str, Any]]
error: E_co | Exception
input_args: tuple[()] | tuple[tuple[Any, ...], dict[str, Any]] | tuple[object, tuple[Any, ...], dict[str, Any]]
is_ok() Literal[False]

Returns True if the result type is Ok. Returns False if the result type is Err.

from danom import Err, Ok

Ok().is_ok() == True
Err().is_ok() == False
map(func: Callable[[P], U_co], **kwargs: P.kwargs) Err

Pipe a pure function and wrap the return value with Ok. Given an Err, returns self.

from danom import Err, Ok

Ok(1).map(add_one) == Ok(2)
Err(error=TypeError()).map(add_one) == Err(error=TypeError())
match(if_ok_func: Callable[[P], U_co], if_err_func: Callable[[P], U_co]) ResultReturnType

Apply if_ok_func to an Ok and if_err_func to an Err.

from danom import Err, Ok

Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
unwrap() None

Unwrapping an Ok returns the inner value; unwrapping an Err raises the inner error.

from danom import Err, Ok

Ok().unwrap() == None
Ok(1).unwrap() == 1
Ok("ok").unwrap() == 'ok'
Err(error=TypeError()).unwrap()  # raises TypeError(...)
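The raising contract can be exercised with an ordinary try/except; a minimal stand-in for Err (not danom itself) to show the shape:

```python
class SketchErr:
    """Minimal stand-in showing the unwrap raising contract -- not danom itself."""

    def __init__(self, error: Exception) -> None:
        self.error = error

    def unwrap(self) -> None:
        # unwrapping an Err re-raises the stored error
        raise self.error

try:
    SketchErr(TypeError("boom")).unwrap()
except TypeError as exc:
    assert str(exc) == "boom"
```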
class danom.Ok(inner: Any = None)

Bases: Result, Generic

and_then(func: Callable[[P], ResultReturnType], **kwargs: P.kwargs) ResultReturnType

Pipe another function that returns a monad. For Err, the original error is returned unchanged.

from danom import Err, Ok

Ok(1).and_then(add_one) == Ok(2)
Ok(1).and_then(raise_err) == Err(error=TypeError())
Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
inner: Any
is_ok() Literal[True]

Returns True if the result type is Ok. Returns False if the result type is Err.

from danom import Err, Ok

Ok().is_ok() == True
Err().is_ok() == False
map(func: Callable[[P], U_co], **kwargs: P.kwargs) Ok

Pipe a pure function and wrap the return value with Ok. Given an Err, returns self.

from danom import Err, Ok

Ok(1).map(add_one) == Ok(2)
Err(error=TypeError()).map(add_one) == Err(error=TypeError())
match(if_ok_func: Callable[[P], U_co], if_err_func: Callable[[P], U_co]) ResultReturnType

Apply if_ok_func to an Ok and if_err_func to an Err.

from danom import Err, Ok

Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
unwrap() T_co

Unwrapping an Ok returns the inner value; unwrapping an Err raises the inner error.

from danom import Err, Ok

Ok().unwrap() == None
Ok(1).unwrap() == 1
Ok("ok").unwrap() == 'ok'
Err(error=TypeError()).unwrap()  # raises TypeError(...)
class danom.Result

Bases: ABC

Result monad. Consists of Ok and Err for successful and failed operations respectively. Each monad is a frozen instance to prevent further mutation.

abstractmethod and_then(func: Callable[[P], ResultReturnType], **kwargs: P.kwargs) ResultReturnType

Pipe another function that returns a monad. For Err, the original error is returned unchanged.

from danom import Err, Ok

Ok(1).and_then(add_one) == Ok(2)
Ok(1).and_then(raise_err) == Err(error=TypeError())
Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
abstractmethod is_ok() bool

Returns True if the result type is Ok. Returns False if the result type is Err.

from danom import Err, Ok

Ok().is_ok() == True
Err().is_ok() == False
abstractmethod map(func: Callable[[P], U_co], **kwargs: P.kwargs) ResultReturnType

Pipe a pure function and wrap the return value with Ok. Given an Err, returns self.

from danom import Err, Ok

Ok(1).map(add_one) == Ok(2)
Err(error=TypeError()).map(add_one) == Err(error=TypeError())
abstractmethod match(if_ok_func: Callable[[P], U_co], if_err_func: Callable[[P], U_co]) ResultReturnType

Apply if_ok_func to an Ok and if_err_func to an Err.

from danom import Err, Ok

Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
classmethod unit(inner: T_co) Ok

Unit method. Given an item of type T_co, returns Ok(T_co).

from danom import Err, Ok, Result

Result.unit(0) == Ok(inner=0)
Ok.unit(0) == Ok(inner=0)
Err.unit(0) == Ok(inner=0)
abstractmethod unwrap() T_co

Unwrapping an Ok returns the inner value; unwrapping an Err raises the inner error.

from danom import Err, Ok

Ok().unwrap() == None
Ok(1).unwrap() == 1
Ok("ok").unwrap() == 'ok'
Err(error=TypeError()).unwrap()  # raises TypeError(...)
class danom.Stream(seq: Iterable, ops: tuple = ())

Bases: _BaseStream

An immutable lazy iterator with functional operations.

Why bother?

Readability counts: abstracting common operations reduces cognitive complexity when reading code.

Comparison

Take this imperative pipeline of operations. It iterates once over the data, skipping a value if it fails any of the filter checks:

res = []

for x in range(1_000_000):
    item = triple(x)

    if not is_gt_ten(item):
        continue

    item = min_two(item)

    if not is_even_num(item):
        continue

    item = square(item)

    if not is_lt_400(item):
        continue

    res.append(item)
[100, 256]

number of tokens: 90

number of keywords: 11

keyword breakdown: {'for': 1, 'in': 1, 'if': 3, 'not': 3, 'continue': 3}

After some experience with Python you might reach for list comprehensions; however, this is arguably _less_ clear and iterates over the same data multiple times:

mul_three = [triple(x) for x in range(1_000_000)]
gt_ten = [x for x in mul_three if is_gt_ten(x)]
sub_two = [min_two(x) for x in gt_ten]
is_even = [x for x in sub_two if is_even_num(x)]
squared = [square(x) for x in is_even]
lt_400 = [x for x in squared if is_lt_400(x)]
[100, 256]

number of tokens: 92

number of keywords: 15

keyword breakdown: {'for': 6, 'in': 6, 'if': 3}

This still has a lot of tokens the developer has to read to understand the code, and the extra keywords add noise that clouds the actual transformations.

Using a Stream results in this:

from danom import Stream

(
    Stream.from_iterable(range(1_000_000))
    .map(triple)
    .filter(is_gt_ten)
    .map(min_two)
    .filter(is_even_num)
    .map(square)
    .filter(is_lt_400)
    .collect()
)
(100, 256)

number of tokens: 60

number of keywords: 0

keyword breakdown: {}

The business logic is arguably much clearer like this.
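The helper functions (triple, is_gt_ten, min_two, is_even_num, square, is_lt_400) are not defined on this page; under the obvious definitions the whole comparison can be reproduced with chained plain-Python generators, which is roughly how a lazy Stream evaluates, one item at a time:

```python
# Hypothetical definitions of the helpers used in the comparison above.
def triple(x: int) -> int:
    return x * 3

def is_gt_ten(x: int) -> bool:
    return x > 10

def min_two(x: int) -> int:
    return x - 2

def is_even_num(x: int) -> bool:
    return x % 2 == 0

def square(x: int) -> int:
    return x * x

def is_lt_400(x: int) -> bool:
    return x < 400

# Chained generator expressions: each stage is lazy, so items flow through
# the whole chain one at a time, mirroring Stream's map/filter pipeline.
stage = (triple(x) for x in range(1_000_000))
stage = (x for x in stage if is_gt_ten(x))
stage = (min_two(x) for x in stage)
stage = (x for x in stage if is_even_num(x))
stage = (square(x) for x in stage)
result = tuple(x for x in stage if is_lt_400(x))

print(result)  # (100, 256)
```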

async async_collect() Awaitable[tuple[U, ...]]

Async version of collect. Note that all functions in the stream should be awaitable.

from danom import Stream

await Stream.from_iterable(file_paths).map(async_read_files).async_collect()

If there are no operations in the Stream then this will act as a normal collect.

from danom import Stream

await Stream.from_iterable(file_paths).async_collect()
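The awaiting behaviour can be sketched with plain asyncio; async_add_one here is a hypothetical stand-in for an async operation such as reading a file, and this is not danom's implementation:

```python
import asyncio

async def async_add_one(x: int) -> int:
    # stand-in for a real async operation (e.g. reading a file)
    await asyncio.sleep(0)
    return x + 1

async def main() -> tuple[int, ...]:
    # roughly the shape of async_collect: await every mapped coroutine
    results = await asyncio.gather(*(async_add_one(x) for x in [0, 1, 2, 3]))
    return tuple(results)

print(asyncio.run(main()))  # (1, 2, 3, 4)
```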
collect() tuple[U, ...]

Materialise the sequence from the Stream.

from danom import Stream

stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
stream.collect() == (1, 2, 3, 4)
filter(*fns: FilterFn | AsyncFilterFn) Stream[T]

Filter the stream based on a predicate. Will return a new Stream with the modified sequence.

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)

Simple functions can be passed in sequence to compose more complex filters

from danom import Stream

Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
fold(initial: T, fn: Callable[[T, U], T], *, workers: int = 1, use_threads: bool = False) T

Fold the results into a single value. Since fold is a terminal action, it triggers a collect.

from danom import Stream

Stream.from_iterable([1, 2, 3, 4]).fold(0, lambda a, b: a + b) == 10
Stream.from_iterable([[1], [2], [3], [4]]).fold([0], lambda a, b: a + b) == [0, 1, 2, 3, 4]
Stream.from_iterable([1, 2, 3, 4]).fold(1, lambda a, b: a * b) == 24

As fold triggers an action, the parameters will be forwarded to the par_collect call if workers is greater than 1. This only affects the collect used to create the iterable to reduce, not the fold operation itself.

from danom import Stream

Stream.from_iterable([1, 2, 3, 4]).map(some_expensive_fn).fold(0, add, workers=4, use_threads=False)
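A fold with an initial value behaves like the stdlib's functools.reduce with an initializer; the three examples above can be checked directly against it:

```python
from functools import reduce

# fold(initial, fn) over a sequence corresponds to reduce(fn, seq, initial)
assert reduce(lambda a, b: a + b, [1, 2, 3, 4], 0) == 10
assert reduce(lambda a, b: a + b, [[1], [2], [3], [4]], [0]) == [0, 1, 2, 3, 4]
assert reduce(lambda a, b: a * b, [1, 2, 3, 4], 1) == 24
```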
classmethod from_iterable(it: Iterable) Stream[T]

This is the recommended way of creating a Stream object.

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
map(*fns: MapFn | AsyncMapFn) Stream[U]

Map a function to the elements in the Stream. Will return a new Stream with the modified sequence.

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4)

This can also be mixed with safe functions:

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4))

@safe
def two_div_value(x: float) -> float:
    return 2 / x

Stream.from_iterable([0, 1, 2, 4]).map(two_div_value).collect() == (Err(error=ZeroDivisionError('division by zero')), Ok(inner=2.0), Ok(inner=1.0), Ok(inner=0.5))

Simple functions can be passed in sequence to compose more complex transformations

from danom import Stream

Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)
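Passing several functions to map composes them left to right; with hypothetical definitions of mul_two and add_one, the last example above can be checked in plain Python:

```python
# Hypothetical definitions of the helpers used in the map example above.
def mul_two(x: int) -> int:
    return x * 2

def add_one(x: int) -> int:
    return x + 1

# map(mul_two, add_one) applies mul_two first, then add_one to its result
assert tuple(add_one(mul_two(x)) for x in range(5)) == (1, 3, 5, 7, 9)
```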
par_collect(workers: int = 4, *, use_threads: bool = False) tuple[U, ...]

Materialise the sequence from the Stream in parallel.

from danom import Stream

stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
stream.par_collect() == (1, 2, 3, 4)

Use the workers arg to select the number of workers. Use -1 to use all but one of the available processors. Defaults to 4.

from danom import Stream

stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
stream.par_collect(workers=-1) == (1, 2, 3, 4)

For smaller I/O-bound tasks set use_threads to True. If use_threads is False, processing uses a ProcessPoolExecutor; otherwise it uses a ThreadPoolExecutor.

from danom import Stream

stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
stream.par_collect(use_threads=True) == (1, 2, 3, 4)

Note that all operations should be pickle-able; for that reason Stream does not support lambdas or closures.

partition(fn: FilterFn, *, workers: int = 1, use_threads: bool = False) tuple[Stream[T], Stream[U]]

Similar to filter except it splits the True and False values. Returns two new Streams with the partitioned sequences.

Each partition is independently replayable.

from danom import Stream

part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
part1.collect() == (0, 2)
part2.collect() == (1, 3)

As partition triggers an action, the parameters will be forwarded to the par_collect call if workers is greater than 1.

from danom import Stream

part1, part2 = Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
part1.map(add_one).par_collect() == (4, 7, 10)
part2.collect() == (2, 4, 5, 7, 8, 10, 11)
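A partition is equivalent to filtering once with the predicate and once with its negation; a plain-Python sketch of the semantics (is_even is a hypothetical predicate):

```python
def is_even(x: int) -> bool:
    return x % 2 == 0

items = [0, 1, 2, 3]
# True values land in the first partition, False values in the second
part_true = tuple(x for x in items if is_even(x))
part_false = tuple(x for x in items if not is_even(x))
assert (part_true, part_false) == ((0, 2), (1, 3))
```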
tap(*fns: TapFn | AsyncTapFn) Stream[T]

Tap each value into a side-effect function that returns None. Will return a new Stream with the sequence unchanged.

The value passed to the tap function will be deep-copied to avoid any modification to the Stream item for downstream consumers.

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).tap(log_value).collect() == (0, 1, 2, 3)

Simple functions can be passed in sequence for multiple tap operations

from danom import Stream

Stream.from_iterable([0, 1, 2, 3]).tap(log_value, print_value).collect() == (0, 1, 2, 3)

tap is useful for logging and similar actions without affecting the individual items. In this example eligible and dormant users are logged using tap:

from danom import Stream

inactive_users, active_users = (
    Stream.from_iterable(users).map(parse_user_objects).partition(is_inactive)
)

active_users.filter(eligible_for_promotion).tap(log_eligible_users).map(
    construct_promo_email, send_with_confirmation
).collect()

inactive_users.tap(log_inactive_users).map(
    create_dormant_user_entry, add_to_dormant_table
).collect()
danom.all_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]

True if all of the given functions return True.

from danom import all_of

is_valid_user = all_of(is_subscribed, is_active, has_2fa)
is_valid_user(user) == True
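The combinator can be sketched as a closure over the stdlib all; a minimal version under that assumption, with hypothetical predicates:

```python
from typing import Callable

def sketch_all_of(*fns: Callable[[object], bool]) -> Callable[[object], bool]:
    # returns a predicate that is True only when every given fn returns True
    return lambda x: all(fn(x) for fn in fns)

is_small_even = sketch_all_of(lambda x: x % 2 == 0, lambda x: x < 10)
assert is_small_even(4) is True
assert is_small_even(12) is False
```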
danom.any_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]

True if any of the given functions return True.

from danom import any_of

is_eligible = any_of(has_coupon, is_vip, is_staff)
is_eligible(user) == True
danom.compose(*fns: Callable[[T_co], T_co | U_co]) Callable[[T_co], T_co | U_co]

Compose multiple functions into one.

The functions will be called in sequence with the result of one being used as the input for the next.

from danom import compose

add_two = compose(add_one, add_one)
add_two(0) == 2
add_two_is_even = compose(add_one, add_one, is_even)
add_two_is_even(0) == True
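This threading of each result into the next function can be sketched with functools.reduce; a minimal stand-in (not danom's implementation):

```python
from functools import reduce
from typing import Callable

def sketch_compose(*fns: Callable) -> Callable:
    # feed the value through each function in sequence, left to right
    return lambda x: reduce(lambda acc, fn: fn(acc), fns, x)

def add_one(x: int) -> int:
    return x + 1

add_two = sketch_compose(add_one, add_one)
assert add_two(0) == 2
add_two_is_even = sketch_compose(add_one, add_one, lambda x: x % 2 == 0)
assert add_two_is_even(0) is True
```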
danom.identity(x: T_co) T_co

Basic identity function.

from danom import identity

identity("abc") == "abc"
identity(1) == 1
identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3)
danom.invert(func: Callable[[T_co], bool]) Callable[[T_co], bool]

Invert a boolean function so it returns False where it would’ve returned True.

from danom import invert

invert(has_len)("abc") == False
invert(has_len)("") == True
danom.new_type(name: str, base_type: type, validators: Callable | Sequence[Callable] | None = None, converters: Callable | Sequence[Callable] | None = None, *, frozen: bool = True)

Create a NewType based on another type.

from danom import new_type

def is_positive(value):
    return value >= 0

ValidBalance = new_type("ValidBalance", float, validators=[is_positive])
ValidBalance("20") == ValidBalance(inner=20.0)

Unlike an inherited class, the new type will not pass an isinstance check against the base type.

isinstance(ValidBalance(20.0), ValidBalance) == True
isinstance(ValidBalance(20.0), float) == False

The methods of the given base_type are forwarded to the specialised type. Alternatively, the map method returns a new instance of the type with the transformation applied.

from danom import new_type

def has_len(email: str) -> bool:
    return len(email) > 0

Email = new_type("Email", str, validators=[has_len])
Email("some_email@domain.com").upper() == "SOME_EMAIL@DOMAIN.COM"
Email("some_email@domain.com").map(str.upper) == Email(inner='SOME_EMAIL@DOMAIN.COM')
danom.none_of(*fns: Callable[[T_co], bool]) Callable[[T_co], bool]

True if none of the given functions return True.

from danom import none_of

is_valid = none_of(is_empty, exceeds_size_limit, contains_unsupported_format)
is_valid(submission) == True
danom.safe(func: Callable[[...], U]) Callable[[...], Result]

Decorator that wraps the function in a try/except, returning Ok on success and Err on failure.

from danom import safe

@safe
def add_one(a: int) -> int:
    return a + 1

add_one(1) == Ok(inner=2)
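A decorator with this contract can be sketched in a few lines; SketchOk and SketchErr below are minimal stand-ins, not danom's own Ok/Err:

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable

@dataclass(frozen=True)
class SketchOk:
    inner: Any = None

@dataclass(frozen=True)
class SketchErr:
    error: Exception

def sketch_safe(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return SketchOk(func(*args, **kwargs))
        except Exception as exc:  # any raised error becomes an Err
            return SketchErr(exc)
    return wrapper

@sketch_safe
def add_one(a: int) -> int:
    return a + 1

assert add_one(1) == SketchOk(2)
assert isinstance(add_one("x"), SketchErr)  # "x" + 1 raises TypeError
```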
danom.safe_method(func: Callable[[...], U]) Callable[[...], Result]

The same as safe, except it forwards the class instance's self to the wrapped function.

from danom import safe_method

class Adder:
    def __init__(self, result: int = 0) -> None:
        self.result = result

    @safe_method
    def add_one(self, a: int) -> int:
        return self.result + 1

Adder().add_one(1) == Ok(inner=1)
