Stream API

fpstreams.AsyncStream

Bases: Generic[T]

Asynchronous stream implementation: a wrapper around an AsyncIterator for processing I/O-bound tasks.

__aexit__(exc_type, exc_val, exc_tb) async

Ensures underlying resources (like file handles) are closed.

of(*elements) staticmethod

Creates a stream from a sequence of values. Usage: AsyncStream.of(1, 2, 3)

from_iterable(iterable) staticmethod

Creates a stream from a synchronous iterable (list, range, etc).

from_aiterable(aiterable) staticmethod

Wraps an existing async iterable or generator.

from_file(path, encoding='utf-8') staticmethod

Reads a file line-by-line asynchronously. Requires aiofiles to be installed.

interval(seconds) staticmethod

Emits an increasing integer counter, one value every 'seconds' seconds. The stream is infinite.
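The behaviour described for interval can be approximated with a plain async generator. This is a minimal sketch using only asyncio, not the fpstreams implementation; the starting value of 0, the choice to emit before sleeping, and the helper names interval_sketch and take are assumptions made for illustration.

```python
import asyncio
from typing import AsyncIterator, List


async def interval_sketch(seconds: float) -> AsyncIterator[int]:
    """Yield 0, 1, 2, ... forever, pausing `seconds` between values (assumed semantics)."""
    counter = 0
    while True:
        yield counter
        counter += 1
        await asyncio.sleep(seconds)


async def take(source: AsyncIterator[int], n: int) -> List[int]:
    """Collect the first n values, then close the infinite generator explicitly."""
    collected: List[int] = []
    try:
        async for value in source:
            collected.append(value)
            if len(collected) == n:
                break
    finally:
        await source.aclose()
    return collected


if __name__ == "__main__":
    print(asyncio.run(take(interval_sketch(0.01), 3)))
```

Because the stream never ends on its own, any consumer has to stop it explicitly, as take does here.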

from_paginated(first_page_fetcher, next_page_fetcher, data_extractor, has_next) staticmethod

Creates a stream from a paginated API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| first_page_fetcher | Callable[[], Awaitable[Any]] | Async func to get the first response. | required |
| next_page_fetcher | Callable[[Any], Awaitable[Any]] | Async func taking previous response to get next. | required |
| data_extractor | Callable[[Any], Iterable[U]] | Func extracting a list of items from the response. | required |
| has_next | Callable[[Any], bool] | Func checking if more pages exist. | required |
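How the four callables of from_paginated fit together can be sketched as an async generator. This is an illustration under stated assumptions, not the library's code: paginate, PAGES and fetch_page are hypothetical names, and the in-memory dictionary stands in for a real HTTP API.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Iterable, List, TypeVar

U = TypeVar("U")


async def paginate(
    first_page_fetcher: Callable[[], Awaitable[Any]],
    next_page_fetcher: Callable[[Any], Awaitable[Any]],
    data_extractor: Callable[[Any], Iterable[U]],
    has_next: Callable[[Any], bool],
) -> AsyncIterator[U]:
    """Yield items page by page until has_next reports no further page."""
    response = await first_page_fetcher()
    while True:
        for item in data_extractor(response):
            yield item
        if not has_next(response):
            return
        # The previous response is passed on so the fetcher can read a cursor from it.
        response = await next_page_fetcher(response)


# Hypothetical in-memory "API": each page names the next page number, or None.
PAGES: Dict[int, Dict[str, Any]] = {
    1: {"items": ["a", "b"], "next": 2},
    2: {"items": ["c"], "next": None},
}


async def fetch_page(number: int) -> Dict[str, Any]:
    await asyncio.sleep(0)  # stands in for network latency
    return PAGES[number]


async def demo() -> List[str]:
    return [
        item
        async for item in paginate(
            first_page_fetcher=lambda: fetch_page(1),
            next_page_fetcher=lambda prev: fetch_page(prev["next"]),
            data_extractor=lambda resp: resp["items"],
            has_next=lambda resp: resp["next"] is not None,
        )
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```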

map(mapper)

Applies a synchronous function to each element.

map_async(func)

Maps each element to a coroutine or other awaitable.

Returns a stream of pending coroutines that have not yet been executed. Use .gather() to execute them.
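The distinction between creating and executing work can be seen with asyncio alone. The sketch below assumes that the pending items are plain coroutine objects; fetch, calls and demo are illustrative names, and asyncio.gather stands in for the stream's own .gather().

```python
import asyncio
from typing import List, Tuple

calls: List[int] = []  # records which coroutines have actually started running


async def fetch(n: int) -> int:
    calls.append(n)
    await asyncio.sleep(0)
    return n * 10


async def demo() -> Tuple[List[int], List[int]]:
    # Calling an async function only builds a coroutine object; no body runs yet.
    pending = [fetch(n) for n in (1, 2, 3)]
    started_before_gather = list(calls)
    # Awaiting (here through asyncio.gather) is what executes the coroutines.
    results = list(await asyncio.gather(*pending))
    return started_before_gather, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```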

debounce(wait_ms)

Drops items that arrive less than 'wait_ms' after the previous item. Useful for filtering high-frequency sensor data or event streams.
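One possible reading of this rule is sketched below as an async generator. It is not the fpstreams implementation: debounce_sketch is a hypothetical name, the gap is measured against the previous arrival (whether or not that item was dropped), which is an assumption, and the injectable clock parameter exists only to make the demo deterministic.

```python
import asyncio
import time
from typing import AsyncIterator, Callable, List, Optional, TypeVar

T = TypeVar("T")


async def debounce_sketch(
    source: AsyncIterator[T],
    wait_ms: float,
    clock: Callable[[], float] = time.monotonic,
) -> AsyncIterator[T]:
    """Drop any item arriving less than wait_ms after the previous arrival."""
    last_arrival: Optional[float] = None
    async for item in source:
        now = clock()
        if last_arrival is None or (now - last_arrival) * 1000 >= wait_ms:
            yield item
        # Assumption: dropped items also reset the reference time.
        last_arrival = now


async def readings() -> AsyncIterator[str]:
    for value in ("a", "b", "c"):
        yield value


async def demo() -> List[str]:
    # Simulated arrival times in seconds: "b" comes 5 ms after "a", "c" 95 ms after "b".
    fake_times = iter([0.0, 0.005, 0.100])
    stream = debounce_sketch(readings(), wait_ms=50, clock=lambda: next(fake_times))
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With these timings only "b" falls inside the 50 ms window, so it is the one item dropped.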

concurrent(limit)

Sets the max number of concurrent tasks for the next .gather() call.

unordered()

Allows .gather() to yield results out-of-order (faster).

gather()

Executes pending coroutines generated by .map_async().

Applies concurrency limits and ordering rules.
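The combined effect of concurrent, unordered and gather can be approximated with standard asyncio primitives. This is a sketch of the technique (a semaphore for the limit, asyncio.gather for ordered results, asyncio.as_completed for completion order), not the library's implementation; bounded_gather and job are hypothetical names, and returning a list rather than a stream is a simplification.

```python
import asyncio
from typing import Any, Awaitable, Iterable, List, Tuple


async def bounded_gather(
    coros: Iterable[Awaitable[Any]], limit: int, ordered: bool = True
) -> List[Any]:
    """Run awaitables with at most `limit` in flight at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(coro: Awaitable[Any]) -> Any:
        async with semaphore:
            return await coro

    wrapped = [run(coro) for coro in coros]
    if ordered:
        # Results come back in input order, regardless of finish order.
        return list(await asyncio.gather(*wrapped))
    # Results come back in the order the awaitables finish.
    return [await finished for finished in asyncio.as_completed(wrapped)]


async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def demo() -> Tuple[List[str], List[str]]:
    specs = [("slow", 0.05), ("fast", 0.01)]
    in_order = await bounded_gather([job(n, d) for n, d in specs], limit=2)
    as_done = await bounded_gather(
        [job(n, d) for n, d in specs], limit=2, ordered=False
    )
    return in_order, as_done


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unordered delivery helps when one slow task would otherwise hold back results that are already available.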

timeout(seconds)

Wraps pending coroutines in asyncio.wait_for with a timeout.
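The effect of asyncio.wait_for on a single coroutine is shown in the sketch below. How AsyncStream surfaces an expired timeout to the caller is not stated here, so catching the exception and returning a marker string is purely an illustrative choice; slow_call, quick_call and run_with_timeout are hypothetical names.

```python
import asyncio
from typing import Awaitable, List


async def slow_call() -> str:
    await asyncio.sleep(1.0)
    return "finished"


async def quick_call() -> str:
    await asyncio.sleep(0)
    return "finished"


async def run_with_timeout(coro: Awaitable[str], seconds: float) -> str:
    try:
        # wait_for cancels the coroutine and raises once `seconds` have elapsed.
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return "timed out"


async def demo() -> List[str]:
    return [
        await run_with_timeout(quick_call(), 0.5),
        await run_with_timeout(slow_call(), 0.01),
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```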

to_list() async

Collects all elements into a list.

for_each(func) async

Executes func for each element.

to_file_async(path, encoding='utf-8') async

Writes elements to a file asynchronously.

collect(collector) async

Materializes the stream and applies a standard (sync) Collector.

Example: await stream.collect(Collectors.to_list())