# Extensions
**Repository Path**: mirrors_reactiveui/Extensions
## Basic Information
- **Project Name**: Extensions
- **Description**: Extensions for System.Reactive
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-09-17
- **Last Updated**: 2026-02-15
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
[](https://github.com/reactiveui/Extensions/actions/workflows/ci-build.yml)
[](https://codecov.io/gh/reactiveui/Extensions)
# ReactiveUI.Extensions
A focused collection of high–value Reactive Extensions (Rx) operators that do **not** ship with `System.Reactive` but are commonly needed when building reactive .NET applications.
The goal of this library is to:
- Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.)
- Provide pragmatic, allocation?aware helpers for performance sensitive scenarios
- Avoid additional dependencies – only `System.Reactive` is required
Supported Target Frameworks: `.NET 4.6.2`, `.NET 4.7.2`, `.NET 4.8.1`, `.NET 8`, `.NET 9`, `.NET 10`.
---
## Table of Contents
1. [Installation](#installation)
2. [Quick Start](#quick-start)
3. [API Catalog](#api-catalog)
4. [Operator Categories & Examples](#operator-categories--examples)
- [Null / Signal Helpers](#null--signal-helpers)
- [Timing, Scheduling & Flow Control](#timing-scheduling--flow-control)
- [Inactivity / Liveness](#inactivity--liveness)
- [Error Handling & Resilience](#error-handling--resilience)
- [Combining, Partitioning & Logical Helpers](#combining-partitioning--logical-helpers)
- [Async / Task Integration](#async--task-integration)
- [Backpressure / Conflation](#backpressure--conflation)
- [Selective & Conditional Emission](#selective--conditional-emission)
- [Buffering & Transformation](#buffering--transformation)
- [Subscription / Side Effects](#subscription--side-effects)
- [Utility & Miscellaneous](#utility--miscellaneous)
5. [Performance Notes](#performance-notes)
6. [Thread Safety](#thread-safety)
7. [License](#license)
---
## Installation
```bash
# Package coming soon (example)
dotnet add package ReactiveUI.Extensions
```
Reference the project directly while developing locally.
---
## Quick Start
```csharp
using System;
using System.Reactive.Linq;
using ReactiveUI.Extensions;
var source = Observable.Interval(TimeSpan.FromMilliseconds(120))
.Take(10)
.Select(i => (long?) (i % 3 == 0 ? null : i));
// 1. Filter nulls + convert to a Unit signal.
var signal = source.WhereIsNotNull().AsSignal();
// 2. Add a heartbeat if the upstream goes quiet for 500ms.
var withHeartbeat = source.WhereIsNotNull()
.Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default);
// 3. Retry with exponential backoff up to 5 times.
var resilient = Observable.Defer(() =>
Observable.Throw(new InvalidOperationException("Boom")))
.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
// 4. Conflate bursty updates.
var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default);
using (conflated.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
```
---
## API Catalog
Below is the full list of extension methods (grouped logically).
Some overloads omitted for brevity.
| Category | Operators |
|----------|-----------|
| Null & Signal | `WhereIsNotNull`, `AsSignal` |
| Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `ThrottleDistinct`, `DebounceImmediate` |
| Inactivity / Liveness | `Heartbeat`, `DetectStale`, `BufferUntilInactive` |
| Error Handling | `CatchIgnore`, `CatchAndReturn`, `OnErrorRetry` (overloads), `RetryWithBackoff` |
| Combining & Aggregation | `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `GetMax`, `GetMin`, `Partition` |
| Logical / Boolean | `Not`, `WhereTrue`, `WhereFalse` |
| Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads), `ToHotTask` |
| Backpressure | `Conflate` |
| Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil`, `SampleLatest`, `SwitchIfEmpty`, `DropIfBusy` |
| Buffering | `BufferUntil`, `BufferUntilInactive`, `BufferUntilIdle`, `Pairwise`, `ScanWithInitial` |
| Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose`, `ToReadOnlyBehavior`, `ToPropertyObservable` |
---
## Operator Categories & Examples
### Null / Signal Helpers
```csharp
IObservable raw = GetPossiblyNullStream();
IObservable cleaned = raw.WhereIsNotNull();
IObservable signal = cleaned.AsSignal();
```
### Timing, Scheduling & Flow Control
```csharp
// Shared timer for a given period (one underlying timer per distinct TimeSpan)
var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1));
// Delay emission of a single value
42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default)
.Subscribe(v => Console.WriteLine($"Delayed: {v}"));
// Safe scheduling when a scheduler may be null
IScheduler? maybeScheduler = null;
maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline"));
// ThrottleFirst: allow first item per window, ignore rest
var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
.ThrottleFirst(TimeSpan.FromMilliseconds(200));
// DebounceImmediate: emit first immediately then debounce rest
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
.DebounceImmediate(TimeSpan.FromMilliseconds(250));
// ThrottleDistinct: throttle but only emit when the value actually changes
var source = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(20);
var distinctThrottled = source.ThrottleDistinct(TimeSpan.FromMilliseconds(200));
```
### Inactivity / Liveness
```csharp
// Heartbeat emits IHeartbeat where IsHeartbeat == true during quiet periods
var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400))
.Take(5)
.Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// DetectStale emits IStale: one stale marker after inactivity, or fresh update wrappers
var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
.Take(3)
.DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// BufferUntilInactive groups events separated by inactivity
var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20);
var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200));
```
### Error Handling & Resilience
```csharp
var flaky = Observable.Create(o =>
{
o.OnNext(1);
o.OnError(new InvalidOperationException("Fail"));
return () => { };
});
// Ignore all errors and complete silently
a flakySafe = flaky.CatchIgnore();
// Replace error with a fallback value
var withFallback = flaky.CatchAndReturn(-1);
// Retry only specific exception type with logging
var retried = flaky.OnErrorRetry(ex => Console.WriteLine(ex.Message), retryCount: 3);
// Retry with exponential backoff
var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
```
### Combining, Partitioning & Logical Helpers
```csharp
var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0);
var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0);
var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue();
var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse();
var numbers = Observable.Range(1, 10);
var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream
var toggles = a.Not(); // Negate booleans
```
### Async / Task Integration
```csharp
IObservable inputs = Observable.Range(1, 5);
// Sequential (preserves order)
var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; });
// Latest only (cancels previous)
var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; });
// Limited parallelism
var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2);
// Asynchronous subscription (serializing tasks)
inputs.SubscribeAsync(async i => await Task.Delay(10));
// Synchronous gate: ensures per-item async completion before next is emitted
a inputs.SubscribeSynchronous(async i => await Task.Delay(25));
// ToHotTask: convert an observable to a Task that starts immediately
var source = Observable.Return(42);
var task = source.ToHotTask();
var result = await task; // 42
```
### Backpressure / Conflation
```csharp
// Conflate: enforce minimum spacing between emissions while always outputting the most recent value
a var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30);
var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default);
```
### Selective & Conditional Emission
```csharp
// TakeUntil predicate (inclusive)
var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);
// WaitUntil first match then complete
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);
// SampleLatest: sample the latest value whenever a trigger fires
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var trigger = Observable.Interval(TimeSpan.FromMilliseconds(300)).Take(3);
var sampled = source.SampleLatest(trigger);
// SwitchIfEmpty: provide a fallback if the source completes without emitting
var empty = Observable.Empty();
var fallback = Observable.Return(42);
var result = empty.SwitchIfEmpty(fallback); // emits 42
// DropIfBusy: drop values if the previous async operation is still running
var inputs = Observable.Range(1, 5);
var processed = inputs.DropIfBusy(async x => { await Task.Delay(200); Console.WriteLine(x); });
```
### Buffering & Transformation
```csharp
// BufferUntil - collect chars between delimiters
var chars = "".ToCharArray().ToObservable();
var frames = chars.BufferUntil('<', '>'); // emits "", "", ""
// Shuffle arrays in-place
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
var shuffled = arrays.Shuffle();
// BufferUntilIdle: emit a batch when the stream goes quiet
var events = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var batches = events.BufferUntilIdle(TimeSpan.FromMilliseconds(250));
// Pairwise: emit consecutive pairs
var numbers = Observable.Range(1, 5);
var pairs = numbers.Pairwise(); // emits (1,2), (2,3), (3,4), (4,5)
// ScanWithInitial: scan that always emits the initial value first
var values = Observable.Return(5);
var accumulated = values.ScanWithInitial(10, (acc, x) => acc + x); // emits 10, then 15
```
### Subscription & Side Effects
```csharp
var stream = Observable.Range(1, 3)
.DoOnSubscribe(() => Console.WriteLine("Subscribed"))
.DoOnDispose(() => Console.WriteLine("Disposed"));
using (stream.Subscribe(Console.WriteLine))
{
// auto dispose at using end
}
```
### Utility & Miscellaneous
```csharp
// Emit list contents quickly with low allocations
var listSource = Observable.Return>(new List { 1, 2, 3 });
listSource.ForEach().Subscribe(Console.WriteLine);
// Using helper for deterministic disposal
var value = new MemoryStream().Using(ms => ms.Length);
// While loop (reactive)
var counter = 0;
ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
.Subscribe();
// Batch push with OnNext params
var subj = new Subject();
subj.OnNext(1, 2, 3, 4);
// ToReadOnlyBehavior: create a read-only behavior subject
var (observable, observer) = ReactiveExtensions.ToReadOnlyBehavior(10);
observer.OnNext(20); // observable emits 10, then 20
// ToPropertyObservable: observe property changes on INotifyPropertyChanged
public class ViewModel : INotifyPropertyChanged
{
private string _name;
public string Name
{
get => _name;
set { _name = value; PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name))); }
}
public event PropertyChangedEventHandler? PropertyChanged;
}
var vm = new ViewModel();
var nameChanges = vm.ToPropertyObservable(x => x.Name);
vm.Name = "Hello"; // observable emits "Hello"
```
---
## Performance Notes
- `FastForEach` path avoids iterator allocations for `List`, `IList`, and arrays.
- `SyncTimer` ensures only one shared timer per period reducing timer overhead.
- `Conflate` helps tame high–frequency producers without dropping the final value of a burst.
- `Heartbeat` and `DetectStale` use lightweight scheduling primitives.
- Most operators avoid capturing lambdas in hot loops where practical.
## Thread Safety
- All operators are pure functional transformations unless documented otherwise.
- `SyncTimer` uses a `ConcurrentDictionary` and returns a hot `IConnectableObservable` that connects once per unique `TimeSpan`.
- Methods returning shared observables (`SyncTimer`, `Partition` result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting.
## License
MIT – see LICENSE file.
---
## Contributing
Issues / PRs welcome. Please keep additions dependency–free and focused on broadly useful reactive patterns.
---
## Change Log (Excerpt)
(Keep this section updated as the library evolves.)
- Added async task projection helpers (`SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`).
- Added liveness operators (`Heartbeat`, `DetectStale`, `BufferUntilInactive`).
- Added resilience (`RetryWithBackoff`, expanded `OnErrorRetry` overloads).
- Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`, `ThrottleDistinct`).
- Added buffering and transformation operators (`BufferUntilIdle`, `Pairwise`, `ScanWithInitial`).
- Added filtering and conditional operators (`SampleLatest`, `SwitchIfEmpty`, `DropIfBusy`).
- Added utility operators (`ToReadOnlyBehavior`, `ToHotTask`, `ToPropertyObservable`).
- Fixed `SynchronizeSynchronous` to properly propagate OnError and OnCompleted events.
- Removed DisposeWith extension use System.Reactive.Disposables.Fluent from System.Reactive.
---
Happy reactive coding!