# fpstreams
A robust, type-safe functional programming library for Python.
fpstreams brings the power of Java Streams, Rust Results, and JavaScript Array methods to Python. It provides a fluent interface for data processing, null safety, and error handling without the boilerplate, all while remaining fully typed for IDE autocompletion.
## Features
- **Fluent Streams:** Lazy evaluation chains (`map`, `filter`, `reduce`, `zip`).
- **Parallel Processing:** Automatic multi-core distribution with `.parallel()`.
- **Clean Code Syntax:** Syntactic sugar like `.pick()` and `.filter_none()` to replace lambdas.
- **Data Science Ready:** Convert streams directly to Pandas DataFrames, NumPy arrays, or CSV/JSON files.
- **Null Safety:** `Option` to eliminate `None` checks.
- **Error Handling:** `Result` (Success/Failure) to replace ugly `try`/`except` blocks.
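The `Option` container follows the familiar functional pattern. As a rough plain-Python sketch of the idea (the method names `map` and `get_or` here are illustrative assumptions, not necessarily fpstreams' exact API):

```python
class Option:
    """Minimal sketch of the Option pattern: a value that may be absent."""

    def __init__(self, value):
        self._value = value

    def map(self, fn):
        # Apply fn only if a value is present; otherwise stay empty
        return Option(fn(self._value)) if self._value is not None else self

    def get_or(self, default):
        # Unwrap, falling back to a default instead of raising
        return self._value if self._value is not None else default

print(Option("alice").map(str.upper).get_or("anonymous"))  # ALICE
print(Option(None).map(str.upper).get_or("anonymous"))     # anonymous
```

The payoff is that the `None` check lives inside the container once, instead of being repeated at every call site.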
## Installation

```bash
pip install fpstreams
```
## Quick Start

### 1. Basic
Replace messy loops with clean, readable pipelines.
```python
from fpstreams import Stream, Collectors

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

# Filter, transform, and group in one pipeline
result = (
    Stream(data)
    .filter(lambda s: s.startswith("a") or s.startswith("b"))
    .map(str.upper)
    .collect(Collectors.grouping_by(lambda s: s[0]))
)
# Output: {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
```
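For comparison, the imperative loop this pipeline replaces looks something like:

```python
from collections import defaultdict

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

# The same filter / transform / group logic written by hand
result = defaultdict(list)
for s in data:
    if s.startswith("a") or s.startswith("b"):
        upper = s.upper()
        result[upper[0]].append(upper)

print(dict(result))  # {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
```

The stream version expresses the same steps top-to-bottom without mutable accumulator state.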
### 2. Clean Code Shortcuts
Stop writing repetitive lambdas for dictionaries.
```python
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

names = (
    Stream(users)
    .pick("name")      # Extract the "name" key
    .filter_none()     # Remove None values
    .to_list()
)
# Output: ["Alice", "Bob"]
```
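Without the shortcuts, the same extraction is a comprehension with an explicit `None` check — which is exactly the repetition `.pick()` and `.filter_none()` are meant to remove:

```python
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

# What .pick("name") + .filter_none() replaces
names = [u["name"] for u in users if u["name"] is not None]
print(names)  # ['Alice', 'Bob']
```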
### 3. Parallel Processing
fpstreams can automatically distribute heavy workloads across all CPU cores using the .parallel() method. It uses an optimized Map-Reduce architecture to minimize memory usage.
```python
import math
from fpstreams import Stream

def heavy_task(x):
    # CPU-bound work that varies with the input
    return math.factorial(5000 + x)

# Automatically uses all available CPU cores
results = (
    Stream(range(1000))
    .parallel()
    .map(heavy_task)
    .to_list()
)
```
### 4. Data Science & I/O
Seamlessly integrate with the scientific stack.
```python
# Quick statistics
stats = Stream([1, 2, 3, 4, 5, 100]).describe()
# Output: {'count': 6, 'sum': 115, 'mean': 19.16, 'min': 1, 'max': 100, ...}

# Convert to Pandas (`users` as defined in the example above)
df = Stream(users).to_df()

# Stream directly to file
Stream(users).to_csv("output.csv")
Stream(users).to_json("output.json")
```
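For the fields shown in the `describe()` output above, the core computation reduces to a few built-ins (a plain-Python sketch; the real method may report additional statistics):

```python
def describe(values):
    """Summary statistics matching the fields in the example output."""
    values = list(values)
    return {
        "count": len(values),
        "sum": sum(values),
        "mean": sum(values) / len(values),  # 115 / 6 here
        "min": min(values),
        "max": max(values),
    }

print(describe([1, 2, 3, 4, 5, 100]))
```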
## Infinite Streams & Lazy Evaluation
Process massive datasets efficiently. Operations are only executed when needed.
```python
from fpstreams import Stream

def infinite_counter():
    n = 0
    while True:
        yield n
        n += 1

# Take only the first 10 even numbers
evens = (
    Stream(infinite_counter())
    .filter(lambda x: x % 2 == 0)
    .limit(10)
    .to_list()
)
# Output: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
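The same lazy behavior is available in the standard library via `itertools`, which is roughly what a lazy stream does under the hood:

```python
from itertools import count, islice

# filter() and islice() are both lazy: only enough of the infinite
# counter is consumed to produce the first 10 even numbers.
evens = list(islice(filter(lambda x: x % 2 == 0, count()), 10))
print(evens)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Because nothing runs until `list()` forces the chain, intermediate lists are never materialized — the same reason stream pipelines stay memory-efficient on large inputs.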
## Benchmark

Comparison between a sequential `Stream` and `.parallel()` on a 4-core machine:

| Task | Sequential (s) | Parallel (s) | Speedup |
|---|---|---|---|
| Heavy Calculation (Factorials) | 24.7603 | 10.8182 | 2.29x |
| I/O Simulation (Sleep) | 2.0986 | 0.8405 | 2.50x |
| Light Calculation (Multiplication) | 0.0151 | 0.3796 | 0.04x |
Note: Parallel streams have overhead. Use them for CPU-intensive tasks or slow I/O, not simple arithmetic.
## Project Structure

- `Stream`: The core wrapper for sequential data processing.
- `ParallelStream`: The multi-core wrapper for heavy parallel processing.
- `Option`: Null-safe container.
- `Result`: Error-handling container.
- `Collectors`: Accumulation utilities (grouping, joining, summary stats).
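As with `Option`, the `Result` idea can be sketched in a few lines of plain Python. The `Success`/`Failure` names come from the feature list above; the `attempt` wrapping helper is an illustrative assumption, not fpstreams' documented API:

```python
class Success:
    def __init__(self, value):
        self.value = value

class Failure:
    def __init__(self, error):
        self.error = error

def attempt(fn, *args):
    # Run fn and capture the outcome as a value instead of raising,
    # so the try/except lives in one place rather than at every call site.
    try:
        return Success(fn(*args))
    except Exception as e:
        return Failure(e)

ok = attempt(int, "42")     # Success wrapping 42
bad = attempt(int, "oops")  # Failure wrapping a ValueError
print(isinstance(ok, Success), isinstance(bad, Failure))  # True True
```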
## Functional Coverage & Roadmap

fpstreams already provides composable pipelines, collectors, and `Option`/`Result` containers; clear next steps include additional combinators and richer statistics. There is also a candidate path to accelerate expensive calculations with a Rust extension module while keeping the Python API unchanged. See Functional Coverage & Roadmap for the full assessment.
## License
This project is licensed under the MIT License - see the LICENSE file for details.