分流即将一个流分成多个流!应该属于组合操作符。发现并没有类似功能操作符,合并流居多,如果有读者发现有类似分流操作符,可以告知我!在此感谢!
类似功能 filter,filter的作用是过滤掉部分杂质,而我需要的是杂质也有用!把杂质当成额外的流引走!相当于挖一个杂质排放通道!下面自己实现一个!
实现 division 分流器
predicate: (value: T, index: number) => boolean 开始截取,也可叫做前缀函数 suffixdicate: (value: T, index: number) => boolean 终止截取,也可叫做后缀函数 observer: Observer 垃圾排放通道
import {
MonoTypeOperatorFunction, Observable,
Operator, Subscriber, TeardownLogic, Observer
} from "rxjs";
export function division<T>(
predicate: (value: T, index: number) => boolean,
suffixdicate: (value: T, index: number) => boolean,
observer: Observer<T>
): MonoTypeOperatorFunction<T> {
return function divisionOperatorFunction(source: Observable<T>): Observable<T> {
return source.lift(new DivisionOperator(predicate, suffixdicate, observer));
}
}
class DivisionOperator<T> implements Operator<T, T> {
constructor(
private predicate: (value: T, index: number) => boolean,
private suffixdicate: (value: T, index: number) => boolean,
private observer: Observer<T>
) { }
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer));
}
}
class DivisionSubscriber<T> extends Subscriber<T> {
count: number = 0;
__isStart: boolean;
constructor(
destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean,
private suffixdicate: (value: T, index: number) => boolean,
private observer: Observer<T>,
private thisOrgs?: any
) {
super(destination);
}
protected _next(value: T) {
let predicate: boolean;
let suffixdicate: boolean;
try {
this.count++;
predicate = this.predicate.call(this.thisOrgs, value);
suffixdicate = this.suffixdicate.call(this.thisOrgs, value);
} catch (err) {
this.destination.error(err);
return;
}
if (predicate) {
this.__isStart = true;
this.observer.next(value);
} else if (this.__isStart && !suffixdicate) {
this.observer.next(value);
} else if (suffixdicate) {
this.__isStart = false;
this.observer.next(value);
this.observer.complete();
} else {
this.destination.next(value);
}
}
}
复制代码
使用
import { interval, Subject } from "rxjs";
import { division } from './index';
const oneTo100 = new Subject();
interval(100).pipe(
division(
(value, index) => {
return value === 1;
},
(value, index) => {
return value === 100;
},
oneTo100
)
).subscribe(res => {
console.log(res)
})
oneTo100.subscribe(res => {
console.log(`1-100:${res}`)
});
复制代码
不仅能实现filter的过滤主流的功能,还实现了将过滤的部分数据转移到其他流中!
深入
问题来了,这仅仅实现了单次过滤,并没有循环利用,相当于一次性卫生纸,擦完就废了!下面是对上面的方案改造升级
class DivisionSubscriber<T> extends Subscriber<T> {
count: number = 0;
__isStart: boolean;
__division: Subject<T>;
constructor(
destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean,
private suffixdicate: (value: T, index: number) => boolean,
private observer: Observer<Observable<T>>,
private thisOrgs?: any
) {
super(destination);
}
protected _next(value: T) {
let predicate: boolean;
let suffixdicate: boolean;
try {
this.count++;
predicate = this.predicate.call(this.thisOrgs, value);
suffixdicate = this.suffixdicate.call(this.thisOrgs, value);
} catch (err) {
this.destination.error(err);
return;
}
// 前缀开始
if (predicate) {
this.__isStart = true;
// 开启一个流
this._createNewDivisionItem();
this.__division.next(value);
// 开始但没有结束
} else if (this.__isStart && !suffixdicate) {
this.__division.next(value);
// 结束
} else if (suffixdicate) {
this.__isStart = false;
this.__division.next(value);
this.__division.complete();
} else {
this.destination.next(value);
}
}
private _createNewDivisionItem(): void {
this.__division = new Subject();
// 这里没有直接返回this.__division是为了防止下游改变上游数据
const divition = Observable.create((obser: Observer<T>) => {
this.__division.subscribe(obser);
});
this.observer.next(divition);
}
}
复制代码
使用
import { interval, Subject, Observable } from "rxjs";
import { concatAll, tap } from "rxjs/operators";
import { division } from './division';
const oneTo100 = new Subject<Observable<number>>();
interval(100).pipe(
division(
(value, index) => {
return value === 1 || value === 105;
},
(value, index) => {
return value === 100 || value === 205;
},
oneTo100
)
).subscribe(res => {
console.log(res)
})
oneTo100.pipe(
concatAll()
).subscribe(res => {
console.log(`垃圾处理:${res}`)
});
复制代码
继续深入
当匹配字符串时,有时候需要两个或多个一起匹配!比如开始的条件是/结束的条件是/2个最近字符,或者html中开始的是最近四个字符,这种怎么实现呢!苦恼ing!想到了一个操作符叫scan的!模仿他的思路利用bufferCount进行改进
class DivisionSubscriber<T> extends Subscriber<T> {
constructor( ... // 加入缓存长度控制 private length: number = 1 ... ) {
super(destination);
}
protected _next(value: T) {
let predicate: boolean;
let suffixdicate: boolean;
try {
this.count++;
if (this._buffer.length < this.length) {
// 如果长度不够 继续push
this._buffer.push(value);
} else {
// 否则去掉第一个后push
this._buffer.push(value);
// 保存 前this.length个数据
this._buffer = this._buffer.slice(this._buffer.length - this.length);
}
predicate = this.predicate.call(this.thisOrgs, ...this._buffer);
suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer);
} catch (err) {
this.destination.error(err);
return;
}
... 其他不变
}
private _createNewDivisionItem(): void {
this.__division = new Subject();
// 这里没有直接返回this.__division是为了防止下游改变上游数据
const divition = Observable.create((obser: Observer<T>) => {
this.__division.subscribe(obser);
});
this.observer.next(divition);
}
}
复制代码
使用
interval(100).pipe(
division(
(...values: any[]) => equals([2, 3, 4, 5])(values),
(...values: any[]) => equals([99, 100, 101, 102])(values),
oneTo100,
4
)
).subscribe(res => {
console.log(res)
})
oneTo100.pipe(
concatAll()
).subscribe(res => {
console.log(`垃圾处理:${res}`)
});
复制代码
到此为止就够我用的了,仅把开发思路整理分享,代码有待优化,欢迎各位大佬指正!共同进步学习
最终完整代码!
import {
MonoTypeOperatorFunction, Observable,
Operator, Subscriber, TeardownLogic, Observer, Subject
} from "rxjs";
export function division<T>(
predicate: (value: T, index: number) => boolean,
suffixdicate: (value: T, index: number) => boolean,
observer: Observer<Observable<T>>,
length: number = 1
): MonoTypeOperatorFunction<T> {
return function divisionOperatorFunction(source: Observable<T>): Observable<T> {
return source.lift(new DivisionOperator(predicate, suffixdicate, observer, length));
}
}
class DivisionOperator<T> implements Operator<T, T> {
constructor(
private predicate: (value: T, index: number) => boolean,
private suffixdicate: (value: T, index: number) => boolean,
private observer: Observer<Observable<T>>,
private length: number = 1
) { }
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer, this.length));
}
}
class DivisionSubscriber<T> extends Subscriber<T> {
count: number = 0;
__isStart: boolean;
__division: Subject<T>;
_buffer: any[] = [];
constructor(
destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean,
private suffixdicate: (value: T, index: number) => boolean,
private observer: Observer<Observable<T>>,
private length: number = 1,
private thisOrgs?: any
) {
super(destination);
}
protected _next(value: T) {
let predicate: boolean;
let suffixdicate: boolean;
try {
this.count++;
if (this._buffer.length < this.length) {
// 如果长度不够 继续push
this._buffer.push(value);
} else {
// 否则去掉第一个后push
this._buffer.push(value);
// 保存 前this.length个数据
this._buffer = this._buffer.slice(this._buffer.length - this.length);
}
predicate = this.predicate.call(this.thisOrgs, ...this._buffer);
suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer);
} catch (err) {
this.destination.error(err);
return;
}
// 前缀开始 predicate = true
if (predicate) {
this.__isStart = true;
// 开启一个流
this._createNewDivisionItem();
this.__division.next(value);
// 开始但没有结束 __isStart = true suffixdicate = false
} else if (this.__isStart && !suffixdicate) {
this.__division.next(value);
// 结束
} else if (suffixdicate) {
this.__isStart = false;
this.__division.next(value);
this.__division.complete();
// 没有开始 长度不够
} else if (!predicate && this._buffer.length < this.length) {
// console.log('没有开始 长度不够', value);
// 没有开始 长度够了
} else if (!predicate && this._buffer.length === this.length) {
this.destination.next(this._buffer[0]);
} else {
this.destination.next(value);
}
}
private _createNewDivisionItem(): void {
this.__division = new Subject();
// 这里没有直接返回this.__division是为了防止下游改变上游数据
const divition = Observable.create((obser: Observer<T>) => {
this.__division.subscribe(obser);
});
this.observer.next(divition);
}
}
复制代码
今天的文章《记》rxjs分流操作符简单实现分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/63243.html