Stream API
fpstreams.AsyncStream
Bases: Generic[T]
Asynchronous stream implementation. Wraps an AsyncIterator for processing I/O-bound tasks.
__aexit__(exc_type, exc_val, exc_tb)
async
Ensures underlying resources (like file handles) are closed.
of(*elements)
staticmethod
Creates a stream from a sequence of values. Usage: AsyncStream.of(1, 2, 3)
from_iterable(iterable)
staticmethod
Creates a stream from a synchronous iterable (list, range, etc.).
from_aiterable(aiterable)
staticmethod
Wraps an existing async iterable or generator.
from_file(path, encoding='utf-8')
staticmethod
Reads a file line-by-line asynchronously.
Requires aiofiles to be installed.
interval(seconds)
staticmethod
Emits an increasing integer counter once every 'seconds' seconds. The stream is infinite.
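The behaviour described for interval can be pictured with a plain asyncio async generator. This is a minimal sketch, not the library's implementation: the names interval_sketch and take are hypothetical, and starting the counter at 0 and sleeping before the first value are assumptions the reference does not confirm.

```python
import asyncio
from typing import AsyncIterator, List


async def interval_sketch(seconds: float) -> AsyncIterator[int]:
    """Yield 0, 1, 2, ... forever, sleeping `seconds` before each value.

    Assumption: the counter starts at 0 and the first value is delayed.
    """
    counter = 0
    while True:
        await asyncio.sleep(seconds)
        yield counter
        counter += 1


async def take(source: AsyncIterator[int], n: int) -> List[int]:
    """Collect the first n items, then stop consuming the infinite source."""
    items: List[int] = []
    if n <= 0:
        return items
    async for item in source:
        items.append(item)
        if len(items) >= n:
            break
    return items


if __name__ == "__main__":
    print(asyncio.run(take(interval_sketch(0.01), 3)))
```

Because the source never ends, a consumer has to stop on its own, as take does here; collecting an infinite stream into a list would never return.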
from_paginated(first_page_fetcher, next_page_fetcher, data_extractor, has_next)
staticmethod
Creates a stream from a paginated API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| first_page_fetcher | Callable[[], Awaitable[Any]] | Async func to get the first response. | required |
| next_page_fetcher | Callable[[Any], Awaitable[Any]] | Async func taking previous response to get next. | required |
| data_extractor | Callable[[Any], Iterable[U]] | Func extracting a list of items from the response. | required |
| has_next | Callable[[Any], bool] | Func checking if more pages exist. | required |
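
To show how the four callables could fit together, here is a sketch of a pagination loop written as a plain async generator. It is an illustration under stated assumptions, not the library's code: paginate, PAGES, fetch_first, fetch_next, and collect_all are hypothetical names, and the in-memory PAGES dict stands in for a real HTTP API.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Iterable, List, TypeVar

U = TypeVar("U")


async def paginate(
    first_page_fetcher: Callable[[], Awaitable[Any]],
    next_page_fetcher: Callable[[Any], Awaitable[Any]],
    data_extractor: Callable[[Any], Iterable[U]],
    has_next: Callable[[Any], bool],
) -> AsyncIterator[U]:
    """Yield items page by page until has_next reports no further page."""
    response = await first_page_fetcher()
    while True:
        for item in data_extractor(response):
            yield item
        if not has_next(response):
            break
        response = await next_page_fetcher(response)


# Hypothetical fake API: each page lists its items and the next page number.
PAGES = {
    1: {"items": ["a", "b"], "next": 2},
    2: {"items": ["c"], "next": 3},
    3: {"items": ["d", "e"], "next": None},
}


async def fetch_first() -> Any:
    return PAGES[1]


async def fetch_next(previous: Any) -> Any:
    return PAGES[previous["next"]]


async def collect_all() -> List[str]:
    stream = paginate(
        fetch_first,
        fetch_next,
        lambda response: response["items"],
        lambda response: response["next"] is not None,
    )
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(collect_all()))
```

Passing the previous response to next_page_fetcher lets the caller follow whatever cursor or link the API returns, without the stream needing to know the response format.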
map(mapper)
Applies a synchronous function to each element.
map_async(func)
Maps each element to a coroutine or other awaitable.
Returns a stream of pending tasks that have not run yet. Call .gather() to execute them.
debounce(wait_ms)
Drops items that arrive less than 'wait_ms' milliseconds after the previous item. Useful for filtering high-frequency sensor data or event streams.
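One way to read this rule is sketched below in plain asyncio. It is a hedged illustration, not the library's implementation: debounce_sketch and demo are hypothetical names, the clock parameter exists only to make the example deterministic, and measuring the gap against the previous arrival (rather than the previous emitted item) is an assumption the reference leaves open.

```python
import asyncio
import time
from typing import AsyncIterator, Callable, List, Optional, TypeVar

T = TypeVar("T")


async def debounce_sketch(
    source: AsyncIterator[T],
    wait_ms: float,
    clock: Callable[[], float] = time.monotonic,
) -> AsyncIterator[T]:
    """Drop any item arriving less than wait_ms after the previous arrival."""
    last_arrival: Optional[float] = None
    async for item in source:
        now = clock()  # arrival time in seconds
        if last_arrival is None or (now - last_arrival) * 1000 >= wait_ms:
            yield item
        # Assumption: dropped items still count as the "previous item".
        last_arrival = now


async def demo() -> List[str]:
    # Fake arrival times in seconds, one per item, so the result is repeatable.
    arrivals = iter([0.0, 0.01, 0.2, 0.25, 0.5])

    async def source() -> AsyncIterator[str]:
        for value in "abcde":
            yield value

    stream = debounce_sketch(source(), 100, clock=lambda: next(arrivals))
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a 100 ms window, the items arriving 10 ms and 50 ms after their predecessors are dropped and the other three pass through.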
concurrent(limit)
Sets the max number of concurrent tasks for the next .gather() call.
unordered()
Allows .gather() to yield results out of order (typically faster).
gather()
Executes the pending coroutines generated by .map_async().
Applies the concurrency limit set by .concurrent() and the ordering mode set by .unordered().
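The interaction between a concurrency limit and result ordering can be sketched with standard asyncio primitives. The code below is an assumption about how such a step might work, not the library's implementation: gather_sketch and demo are hypothetical names, and the sketch returns a list where the real method presumably yields a stream.

```python
import asyncio
from typing import Awaitable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


async def gather_sketch(
    pending: Iterable[Awaitable[T]], limit: int, ordered: bool = True
) -> List[T]:
    """Await every pending awaitable, running at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_limited(awaitable: Awaitable[T]) -> T:
        # The semaphore keeps at most `limit` awaitables in flight.
        async with semaphore:
            return await awaitable

    tasks = [asyncio.ensure_future(run_limited(a)) for a in pending]
    if ordered:
        # asyncio.gather returns results in input order.
        return list(await asyncio.gather(*tasks))
    # asyncio.as_completed hands back results in completion order.
    return [await finished for finished in asyncio.as_completed(tasks)]


async def demo(limit: int, ordered: bool) -> Tuple[List[int], int]:
    stats = {"in_flight": 0, "peak": 0}

    async def slow_square(n: int, delay: float) -> int:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(delay)
        stats["in_flight"] -= 1
        return n * n

    jobs = [(1, 0.03), (2, 0.01), (3, 0.02), (4, 0.01)]
    pending = [slow_square(n, delay) for n, delay in jobs]
    results = await gather_sketch(pending, limit, ordered)
    return results, stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(demo(limit=2, ordered=True)))
    print(asyncio.run(demo(limit=2, ordered=False)))
```

Ordered mode returns results in input order even when a later job finishes first. Unordered mode hands each result over as soon as it is ready, so one slow job does not hold back the results behind it.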
timeout(seconds)
Wraps pending coroutines in asyncio.wait_for with a timeout.
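The effect of wrapping an awaitable in asyncio.wait_for can be seen in the following sketch. The helper names with_timeout, work, and demo are hypothetical, and the reference does not say whether the stream propagates the timeout error or handles it, so the try/except here is only one possible way a caller might react.

```python
import asyncio
from typing import Awaitable, List, TypeVar

T = TypeVar("T")


async def with_timeout(awaitable: Awaitable[T], seconds: float) -> T:
    """Await `awaitable`, raising asyncio.TimeoutError after `seconds`."""
    return await asyncio.wait_for(awaitable, timeout=seconds)


async def work(delay: float) -> str:
    await asyncio.sleep(delay)
    return "done"


async def demo() -> List[str]:
    outcomes: List[str] = []
    for delay in (0.01, 1.0):
        try:
            outcomes.append(await with_timeout(work(delay), seconds=0.1))
        except asyncio.TimeoutError:
            # wait_for cancels the wrapped coroutine before raising.
            outcomes.append("timed out")
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fast job finishes inside the 0.1 second budget, while the slow one is cancelled and reported as timed out.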
to_list()
async
Collects all elements into a list.
for_each(func)
async
Executes func for each element.
to_file_async(path, encoding='utf-8')
async
Writes elements to a file asynchronously.
collect(collector)
async
Materializes the stream and applies a standard (sync) Collector.
Example: await stream.collect(Collectors.to_list())