This article introduces observables (Observable), observers (Observer), and RxJS operators in Angular. I hope you find it helpful!
An observer object can define handlers for three types of notification:
Notification Type | Description |
next | Required. Called with each received value under normal circumstances. May be called zero or more times. |
error | Optional. Called when an error occurs. An error halts the execution of this observable instance. |
complete | Optional. Called when the observable has finished sending values. |
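The following is a minimal sketch of how an error notification interrupts execution. It assumes RxJS 7 (imports from 'rxjs'); the names failing and notifications are made up for illustration.

```typescript
import { Observable } from 'rxjs';

// Records every notification so the order is easy to inspect.
const notifications: string[] = [];

// Hypothetical observable that emits one value and then fails.
const failing = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  // These calls are ignored: the error already ended this execution.
  subscriber.next(2);
  subscriber.complete();
});

failing.subscribe({
  next: (value) => notifications.push(`next:${value}`),
  error: (err: Error) => notifications.push(`error:${err.message}`),
  complete: () => notifications.push('complete'),
});

console.log(notifications); // [ 'next:1', 'error:boom' ]
```

Neither the second value nor the complete notification reaches the observer, because error and complete are both terminal.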
Subscription
An Observable instance starts publishing values only when someone subscribes to it. To subscribe, call the observable's subscribe() method and pass it an observer object that will receive the notifications.
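A small sketch can make this laziness visible. It assumes RxJS 7, and the names lazy and log are illustrative only. Nothing inside the observable runs until subscribe() is called.

```typescript
import { Observable } from 'rxjs';

// Collects messages in the order they happen.
const log: string[] = [];

// Hypothetical observable; the function passed in runs once per subscription.
const lazy = new Observable<string>((subscriber) => {
  log.push('subscriber function ran');
  subscriber.next('hello');
  subscriber.complete();
});

// Creating the observable above did not run the function yet.
log.push('observable created');

lazy.subscribe((value) => log.push(`received ${value}`));

console.log(log);
// [ 'observable created', 'subscriber function ran', 'received hello' ]
```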
To demonstrate how subscription works, we first need to create a new observable. Observable has a constructor for creating new instances, but for brevity you can also use the creation functions exported by rxjs to build some commonly used simple observables:
- of(...items): Returns an Observable instance that synchronously sends the values provided as arguments, one by one.
- from(iterable): Converts its argument to an Observable instance. This method is usually used to convert an array into an observable (which sends multiple values).
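To illustrate the difference between the two functions, here is a short sketch (assuming RxJS 7; the array contents and variable names are arbitrary) that passes the same array to from() and to of().

```typescript
import { from, of } from 'rxjs';

const fromArray: number[] = [];
const fromOf: number[][] = [];

// from() unpacks the array and emits each element separately.
from([10, 20, 30]).subscribe((value) => fromArray.push(value));

// of() treats the same array as a single value and emits it once.
of([10, 20, 30]).subscribe((value) => fromOf.push(value));

console.log(fromArray); // [ 10, 20, 30 ]
console.log(fromOf);    // [ [ 10, 20, 30 ] ]
```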
import { of } from "rxjs";
// 1. Use of() to create an observable that will send the values 1, 2, and 3
const observable = of(1, 2, 3);
// 2. Implement the observer interface
const observer = {
  next: (num: number) => console.log(num),
  error: (err: Error) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
// 3. Subscribe: the object passed to the observable's subscribe() method is the observer
observable.subscribe(observer);
Running it prints 1, 2, and 3, followed by "Observer got a complete notification".
The subscription above can also be written with callback functions passed as positional arguments instead of an observer object. Note that recent RxJS versions deprecate this positional form in favor of the observer object.
observable.subscribe(
  num => console.log(num),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
Subscriber function
In the example above, the of() function is used to create an observable. This section uses the constructor instead.
The Observable constructor can create any type of observable stream. When the observable's subscribe() method is called, the function that was passed to the constructor runs as the subscriber function. The subscriber function receives an Observer object and publishes values by calling the observer's next() method.
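import { Observable, Observer } from 'rxjs';
// 1. Define a custom subscriber function
function sequenceSubscriber(observer: Observer<number>) {
  observer.next(1); // send a value
  observer.next(2); // send a value
  observer.next(3); // send a value
  observer.complete();
  // Nothing to clean up here, so unsubscribe() does nothing
  return { unsubscribe() {} };
}
// 2. Create a new observable with the constructor; the argument is the subscriber function
const sequence = new Observable(sequenceSubscriber);
// 3. Subscribe
sequence.subscribe({
  next(num) { console.log(num); }, // receive a value
  complete() { console.log('Finished sequence'); }
});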
Running it prints 1, 2, and 3, followed by "Finished sequence".
The example above shows how to write a custom subscriber function. Because the subscriber function is ordinary code, you can also wrap asynchronous code in it and send the data once the asynchronous work has finished. For example:
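import { Observable } from 'rxjs';
// An asynchronous function: resolves with num + 1 after one second
function fn(num: number): Promise<number> {
  return new Promise((resolve) => {
    setTimeout(() => {
      num++;
      resolve(num);
    }, 1000);
  });
}
// Create an observable, passing in the subscriber function
const observable = new Observable<number>((x) => {
  const num = 1;
  fn(num).then(
    res => x.next(res) // the asynchronous work has finished, so send the data
  );
});
// Subscribe and receive the data; this can also be written as a chained call
observable.subscribe(data => console.log(data)); // 2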
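When the subscriber function wraps asynchronous work such as a timer, it can also return cleanup logic that runs on unsubscribe. The sketch below assumes RxJS 7; ticker and teardownCalls are illustrative names. It unsubscribes immediately only to keep the example short. In an Angular component this would more typically happen in ngOnDestroy.

```typescript
import { Observable } from 'rxjs';

// Counts how many times the cleanup function has run.
let teardownCalls = 0;

// Hypothetical observable that would emit a counter once per second.
const ticker = new Observable<number>((subscriber) => {
  let count = 0;
  const timerId = setInterval(() => subscriber.next(count++), 1000);
  // The returned function is the teardown logic, run on unsubscribe.
  return () => {
    clearInterval(timerId);
    teardownCalls++;
  };
});

const subscription = ticker.subscribe((value) => console.log(value));

// Unsubscribing stops the timer before it ever fires.
subscription.unsubscribe();

console.log(teardownCalls, subscription.closed); // 1 true
```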
Multicast
https://angular.cn/guide/observables#multicasting
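As a rough illustration of the idea (not the approach used in the linked guide), one way to multicast is to route a single source subscription through an RxJS Subject. The sketch assumes RxJS 7, and all names are hypothetical.

```typescript
import { Observable, Subject } from 'rxjs';

// Counts how often the subscriber function of the source runs.
let executions = 0;

const source = new Observable<number>((subscriber) => {
  executions++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const subject = new Subject<number>();
const first: number[] = [];
const second: number[] = [];

// Both listeners subscribe to the subject, not to the source.
subject.subscribe((value) => first.push(value));
subject.subscribe((value) => second.push(value));

// A single subscription to the source; the subject relays each value to both listeners.
source.subscribe(subject);

console.log(executions, first, second); // 1 [ 1, 2 ] [ 1, 2 ]
```

Subscribing both listeners directly to source would instead run the subscriber function twice, once per subscription.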
RxJS Operator
Before the values reach the observer, you can process and transform them with a series of RxJS operators. Operators are pure functions: each one takes an observable and returns a new observable, leaving the original unchanged.
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// 1. Create an observable that sends the values
const nums = of(1, 2, 3);
// 2. Create a function that accepts an observable
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
Calling an operator function directly like this is awkward to read. In practice, the pipe() method is commonly used to chain multiple operators.
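import { map, Observable, filter } from 'rxjs';
// Create an observable, passing in the subscriber function
// (the chain ends in subscribe(), so the result is a Subscription)
const subscription = new Observable<number>((x) => {
  x.next(1);
  x.next(2);
  x.next(3);
  x.next(4);
}).pipe(
  map(value => value * 100), // operator
  filter(value => value === 200) // operator
)
.subscribe(data => console.log(data)); // 200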
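To show that piping does not modify the source observable, the following sketch (assuming RxJS 7; the names and the filter condition are arbitrary) subscribes to both the original and the piped observable.

```typescript
import { of, map, filter } from 'rxjs';

const source = of(1, 2, 3, 4);

// pipe() returns a new observable; source itself stays as it was.
const piped = source.pipe(
  map((value) => value * 100),
  filter((value) => value >= 300),
);

const raw: number[] = [];
const transformed: number[] = [];

source.subscribe((value) => raw.push(value));
piped.subscribe((value) => transformed.push(value));

console.log(raw);         // [ 1, 2, 3, 4 ]
console.log(transformed); // [ 300, 400 ]
```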
Error handling
RxJS also provides the catchError operator, which allows you to handle known errors in the pipeline.
Suppose you have an observable that makes API requests and then maps the responses returned by the server. If the server returns an error or the value does not exist, an error is generated. If you catch this error and provide a default value, the stream continues with that value instead of reporting an error to the subscriber. For example:
import { map, Observable, catchError, of } from 'rxjs';
const subscription = new Observable<number>((x) => {
  x.next(1); // send the values 1 and 2
  x.next(2);
}).pipe(
  map(value => {
    if (value === 1) { // 1. When the value is 1, multiply it by 100
      return value * 100;
    } else { // 2. Otherwise throw an error
      throw new Error('Error thrown');
    }
  }),
  // 3. Catch and handle the error here, sending the value 0 downstream
  catchError((err) => {
    console.log(err);
    return of(0);
  })
)
.subscribe(
  data => console.log(data),
  // 4. The error thrown above was handled by catchError (which sent replacement data),
  //    so the subscription receives data and this error callback is not called
  err => console.log('No data received:', err)
);
Running it prints 100, then the Error object logged inside catchError, and finally 0.
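One detail worth seeing in isolation is that catchError switches to the replacement observable and the original source does not resume. The sketch below assumes RxJS 7, and the values and names are made up for illustration.

```typescript
import { of, map, catchError } from 'rxjs';

const received: number[] = [];
let completed = false;

of(1, 2, 3).pipe(
  map((value) => {
    // Hypothetical failure on the second value.
    if (value === 2) throw new Error('bad value');
    return value * 100;
  }),
  // Replace the failed stream with a single default value.
  catchError(() => of(0)),
).subscribe({
  next: (value) => received.push(value),
  complete: () => { completed = true; },
});

console.log(received, completed); // [ 100, 0 ] true
```

The value 3 never arrives: after the error, the pipeline continues with the fallback observable only, and then completes.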