本文为稀土掘金技术社区首发签约文章,30天内禁止转载,30天后未获授权禁止转载,侵权必究!
前言
什么是 Streaming ?
百度百科倒是有关于 Streaming Media 的解释:
流媒体(streaming media)是指将一连串的媒体数据压缩后,经过网上分段发送数据,在网上即时传输影音以供观赏的一种技术与过程,此技术使得数据包得以像流水一样发送;如果不使用此技术,就必须在使用前下载整个媒体文件。
放到 HTTP 请求上,Streaming 差不多也是同样的意思,将数据分段发送给客户端。
一个比较常见的应用是 ChatGPT 的打字效果:
如果我们要用 Next.js 实现一个具有 Streaming 效果的接口该如何实现呢?
注:本篇的最后我们会调用 OpenAI 的接口来实现这样一个效果。
Fetch API
我们先从 Fetch API 开始说起。
Fetch API 想必大家已经很熟悉了,我们常常会这样使用 fetch:
fetch("http://example.com/movies.json")
.then((response) => response.json())
.then((data) => console.log(data));
在这个例子中,我们获取了一个 JSON 文件并将其打印。为了获取 JSON 的内容,我们需要使用 json()
方法(该方法返回一个将 response body 解析成 JSON 的 promise)。
实际上,response 还有一个 body 只读属性,它是一个简单的 getter,用于暴露一个 ReadableStream 类型的 body 内容。简单来说,fetch 的 response.body 会返回一个流类型的内容。这样的设计在请求大体积文件的时候会很有用。
ReadableStream
如何读取
response.body 返回一个 ReadableStream 类型的内容。ReadableStream 有一个 getReader() 实例方法,它会创建一个读取器并将流锁定。一旦流被锁定,其他读取器将不能读取它,直到它被释放。
读取器是 ReadableStreamDefaultReader 类型,它是用于读取来自网络提供的流数据(例如 fetch 请求)的默认 reader。它有一个 read() 实例方法,返回一个 Promise,提供对流内部队列中下一个分块的访问权限。而这个 Promise 的值,其 resolve / reject 的结果取决于流的状态:
- 如果有 chuck 可用,则 promise 将使用
{ value: theChunk, done: false }
形式的对象来 resolve。 - 如果流已经关闭,则 promise 将使用
{ value: undefined, done: true }
形式的对象来 resolve。 - 如果流发生错误,promise 将因相关错误被拒绝。
根据 MDN ReadableStream.getReader() 中的例子尝试读取一下 ReadableStream 中的内容:
const decoder = new TextDecoder('utf-8');
fetch('https://api.thecatapi.com/v1/images/search')
.then((response) => response.body)
.then((body) => {
const reader = body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
return reader.read().then(process);
});
})
这里我们写了一个递归,不断读取流中的内容,直到流关闭(done 为 true)。我们可以复制这段代码,在浏览器中运行,效果如下:
因为接口本身不是流式,所以这里只有一个 chunk,正是接口的返回内容。
如何创建
知道了如何读取,我们又该如何创建一个 ReadableStream 类型的内容呢?
可以使用 ReadableStream() 构造函数,示例代码如下:
const stream = new ReadableStream(
{
start(controller) {},
pull(controller) {},
cancel() {},
type,
autoAllocateChunkSize,
}
);
构造函数第一个参数对象包含着五个属性,仅有第一个是必要的:
-
start(controller)
—— 一个在 ReadableStream 构建后,立即被调用一次的方法。在这个方法中,你应该包含设置流功能的代码,例如开始生成数据或者以其他的方式访问资源时 -
pull(controller)
—— 一个方法,当被包含时,它会被重复的调用直到填满流的内置队列。当排入更多的分块时,这可以用于控制流 -
cancel()
—— 一个方法,当被包含时,如果应用发出流将被取消的信号,它将被调用(例如,调用 ReadableStream.cancel())。内容应该采取任何必要的措施释放对流源的访问 -
type
和autoAllocateChunkSize
—— 当它们被包含时,会被用来表示流将是一个字节流。字节流将在未来的教程中单独涵盖,因为它们在目的和用例上与常规的(默认的)流有些不同。它们也未在任何地方实施。
让我们写个例子熟悉一下:
fetch('https://mdn.github.io/dom-examples/streams/simple-pump/tortoise.png')
.then(response => response.body)
.then(rs => {
const reader = rs.getReader();
return new ReadableStream({
async start(controller) {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
controller.enqueue(value);
}
controller.close();
reader.releaseLock();
}
})
})
.then(rs => new Response(rs))
.then(response => response.blob())
.then(blob => URL.createObjectURL(blob))
.then(url => {
var img = new Image();
img.src = url;
document.body.append(img)
})
.catch(console.error);
在这个例子中,我们 fetch 了一张图片,先用读取器访问流,根据流的内容创建新的流文件(相当于拷贝一遍),然后获取新的流文件,最终将其转换为图片元素,添加到 body 中。我们复制这段代码到浏览器中,效果如下:
在这段代码中,可能有点困惑的是 start 函数的第一个参数 controller,它是 ReadableStreamDefaultController 类型,用于控制 ReadableStream 的状态和内部队列。
简单来说,它有三个实例方法,close()
用于关闭流,enqueue()
用于加入流,error()
用于报错。
Next.js 实现 Streaming
基础示例
有了这些基础知识,让我们用 Next.js 实现一个 Streaming 接口吧。
使用官方脚手架创建一个 Next.js 项目:
npx create-next-app@latest
运行效果如下:
为了样式美观,我们会用到 Tailwind CSS,所以注意勾选 Tailwind CSS,其他随意。
新建 api/chat/route.js
,代码如下:
function iteratorToStream(iterator) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
},
})
}
function sleep(time) {
return new Promise((resolve) => {
setTimeout(resolve, time)
})
}
const encoder = new TextEncoder()
async function* makeIterator() {
yield encoder.encode('<p>One</p>')
await sleep(1000)
yield encoder.encode('<p>Two</p>')
await sleep(1000)
yield encoder.encode('<p>Three</p>')
}
export async function GET() {
const iterator = makeIterator()
const stream = iteratorToStream(iterator)
return new Response(stream)
}
这段代码是 Next.js 官方提供的关于使用底层 API 实现 Streaming 的示例代码,其中又参考了 MDN ReadableStream 的示例代码。代码逻辑并不复杂,主要功能是在运行迭代器,不断将内容推到流中。为了效果明显,我们加了 sleep 函数。
本地运行 npm run dev
,此时访问 http://localhost:3000/api/chat,效果如下:
注意:Next.js 开发模式默认开启 React Strict Mode,这会导致请求调用两次,影响这里的结果。你可以在 next.config.js
配置中关闭 React Strict Mode:
const nextConfig = {
reactStrictMode: false,
};
export default nextConfig;
随着请求的持续连接,这些内容会间隔 1s 出现。查看此接口的响应头:
请求之所以能够持续返回数据,也是得益于 HTTP 的 Transfer-Encoding 标头的值为 chunked,表示数据将以一系列分块的形式进行发送。
分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,允许 HTTP由网页服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在 HTTP 协议1.1版本(HTTP/1.1)中提供。
接口写好了,前端又该如何调用呢?
这个时候就要用到前面讲 ReadableStream 读取的内容了。修改 app/page.js
,代码如下:
'use client'
const decoder = new TextDecoder('utf-8');
import { useEffect, useState } from "react";
export default function Chat() {
const [text, setText] = useState('')
useEffect(() => {
const fetchData = async () => {
const response = await fetch("http://localhost:3000/api/chat");
const reader = response.body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
setText((value) => {
return value + text
})
return reader.read().then(process);
});
}
fetchData()
}, [])
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
{text}
</div>
);
}
刷新页面,交互效果如下:
调用 OpenAI 接口
写 Streaming 接口,一个很常见的应用是后端调用大模型的接口,比如 OpenAI 的接口:
import OpenAI from "openai";
const openai = new OpenAI();
async function main() {
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: "Say this is a test" }],
stream: true,
});
for await (const chunk of stream) {
process.stdout.write(chunk.choices[0]?.delta?.content || "");
}
}
main();
让我们实现一下开头的那个效果。
为此你需要准备一个 OpenAI API 3.5 的 KEY。修改 api/chat/route.js
,代码如下:
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY || '',
baseURL: "https://api.openai-proxy.com/v1"
});
const encoder = new TextEncoder()
async function* makeIterator(response) {
for await (const chunk of response) {
const delta = chunk.choices[0].delta.content
yield encoder.encode(delta)
}
}
function iteratorToStream(iterator) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
},
})
}
export async function POST(req) {
const { messages } = await req.json();
const response = await openai.chat.completions.create({
model: 'gpt-3.5-turbo',
stream: true,
messages,
});
return new Response(iteratorToStream(makeIterator(response)))
}
新建 .env.local
,代码如下:
OPENAI_API_KEY=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
修改 app/page.js
,代码如下:
'use client';
import { useState, useEffect } from "react";
const decoder = new TextDecoder('utf-8');
export default function Chat() {
const [text, setText] = useState('')
const [input, setInput] = useState('')
const handleInputChange = (e) => {
setInput(e.target.value)
}
const handleSubmit = async (e) => {
e.preventDefault()
setText('')
setInput('')
const response = await fetchData(input)
const reader = response.body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
setText((value) => {
return value + text
})
return reader.read().then(process);
});
}
const fetchData = async (input) => {
const response = await fetch("http://localhost:3000/api/chat", {
method: "POST",
body: JSON.stringify({messages: [{ role: "user", content: input }]})
});
return response
}
return (
<div className="flex flex-col w-full max-w-md p-2 mx-auto stretch">
<div className="whitespace-pre-wrap">
{text ? 'AI: ' + text : ''}
</div>
<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
);
}
交互效果如下:
从右侧浏览器的打印中,我们也可以看出,随着内容的不断返回,React 在不断的渲染内容,这才实现了打字流的效果。
总结
本篇我们从 fetch API 开始讲起,response.body 返回的正是一个 ReadableStream 类型的只读流。接下来我们讲了 ReadableStream 中的内容如何读取以及 ReadableStream 如何创建。借助底层 API 实现流的时候,就需要通过创建 ReadableStream 的方式来实现。
最后我们讲了两个全栈示例,后端接口如何创建,前端又该如何调用,希望对大家业务中实现 Streaming 有借鉴意义。
参考链接
原文链接:https://juejin.cn/post/7344089411983802394 作者:冴羽