《记》rxjs分流操作符简单实现

《记》rxjs分流操作符简单实现分流即将一个流分成多个流!应该属于组合操作符

分流即将一个流分成多个流!应该属于组合操作符。发现并没有类似功能操作符,合并流居多,如果有读者发现有类似分流操作符,可以告知我!在此感谢!

类似功能 filter,filter的作用是过滤掉部分杂质,而我需要的是杂质也有用!把杂质当成额外的流引走!相当于挖一个杂质排放通道!下面自己实现一个!

实现 division 分流器

predicate: (value: T, index: number) => boolean 开始截取,也可叫做前缀函数 suffixdicate: (value: T, index: number) => boolean 终止截取,也可叫做后缀函数 observer: Observer 垃圾排放通道

import {
    MonoTypeOperatorFunction, Observable,
    Operator, Subscriber, TeardownLogic, Observer
} from "rxjs";

export function division<T>(
    predicate: (value: T, index: number) => boolean,
    suffixdicate: (value: T, index: number) => boolean,
    observer: Observer<T>
): MonoTypeOperatorFunction<T> {
    return function divisionOperatorFunction(source: Observable<T>): Observable<T> {
        return source.lift(new DivisionOperator(predicate, suffixdicate, observer));
    }
}

class DivisionOperator<T> implements Operator<T, T> {
    constructor(
        private predicate: (value: T, index: number) => boolean,
        private suffixdicate: (value: T, index: number) => boolean,
        private observer: Observer<T>
    ) { }

    call(subscriber: Subscriber<T>, source: any): TeardownLogic {
        return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer));
    }
}

class DivisionSubscriber<T> extends Subscriber<T> {
    count: number = 0;
    __isStart: boolean;
    constructor(
        destination: Subscriber<T>,
        private predicate: (value: T, index: number) => boolean,
        private suffixdicate: (value: T, index: number) => boolean,
        private observer: Observer<T>,
        private thisOrgs?: any
    ) {
        super(destination);
    }

    protected _next(value: T) {
        let predicate: boolean;
        let suffixdicate: boolean;
        try {
            this.count++;
            predicate = this.predicate.call(this.thisOrgs, value);
            suffixdicate = this.suffixdicate.call(this.thisOrgs, value);
        } catch (err) {
            this.destination.error(err);
            return;
        }
        if (predicate) {
            this.__isStart = true;
            this.observer.next(value);
        } else if (this.__isStart && !suffixdicate) {
            this.observer.next(value);
        } else if (suffixdicate) {
            this.__isStart = false;
            this.observer.next(value);
            this.observer.complete();
        } else {
            this.destination.next(value);
        }
    }
}
复制代码

使用

import { interval, Subject } from "rxjs";
import { division } from './index';
const oneTo100 = new Subject();
interval(100).pipe(
    division(
        (value, index) => {
            return value === 1;
        },
        (value, index) => {
            return value === 100;
        },
        oneTo100
    )
).subscribe(res => {
    console.log(res)
})

oneTo100.subscribe(res => {
    console.log(`1-100:${res}`)
});
复制代码

不仅能实现filter的过滤主流的功能,还实现了将过滤的部分数据转移到其他流中!

深入

问题来了,这仅仅实现了单次过滤,并没有循环利用,相当于一次性卫生纸,擦完就废了!下面是对上面的方案改造升级


class DivisionSubscriber<T> extends Subscriber<T> {
    count: number = 0;
    __isStart: boolean;
    __division: Subject<T>;
    constructor(
        destination: Subscriber<T>,
        private predicate: (value: T, index: number) => boolean,
        private suffixdicate: (value: T, index: number) => boolean,
        private observer: Observer<Observable<T>>,
        private thisOrgs?: any
    ) {
        super(destination);
    }

    protected _next(value: T) {
        let predicate: boolean;
        let suffixdicate: boolean;
        try {
            this.count++;
            predicate = this.predicate.call(this.thisOrgs, value);
            suffixdicate = this.suffixdicate.call(this.thisOrgs, value);
        } catch (err) {
            this.destination.error(err);
            return;
        }
        // 前缀开始
        if (predicate) {
            this.__isStart = true;
            // 开启一个流
            this._createNewDivisionItem();
            this.__division.next(value);
            // 开始但没有结束
        } else if (this.__isStart && !suffixdicate) {
            this.__division.next(value);
            // 结束
        } else if (suffixdicate) {
            this.__isStart = false;
            this.__division.next(value);
            this.__division.complete();
        } else {
            this.destination.next(value);
        }
    }

    private _createNewDivisionItem(): void {
        this.__division = new Subject();
        // 这里没有直接返回this.__division是为了防止下游改变上游数据
        const divition = Observable.create((obser: Observer<T>) => {
            this.__division.subscribe(obser);
        });
        this.observer.next(divition);
    }
}

复制代码

使用

import { interval, Subject, Observable } from "rxjs";
import { concatAll, tap } from "rxjs/operators";

import { division } from './division';
const oneTo100 = new Subject<Observable<number>>();
interval(100).pipe(
    division(
        (value, index) => {
            return value === 1 || value === 105;
        },
        (value, index) => {
            return value === 100 || value === 205;
        },
        oneTo100
    )
).subscribe(res => {
    console.log(res)
})

oneTo100.pipe(
    concatAll()
).subscribe(res => {
    console.log(`垃圾处理:${res}`)
});

复制代码

继续深入

当匹配字符串时,有时候需要两个或多个一起匹配!比如开始的条件是/结束的条件是/2个最近字符,或者html中开始的是最近四个字符,这种怎么实现呢!苦恼ing!想到了一个操作符叫scan的!模仿他的思路利用bufferCount进行改进

class DivisionSubscriber<T> extends Subscriber<T> {
    constructor( ... // 加入缓存长度控制 private length: number = 1 ... ) {
        super(destination);
    }

    protected _next(value: T) {
        let predicate: boolean;
        let suffixdicate: boolean;
        try {
            this.count++;
            if (this._buffer.length < this.length) {
                // 如果长度不够 继续push
                this._buffer.push(value);
            } else {
                // 否则去掉第一个后push
                this._buffer.push(value);
                // 保存 前this.length个数据
                this._buffer = this._buffer.slice(this._buffer.length - this.length);
            }
            predicate = this.predicate.call(this.thisOrgs, ...this._buffer);
            suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer);
        } catch (err) {
            this.destination.error(err);
            return;
        }
        ... 其他不变
    }

    private _createNewDivisionItem(): void {
        this.__division = new Subject();
        // 这里没有直接返回this.__division是为了防止下游改变上游数据
        const divition = Observable.create((obser: Observer<T>) => {
            this.__division.subscribe(obser);
        });
        this.observer.next(divition);
    }
}

复制代码

使用

interval(100).pipe(
    division(
        (...values: any[]) => equals([2, 3, 4, 5])(values),
        (...values: any[]) => equals([99, 100, 101, 102])(values),
        oneTo100,
        4
    )
).subscribe(res => {
    console.log(res)
})

oneTo100.pipe(
    concatAll()
).subscribe(res => {
    console.log(`垃圾处理:${res}`)
});

复制代码

到此为止就够我用的了,仅把开发思路整理分享,代码有待优化,欢迎各位大佬指正!共同进步学习

最终完整代码!

import {
    MonoTypeOperatorFunction, Observable,
    Operator, Subscriber, TeardownLogic, Observer, Subject
} from "rxjs";

export function division<T>(
    predicate: (value: T, index: number) => boolean,
    suffixdicate: (value: T, index: number) => boolean,
    observer: Observer<Observable<T>>,
    length: number = 1
): MonoTypeOperatorFunction<T> {
    return function divisionOperatorFunction(source: Observable<T>): Observable<T> {
        return source.lift(new DivisionOperator(predicate, suffixdicate, observer, length));
    }
}

class DivisionOperator<T> implements Operator<T, T> {
    constructor(
        private predicate: (value: T, index: number) => boolean,
        private suffixdicate: (value: T, index: number) => boolean,
        private observer: Observer<Observable<T>>,
        private length: number = 1
    ) { }

    call(subscriber: Subscriber<T>, source: any): TeardownLogic {
        return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer, this.length));
    }
}

class DivisionSubscriber<T> extends Subscriber<T> {
    count: number = 0;
    __isStart: boolean;
    __division: Subject<T>;
    _buffer: any[] = [];
    constructor(
        destination: Subscriber<T>,
        private predicate: (value: T, index: number) => boolean,
        private suffixdicate: (value: T, index: number) => boolean,
        private observer: Observer<Observable<T>>,
        private length: number = 1,
        private thisOrgs?: any
    ) {
        super(destination);
    }

    protected _next(value: T) {
        let predicate: boolean;
        let suffixdicate: boolean;
        try {
            this.count++;
            if (this._buffer.length < this.length) {
                // 如果长度不够 继续push
                this._buffer.push(value);
            } else {
                // 否则去掉第一个后push
                this._buffer.push(value);
                // 保存 前this.length个数据
                this._buffer = this._buffer.slice(this._buffer.length - this.length);
            }
            predicate = this.predicate.call(this.thisOrgs, ...this._buffer);
            suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer);
        } catch (err) {
            this.destination.error(err);
            return;
        }
        // 前缀开始 predicate = true
        if (predicate) {
            this.__isStart = true;
            // 开启一个流
            this._createNewDivisionItem();
            this.__division.next(value);
            // 开始但没有结束 __isStart = true suffixdicate = false
        } else if (this.__isStart && !suffixdicate) {
            this.__division.next(value);
            // 结束
        } else if (suffixdicate) {
            this.__isStart = false;
            this.__division.next(value);
            this.__division.complete();
            // 没有开始 长度不够
        } else if (!predicate && this._buffer.length < this.length) {
            // console.log('没有开始 长度不够', value);
            // 没有开始 长度够了
        } else if (!predicate && this._buffer.length === this.length) {
            this.destination.next(this._buffer[0]);
        } else {
            this.destination.next(value);
        }
    }

    private _createNewDivisionItem(): void {
        this.__division = new Subject();
        // 这里没有直接返回this.__division是为了防止下游改变上游数据
        const divition = Observable.create((obser: Observer<T>) => {
            this.__division.subscribe(obser);
        });
        this.observer.next(divition);
    }
}

复制代码

今天的文章《记》rxjs分流操作符简单实现分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/63243.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注