RxJS 中的网络请求

RxJS 系列文章

一、简介

因为 RxJS 是一个函数式和响应式的编程库,并且 RxJS 也在网络请求方面也有了自己工具封装封装 Ajax 和 fetch。

RxJS 中的网络请求

以上是 RxJS 请求先关的大致内容。

二、RxJS 使用 ajax

方法 描述
ajax.get(url) 发送GET请求
ajax.post(url, data) 发送POST请求
ajax.put(url, data) 发送PUT请求
ajax.patch(url, data) 发送PATCH请求
ajax.delete(url) 发送DELETE请求
ajax({ method, url, ...}) 通用的ajax方法,支持自定义HTTP方法和配置

ajax 方法 API 特别的好理解,ajax 方法上能配置对象(常常是 options),当然也有 ajax 的 http 方法(get/post/…) 等不同的方法。

三、RxJS ajax 示例

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';

const url = 'https://jsonplaceholder.typicode.com/posts/1';

ajax.getJSON(url).pipe(
  map(response => {
    console.log('成功的响应:', response);
    // 在这里处理响应数据
    return response;
  }),
  catchError(error => {
    console.error('发生错误:', error);
    // 在这里处理错误
    throw error;
  })
).subscribe({
    next(){},
});

此处使用 ajax 的 getJSON 方法获取 json数据,使用 map 操作符进行数据转换,在 catchError 操作符中进行错误捕获。

四、RxJS 使用 fetch

API 语法

fromFetch<T>(
    input: string | Request, 
    initWithSelector: RequestInit & { selector?: (response: Response) => ObservableInput<T>; } = {}): Observable<Response | T>

常用方法

方法 描述
fromFetch(url) 发送GET请求
fromFetch(url, initWithSelector) 发送GET请求

五、RxJS fromFetch

在 RxJS 提供的 fetch 方法中,rxjs具有客观对象的能力,使用 map 和 catchError 操作符,

import { fromFetch } from 'rxjs/fetch';
import { catchError, map } from 'rxjs/operators';

const url = 'https://jsonplaceholder.typicode.com/posts/1';

fromFetch(url).pipe(
  catchError(error => {
    console.error('发生错误:', error);
    // 在这里处理错误
    throw error;
  }),
  map(response => {
    if (!response.ok) {
      throw new Error(`网络错误: ${response.status}`);
    }
    return response.json();
  })
).subscribe(data => {
  console.log('成功的响应:', data);
  // 在这里处理响应数据
});

六、RxJS ajax 定义

它为一个Ajax请求创建一个observable,这个observable要么是一个带有url、header等的请求对象,要么是一个URL字符串。

在我们开始 RxJS ajax 之前,我们开始回顾 Axios 和 fetch API 的基本认知。

七、axios 基本内容

  • 从浏览器基于 XHR
  • 从 Node.js 基于 http
  • Promise API
  • 拦截器和转换器

关于本文和 axios 着重关注Promise 和拦截器。

7.1) 添加拦截器

// 添加请求拦截器
axios.interceptors.request.use(function (config) {
    // 在发送请求之前做些什么
    return config;
  }, function (error) {
    // 对请求错误做些什么
    return Promise.reject(error);
  });

// 添加响应拦截器
axios.interceptors.response.use(function (response) {
    // 2xx 范围内的状态码都会触发该函数。
    // 对响应数据做点什么
    return response;
  }, function (error) {
    // 超出 2xx 范围的状态码都会触发该函数。
    // 对响应错误做点什么
    return Promise.reject(error);
  });

7.2) 移除拦截器

const myInterceptor = axios.interceptors.request.use(function () {/*...*/});
axios.interceptors.request.eject(myInterceptor);

八、一个简单的 RxJS Ajax 拦截器实现

import { ajax, AjaxRequest, AjaxResponse } from 'rxjs/ajax';
import { Observable } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';

// 自定义拦截器类
class AjaxInterceptor {
  intercept(request: AjaxRequest): Observable<AjaxRequest> {
    // 在这里可以对请求进行修改或添加头部等操作
    console.log('Request Interceptor:', request);
    return Observable.of(request);
  }

  interceptResponse(response: AjaxResponse): Observable<AjaxResponse> {
    // 在这里可以对响应进行修改,处理错误等操作
    console.log('Response Interceptor:', response);
    return Observable.of(response);
  }
}

具体用法

// 创建配置实例
const ajaxConfig = new AjaxConfig();

// 注册拦截器
const interceptorA = new AjaxInterceptor();
const interceptorB = new AjaxInterceptor();

ajaxConfig.use(interceptorA);
ajaxConfig.use(interceptorB);

// 创建并发送 Ajax 请求
const request: AjaxRequest = {
  url: 'https://jsonplaceholder.typicode.com/posts/1',
  method: 'GET',
};

ajaxConfig.sendRequest(request).subscribe(
  (response) => console.log('Final Response:', response),
  (error) => console.error('Error:', error)
);

九、分析

我们需要一个拦截器类,管理拦截器相关的内容,整体实现的内容是拦截器和响应拦截器。然后就实现一个 AjaxConfig 类。

// 配置类,包含 use 方法用于注册中间件
class AjaxConfig {
  private interceptors: AjaxInterceptor[] = [];

  // 注册拦截器中间件的方法
  use(interceptor: AjaxInterceptor): void {
    this.interceptors.push(interceptor);
  }

  // 发送 Ajax 请求的方法,应用注册的拦截器
  sendRequest(request: AjaxRequest): Observable<AjaxResponse> {
    let observable: Observable<AjaxRequest> = Observable.of(request);

    // 应用注册的拦截器
    this.interceptors.forEach((interceptor) => {
      observable = observable.pipe(mergeMap((req) => interceptor.intercept(req)));
    });

    // 发送实际的 Ajax 请求
    return observable.pipe(
      mergeMap((req) => ajax(req)),
      map((response) => {
        // 应用响应拦截器
        this.interceptors.forEach((interceptor) => {
          response = interceptor.interceptResponse(response);
        });
        return response;
      }),
      catchError((error) => {
        console.error('Error:', error);
        throw error;
      })
    );
  }
}

十、使用 from 操作符将 Promise 转化成可观察对象

Axios, fetch 都是基于 Promise 的请求对象。Promise 在 RxJS 中可以方便进行转换。from 操作符就是能方便的将 Promise 转化为可观察对象。

10.1) fetch

直接将 fetch 使用 from 操作符进行封装,将 Promise 进行转换

import { from } from 'rxjs';

const fetch$ = from(fetch(someUrl));

10.2) 在 pipe 函数中转换

有时候我们的需要输入、或者点击一些内容之后,在 pipe 函数中才能拿到数据,此时我们需要流的转换,此时我们就输需要 mergeMap。

import { fromEvent } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const button = document.getElementById('myButton');

const clicl$ = fromEvent(button, 'click');

click$.pipe(
  mergeMap(() => fetch('https://api.example.com/data'))
).subscribe({
  next: (data) => {
    // 在这里处理接收到的数据
    console.log(data);
  },
  error: (error) => {
    // 处理错误
    console.error(error);
  }
});

十一、反向从客观到Promise

RxJS 中 from 操作符可方便的将各种不同的类型转换成可观察对象,也可以将可观察对象转换成 Promise。

11.1) toPromise

toPromsie 将在 v8 版本中被废弃

import { from } from 'rxjs';

from([1, 2, 3, 4, 5]).toPromise().then((result) => {
    // 处理成功时的结果
    console.log('Promise resolved with result:', result);
  },
  (error) => {
    // 处理发生错误时的情况
    console.error('Promise rejected with error:', error);
  }
);

次数输出的结果是 Promise resolved with result: 5

可观察对象具有 toPromise 方法,调用之后,可以使用 Promise thenable 处理异步操作。

11.2) lastValueFrom/firstValueFrom

为什么会有这些 API? 因为可观察对象可以发送处多个值,但是 Promise 只能决策出一个值。

11.2.1) lastValueFrom

lastValueFrom 使用Observable完成时到达的最后一个值进行解析,返回类型是 Promise<T>

import { interval, take, lastValueFrom } from 'rxjs'

async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$); // `Promise<Number>
  console.log(finalNumber);
}

execute()

11.2.2) firstValueFrom

第一个值到达时就取它,而不等待Observable完成,因此你可以使用 firstValueFrom 。 firstValueFrom 将使用从Observable发出的第一个值解析Promise,并立即取消订阅以保留资源。如果Observable完成而没有发出任何值,则 firstValueFrom 也将拒绝 EmptyError 。

import { interval, firstValueFrom } from 'rxjs'

async function execute() {
  const source$ = interval(2000)
  const firstNumber = await firstValueFrom(source$)
  console.log(`The first number is ${firstNumber}`);
}

execute()

十一、请求重试

import { from, throwError, of } from "rxjs";
import { mergeMap, retry, catchError } from "rxjs/operators";

const apiUrl = "your address";

from(fetch(apiUrl))
  .pipe(
    mergeMap((response) => {
      if (!response.ok) {
        throw new Error(`HTTP error! Status: ${response.status}`);
      }
      return response.json();
    }),
    retry(3), // 重试3次
    catchError((error, retryCount) => {
      if (retryCount < 3) {
        // 可以根据具体情况判断是否需要重试
        console.log(`Retrying (${retryCount + 1})...`);
        return of(null); // 返回一个新的Observable来触发重试
      } else {
        return throwError("Max retries reached"); // 超过重试次数,抛出错误
      }
    })
  )
  .subscribe({
    next: (data) => {
      console.log("Data received:", data);
    },
    error: (error) => {
      console.error("Error:", error);
    },
  });

十二、小结

本文主要关注 RxJS 网络请求相关的问题,包含了 Ajax 和 fetch 两个先关的技术,以及使用方法,同时也关注底层的Promise化和重试网络请求、错误处理等等,当然我们如果需求也自己封装 RxJS 计划的 axios 方便我们在 Node.js 等服务端环境中使用。最后希望能够帮助到大家。

原文链接:https://juejin.cn/post/7336020150867722277 作者:编程杂货铺

(0)
上一篇 2024年2月18日 上午10:20
下一篇 2024年2月18日 上午10:31

相关推荐

发表回复

登录后才能评论