在上一篇文章的结尾,我原本计划在本篇介绍 System.Reactive的基础操作符,比如如何创建、转换和过滤数据流。但在撰写内容时,我意识到,对于刚接触System.Reactive的读者来说,直接介绍操作符可能有些仓促,因为 操作符的使用必须建立在对IObservable和IObserver这两个核心接口的深刻理解之上。正如在传统编程中,你需要先理解 集合(Collection)和迭代器(Iterator)的本质,才能更好地使用LINQ操作符一样。而在 Rx 中,IObservable是数据流的生产者,是数据流的消费者,理解这两个接口是掌握 Rx 的基础。因此,我决定调整顺序,在本篇文章中,深入介绍IObservable和IObserver的核心概念、方法和使用方式,为后续学习操作符打下坚实的基础。摘要:在上一篇文章的结尾,我原本计划在本篇介绍System.Reactive的基础操作符,比如如何创建、转换和过滤数据流。但在撰写内容时,我意识到,对于刚接触
在 Rx 中,数据流的生产和消费是通过 观察者模式(Observer Pattern)实现的。这种模式定义了两种角色:
IObservable(可观察对象/数据流的生产者)
IObserver(观察者/数据流的消费者)
二者的关系可以简单理解为:
IObservable负责“推送”数据项
。
IObserver负责“接收”数据项
。
订阅(Subscribe)是连接这两者的桥梁。当IObserver订阅一个IObservable时,数据流开始传递。1. IObservable 的定义和职责IObservable 接口定义public interface IObservable{
IDisposable Subscribe(IObserver observer);
}
2. IObserver 的定义和职责IObserver 接口定义public interface IObserver
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted;
}
3. IObservable 和 IObserver 的交互流程让我们通过一个实际的交互流程图来直观地理解IObservable和的关系:
观察者(Observer)通过 Subscribe方法订阅可观察对象(Observable)。
可观察对象(Observable)调用 Observer 的 OnNext方法推送数据。
如果发生错误,可观察对象(Observable)调用 OnError方法终止数据流。
如果数据流正常结束,可观察对象(Observable)调用 OnCompleted方法终止数据流。
为了更好地理解这两个接口,我们从零开始,手动实现一个简单的IObservable和。实现自定义 Observableusing System;using System.Threading;
using System.Threading.Tasks;
public sealed class SimpleObservable : IObservable
{
IDisposable IObservableint.Subscribe(IObserver observer)
{
SimpleDisposable disposable=new;
Task.Run(=>
{
// 模拟数据的生产,以及假设每次生产都需要时间,消费者可以随时调用Dispose方法取消订阅
for (int i=1 ; i5 ; i++)
{
if (disposable.IsDisposed)
{
return;
}
observer.OnNext(i);
// 模拟产生数据需要耗时50毫秒
Thread.Sleep(50 );
}
observer.OnCompleted;
});
return disposable;
}
private sealed class SimpleDisposable : IDisposable
{
internal bool IsDisposed { get; private set; }
void IDisposable.Dispose
{
IsDisposed=true;
Console.WriteLine("Subscription disposed.");
}
}
}
实现自定义 Observerusing System;
public sealed class SimpleObserver : IObserver
{
void IObserverint.OnNext(int value)=>Console.WriteLine($"Received: {value}");
void IObserverint.OnError(Exception error)=>Console.WriteLine($"Error: {error.Message}");
void IObserverint.OnCompleted=>Console.WriteLine("Sequence Completed.");
}
订阅和运行using System;
using System.Threading;
class Program
{
static void Main(string args)
{
IObservable observable=new SimpleObservable;
IObserver observer=new SimpleObserver;
IDisposable subscription=observable.Subscribe(observer);
// 模拟消费数据100毫秒后取消订阅
Thread.Sleep(100);
subscription.Dispose;
}
}
输出结果:
Received:1Received:2
Subscription disposed.
5. 常见问题解答Q1:为什么 Subscribe 方法返回 IDisposable?Subscribe方法返回一个IDisposable对象,允许订阅者在不再需要数据流时取消订阅,以释放资源,避免内存泄漏。Q2:OnError 和 OnCompleted 可以同时调用吗?
不能。数据流要么以错误终止,要么正常结束,二者是互斥的。
Q3:IObservable 可以被多个 IObserver 订阅吗?可以。一个IObservable在本篇文章中,我们深入探讨了IObservable和IObserver这两个核心接口的定义和职责,并通过代码示例展示了它们如何交互。核心要点:IObservable是数据流的生产者
,它负责推送数据。
IObserver是数据流的消费者
,它负责接收和处理数据。
Subscribe方法将生产者和消费者连接起来
,并返回一个 IDisposable对象,用于取消订阅。
下一篇文章预告《.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposable 的深入理解》在下一篇文章中,我们将重点探讨Subscribe方法的内部工作机制、IDisposable的作用,以及如何优雅地管理订阅的生命周期。敬请期待!来源:opendotnet
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!