啊哈,我也实现了大文件的断点续传

我心飞翔 分类:javascript

之前看到了几位大佬写的前端大文件上传的文章,感觉这个功能还挺有意思的,我也试了试,在大佬的代码上加了一些自己的想法。这篇文章算是一个自己的总结,也希望能给想做这个功能的同学一些帮助。

功能总结如下:

  1. 大文件的分片上传
  2. 文件的断点续传
  3. 并发控制
  4. 错误重试

参考文章(非常推荐大家阅读):

  1. 写给新手前端的各种文件上传攻略,从小图片到大文件断点续传
  2. 字节跳动面试官:请你实现一个大文件上传和断点续传
  3. 字节跳动面试官,我也实现了大文件上传和断点续传

项目地址:gitee

话不多说,下面开始

本文技术栈:

前端:vue,element-ui,webWorker,axios,spark-MD5以及一些文件上传api

后端:express+multiparty

需求整理

在正式开始之前,先来梳理一下要实现的功能:

  1. 由于要上传的文件比较大,因此我们不可能直接将整个文件直接上传,因此需要对文件进行切割,也就是分片,将多个分片依次上传,最终由服务端将切片进行合并,从而得到最终文件
  2. 大文件上传时间比较长,并且分片后请求较多,上传过程中,可能会遇到用户断网等问题,因此,我们的上传功能需要支持断点续传(参考迅雷的下载过程)
  3. 如果将每个分片设为1M,那么一个200M的文件就会有200个分片,也就是会有200个请求。这200个请求不可能同时发出去,因此,我们需要对并发的数量做一个控制。
  4. 前面说了,用户上传过程中可能会遇到断网的问题,或者某个切片会丢失,为了优化用户体验,应该对每个失败的请求进行重试。

确定好功能之后,下面来梳理一下整个流程以及一些技术细节

  1. 使用<input type='file'>来承载上传的文件,当用户选择文件时,触发change事件,可以获取到文件的信息,并且给用户一个本地预览

  2. 当用户点击上传时,开始对文件进行切片,使用Blob.slice方法

  3. 使用文件的hash作为后端存储时的文件名,因此在上传前应该先计算文件的hash。计算hash是一个比较费时的操作,因此我们使用Worker线程计算,防止页面假死。

  4. 考虑断点续传的情况:如果用户上传期间断开了链接,之后重新上传时,应该只上传还没有上传的部分。为了实现这个功能,我们有两种方案:

    1. 在客户端记录用户上传过哪些切片,比如localStorage
    2. 在上传之前向服务端发一个verify请求,服务器查找用户上传文件的文件夹,获取用户已经上传的部分并返回给客户端

    我们采用第二种方案。

  5. 过滤出还没有上传的部分,开始上传

  6. 用户点击暂停,这里我没有使用原生xhrabort,没有取消已经发出去的请求,而是暂停了后续的请求,也算是一个不一样的地方吧,后文会讲到我是怎么实现的。

  7. 用户点击恢复,恢复被暂停的请求

  8. 如果某些切片超出了重试的次数之后仍然失败,用户可以点击重试按钮,将失败的部分重新上传

  9. 当全部切片发送完成时,请求服务器合并切片

流程图如下:

 大文件上传.jpg

基本结构

两个配置项:

const CHUNK_SIZE = 1024 * 1024 * 0.5 // 每个切片大小,单位字节
const CONCURRENCY_LIMIT = 4 // 并发数量限制
 

定义上传的状态

const UPLOAD_STATUS = {
  calculatingHash: "calculatingHash", // 正在计算hash
  waiting: "waiting",                 // 等待用户开始上传
  uploading: "uploading",             // 正在上传
  abort: "abort",                     // 暂停上传
  success: "success",                 // 后端文件合并完毕,文件秒传时也应该是这个状态
  fail: "fail"                        // 上传失败
}
 

data中的量:

data() {
  return {
    showPreview: false, // 本地文件预览加载完成时为true
    file: null,         // 文件
    hash: "",           // 文件hash
    worker: null,       // worker线程
    hashPercent: 0,     // hash计算进度,用于显示hash计算进度条
    uploadedLen: 0,     // 已经上传过的切片数量
    chunksLen: 0,       // 总切片数量
    fileChunks: [],     // 记录所有切片
    Scheduler: null,    // 并发任务调度器,所有的请求会通过该调度器来完成,并且实现了错误重试功能
    uploadStatus: UPLOAD_STATUS,
    curStatus: UPLOAD_STATUS.waiting // 当前的上传状态
  }
}
 

定义的方法:

// 当input触发change事件时触发,用于获取文件
handleFileChange(e) {}
// 用于预览文件
preview() {}
// 用户点击上传时触发,此方法中需要调用文件分片方法,计算hash方法,verify验证方法,上传切片方法
async handleUpload() {}
// 上传失败时,用户点击重试按钮时触发
async handleRetry() {}
// 文件切片的方法
createFileChunks() {}
// 计算文件hash的方法
getFileHash() {}
// 请求服务器哪些切片已经上传过
async verifyUpload(filename, fileHash) {}
// 上传切片
async uploadChunks(chunksNeedUpload) {}
// 请求服务器合并切片
async mergeRequest() {}
// 用户点击暂停时触发
handlePause() {}
// 用户点击继续按钮时触发
handleResume() {}
 

html结构如下,主要就是几个按钮,hash计算进度条,上传进度条和用于预览的video标签

<input type="file" @change="handleFileChange" />
<el-button
type="primary"
@click="handleUpload"
v-show="curStatus === uploadStatus.waiting"
>上传</el-button
>
<el-button
type="primary"
@click="handleRetry"
v-show="curStatus === uploadStatus.fail"
>重试</el-button
>
<el-button
type="warning"
@click="handlePause"
v-show="curStatus === uploadStatus.uploading"
>暂停</el-button
>
<el-button
type="primary"
@click="handleResume"
v-show="curStatus === uploadStatus.abort"
>继续</el-button
>
<br />
<el-progress
type="circle"
:percentage="hashPercent"
class="hash-progress"
:status="hashProgressStatus"
></el-progress>
<el-progress
:text-inside="true"
:stroke-width="20"
:percentage="uploadPercent"
:status="uploadProgressStatus"
class="file-progress"
></el-progress>
<div class="preview-container" v-show="showPreview">
<video ref="preview" controls></video>
</div>

用于进度条的几个计算属性:

// hash计算进度条状态,用于区分颜色
hashProgressStatus({ hashPercent }) {
return hashPercent >= 100 ? "success" : null
},
// 上传进度条状态,用于区分颜色
uploadProgressStatus({ curStatus, uploadStatus, uploadPercent }) {
if (curStatus === uploadStatus.fail) return "exception"
return uploadPercent >= 100 ? "success" : null
},
// 上传进度
uploadPercent({ uploadedLen, chunksLen, curStatus, uploadStatus }) {
if (curStatus === uploadStatus.waiting) return 0
if (curStatus === uploadStatus.success) return 100
return Math.floor((100 * uploadedLen) / chunksLen)
}

文件预览

首先是用户选择文件,触发change事件,处理函数为handleFileChange

handleFileChange(e) {
this.file = e.target.files[0] // 获取文件
this.preview() // 预览文件
}
preview() {
// URL.createObjectURL方法创建一个对于本地文件的引用url
const URLobj = window.URL.createObjectURL(this.file)
const preview = this.$refs.preview
preview.src = URLobj
preview.oncanplay = () => {
this.showPreview = true
}
}

这里需要注意,对于视频文件,我们不能用URL.revokeObjectURL()来销毁这个引用,否则视频就不能播放了。对于图片可以用该方法销毁引用。

上传前的准备

文件分片

通过前面的流程图可以看到,我们首先要对文件进行切片

async handleUpload() {
if (!this.file) {
this.$message({
type: "error",
message: "请选择要上传的文件"
})
return
}
// 文件切片
this.createFileChunks()
}

切片的方法比较简单,只需要用slice方法切分即可

function createFileChunks() {
const chunkList = []
let cur = 0
const file = this.file
const size = file.size
while (cur < size) {
chunkList.push(file.slice(cur, cur + CHUNK_SIZE))
cur += CHUNK_SIZE
}
this.chunksLen = chunkList.length
this.fileChunks = chunkList
}

计算hash

计算文件hash用到了worker线程,因此在/public下新建hash.jsworker线程接受文件切片作为参数,计算文件hash

// hash.js
importScript('./spark-md5.min.js')
self.onmessage = function(e) {
const { fileChunks } = e.data
const spark = new SparkMD5.ArrayBuffer()
const fileReader = new FileReader()
const len = fileChunks.length
let curChunk = 0
// 注意要用箭头函数,否则self指向错误
fileReader.onload = (e) => {
spark.append(e.target.result)
curChunk++
if (curChunk >= len) {
const hash = spark.end()
self.postMessage({ hash, percent: 100 }) // 当解析完毕时,将hash传递出去
self.close()
} else {
fileReader.readAsArrayBuffer(fileChunks[curChunk])
self.postMessage({ percent: 100 * curChunk / len }) // 没有解析完毕时,只传递解析进度
}
}
fileReader.readAsArrayBuffer(fileChunks[curChunk])
}

Spark-md5文档中提到,使用增量计算性能更好,这部分代码见官方文档即可

Incremental md5 performs a lot better for hashing large amounts of data, such as files. One could read files in chunks, using the FileReader & Blob's, and append each chunk for md5 hashing while keeping memory usage low.

主线程与worker线程的通信的逻辑如下

function getFileHash() {
this.curStatus = this.uploadStatus.calculatingHash
return new Promise((resolve) => {
this.worker = new Worker('/hash.js')
this.worker.postMessage({ fileChunks: this.fileChunks })
this.woker.onMessage = (e) => {
const { hash, percent } = e.data
this.hashPercent = Math.ceil(percent)
if (hash) {
this.hash = hash
resolve()
}
}
})
}

验证是否需要上传

下一步就是向服务器发送请求,得到已经上传的切片列表,这样就能够过滤出还需要上传哪些切片

现在讲一下服务器的逻辑:服务器会以文件hash为名字建立一个临时文件夹,改文件夹内放了所有切片,当服务器收到merge请求时,说明所有切片上传完毕,服务器就会将该文件夹内所有切片合并,放到存储文件的地址下,然后删除临时文件夹。

服务器会根据用户上传的文件名(用于提取文件后缀)和文件hash进行查找,如果存在该文件(注意不是临时文件夹),说明该文件不需要再上传了,也就是实现了文件秒传。如果不存在,服务器会找到该临时文件夹,然后得到该文件夹内已经存在的切片,并将已经上传过的切片列表返回。

async handleUpload() {
if (!this.file) {
this.$message({
type: "error",
message: "请选择要上传的文件"
})
return
}
// 文件切片
this.createFileChunks()
const { shouldUpload, uploadedList }await this.verifyUplaod() // 增加
// 说明文件已经上传过了
if (!shouldUpload) {
this.$message({
type: "success",
message: "文件秒传"
})
this.curStatus = this.uploadStatus.success
return
}
}
async verifyUplaod() {
const { data } = await axios({
url: '/verify',
method: 'post',
data: { filename: this.file.name, fileHash: this.hash }
})
return data
}

文件上传

添加任务

接下来过滤掉已经上传过的切片,将未上传过的切片上传即可

async handleUpload() {
// ...
const chunksNeedUpload = this.fileChunks
.map((chunk, index) => ({
chunk,
fileHash: this.hash, // 整个文件的hash
hash: `${this.hash}_${index}` // 每个切片的hash,这里用下划线分割,后端应该与之一致
}))
.filter(hash => !uploadedList.includes(hash))
this.uploadedLen = uploadedList.length
this.uploadChunks(chunksNeedUpload)
}

接下来的uploadChunks方法就是核心内容了,首先,我们上传文件时用的MIME类型是mutipart/form-data,因此,我们用FormData来构建请求体

async uploadChunks(chunksNeedUpload) {
this.curStatus = this.uploadStatus.uploading
chunksNeedUpload
.map({chunk, fileHash, hash} => {
const formData = new FormData()
formData.append('chunk', chunk)
formData.append('hash', hash) // 文件hash
formData.append('fileHash', fileHash) // 切片hash
})
}

接下来构建请求列表,将所有请求用一个函数封装,用并发调度器来执行

async uploadChunks(chunksNeedUpload) {
this.curStatus = this.uploadStatus.uploading
this.Scheduler = new Scheduler(CONCURRENCY_LIMIT) // 增加
chunksNeedUpload
.map({chunk, fileHash, hash} => {
const formData = new FormData()
formData.append('chunk', chunk)
formData.append('hash', hash) // 文件hash
formData.append('fileHash', fileHash) // 切片hash
})
.forEach((formData, index) => {
const taskFn = () => {
axios({
url: '/',
method: 'post',
headers: { 'Content-Type': 'multipart/form-data' },
data: formData
}).then(() => this.uploadedLen++) // 用于显示进度条
}
this.Scheduler.append(taskFn, index) // 增加
})
const { status } = await this.Scheduler.done() // 增加
if (status === 'success') {
this.mergeRequest()
} else {
this.$message({
type: 'error',
message: '文件上传失败,请重试'
})
}
}

这里的重点就是Scheduler类,通过调用append方法添加任务,执行done方法,会返回一个promisepromise返回两个值:status表示所有任务的调度状态,success为成功,即所有任务成功,fail为失败,即有些任务超过了重试次数仍然失败;第二个值是一个结果数组,与Promise.allSettled的返回值相同。

任务调度之并发控制与错误重试

接下来是讲解异步任务调度器是如何实现的,见/utils/scheduler.js

该任务调度器的功能如下:保证异步任务的并发数量,并且每个任务失败时可以重试,重试次数过多后彻底失败,并且可以暂停和恢复任务的执行。

首先定义了每个任务的状态:

const STATUS = {
waiting: 'waiting', // 正在等待执行
running: 'running', // 正在执行
error: 'error',     // 失败,但是还可以重试
fail: 'fail',       // 超出重试次数后仍然失败
success: 'success'  // 成功
}

整个任务调度的结果

const PENDING = 'pending'
const SUCCESS = 'success' // 所有任务成功
const FIAL = 'fail'       // 某些任务多次重试后仍然失败

接下来看构造方法

constructor(max = 4, retryTime = 3) {
this.status = PENDING
this.max = max             // 最大并发量
this.tasks = []            // 任务数组
this.promises = []         // 任务结果promise数组,顺序与任务添加顺序对应
this.settledCount = 0      // 已经有结果的任务数量,成功或者重试多次后彻底失败
this.abort = false         // 是否暂停执行
this.retryTime = retryTime // 重试次数
}

append方法用于添加任务,比较简单

append(handler, index) {
// 任务数组中添加了处理函数,任务状态,已经重试了几次,索引
this.tasks.push({ handler, status: STATUS.waiting, retryTime: 0, index })
}

外部会执行done方法来启动

done() {
// run方法来开始调度任务
return this.run().then(() =>
// 这里用this.promises保存了每个task返回的promise,用于返回每个任务的结果
// 对于本demo来说是多余的,但是这样更有通用性                   
Promise.allSettled(this.promises).then((res) => ({
status: this.status,
res
}))
)
}

接下来就是核心的run方法

run() {
return new Promise((resolve) => {
const start = async () => {}
for (let i = 0; i < this.max; i++) {
start()
}
})
}

初始时一次性启动this.max个任务,在start方法中,一个任务结束后会递归调用start,接下来是start方法

const start = async () => {
const index = this.tasks.findIndex(
{ status } => status === STATUS.waiting || status === STATUS.error
)
if (index === -1) return // 注意,这里有个大坑
const task = tasks[index]
task.status = STATUS.running
const promise = task.handler()
this.promises[task.index] = promise
promise
.then(() => {
task.status = STATUS.success
this.settledCount += 1
if (this.settledCount >= this.tasks.length) {
if (this.status === PENDING) {
this.status = SUCCESS
}
resolve()
} else {
start()
}
})
.catch(() => {
// 如果超出了重试次数,该任务彻底失败
if (task.retryTime >= this.retryTime) {
task.status = STATUS.fail
this.settledCount += 1
this.status = FAIL // 有一个任务彻底失败,整个任务调度就失败
} else {
task.status = STATUS.error
task.retryTime += 1
}
if (this.settledCount >= this.tasks.length) {
resolve()
} else {
start()
}
})
}

这部分实现了两个功能:并发控制和错误重试。可以看到,start函数的递归调用只有在then和回调catch回调中存在,因此保证了一个任务结束后才能执行下一个任务,而初始状态时,一口气执行了this.max个任务,因此实现了并发数量的控制。接下来是错误重试,原理在于

const index = this.tasks.findIndex(
{ status } => status === STATUS.waiting || status === STATUS.error
)

这一行代码是用来寻找下一个需要执行的任务,目标是状态为waitingerror的任务。在then回调中,我们把任务的状态置为了success,失败回调中,还可以继续重试的任务状态为error,彻底失败,不能继续重试的任务状态为fail,这样,下次寻找任务时就能找到失败的任务,从而完成重试。

if (index === -1) return // 注意,这里有个大坑

这里的坑在于,比如并发数量设置为4,当最后4个任务执行时,任务A率先执行成功,进入then回调,此时还有三个任务还没有结果,因此settledCount<this.tasks.length,会递归执行start方法,因此会寻找状态为waiting和error的任务,但是此时,任务的状态只有可能三种:success, running, fail,因此index为-1,那么task就是undefined,后续代码就会报错。所有任务都有结果后,将外层的promiseresolve掉。

这里的thencatch回调函数都形成了闭包,所以能够得到对应的task对象

任务调度之暂停与恢复

下一个功能是任务的暂停与恢复。之所以没有使用原生xhrabort,是想把这个功能实现地更有通用性,对于非网络请求的并发控制可以使用。

任务的暂停也很简单,外部控制promise的状态即可(貌似是promise/defer模式?),手写过promise的同学应该比较熟悉,在promiseA+测试时会用到这个方法。原理就是将promiseresolvereject方法保存到外部,从而在外部控制promise的状态。

setDeferred() {
let deferred, resolveFn
deferred = new Promise((resolve) => {
resolveFn = resolve
})
this.deferred = deferred
this.resolve = resolveFn
}

在构造函数中执行该方法

constructor() {
// ...
this.setDeferred()
}

当执行this.resolve时,this.deferred就被resolve了。

start函数中加上一个判断:

const start = async () => {
if (this.abort) await this.deferred
// ...
}

初始状态,this.abortfalse,当需要暂停时,调用pause方法

pause() {
this.abort = true
}

由于this.deferred一直是pending状态,后面的任务需要等待该promiseresolve,从而实现了任务暂停。

至于继续任务也很简单

resume() {
this.abort = false
this.resolve() // 放行
this.setDeferred() // 重置deferred
}

这样,一个异步任务调度器就算完成了。当用户点击暂停和继续按钮时,分别触发handlePausehandleResume方法,这两个方法分别调用Scheduler.pauseScheduler.resume方法即可

handlePause() {
this.Scheduler.pause()
this.curStatus = this.uploadStatus.abort
}
handleResume() {
this.Scheduler.resume()
this.curStatus = this.uploadStatus.uploading
}

后台接口说明

熟悉nodejs的同学可以直接跳过这部分

后端是基于@yeyan1996的代码(作为一个0年前端,确实不会node),稍微改造了一下,用了express,看起来更简单一点,现在来介绍一下后端接口,主要是帮助和我一样不懂node的同学。

作者确实是不会nodejs,因此传的参数比较乱,会的同学可以自己改造

请求地址:/verify,请求方式post

字段 是否必须 说明
fileHash 整个文件的hash
filename 原本的文件名

返回数据类型:json

字段 说明
shouldUpload 如果该文件已经存在,为false,否则为true
uoloadedList 已经上传过的切片列表,当shouldUpload为false时为空

请求地址:/,请求方式post,数据类型:form-data

fomrData字段 是否必须 说明
chunk 文件切片
hash 文件hash_切片索引
fileHash 整个文件的hash
filename 原本的文件名

在这个接口中用随机数模拟了上传错误

请求地址:/merge,请求方式post

字段 是否必须 说明
hash 整个文件的hash
suffix 文件后缀

返回数据类型:json

字段 说明
code 0

总结

一个大文件的断点续传分为以下几个步骤

  1. 获取文件,将文件分片
  2. 根据切片获取文件hash
  3. 验证该文件是否已经上传过
  4. 上传切片
  5. 通知服务器合并切片

这些基本流程,在很多文章中都已经讲到了,本文的demo则是在这些的基础上,增加了断点续传,并发控制,报错重传的功能,个人认为这些才是大文件上传的核心内容。本文中实现的Scheduler类是一个更具通用性的异步任务调度器,不仅仅局限在ajax请求上。

源码

gitee

不足与展望

本文只是实现了一个提供基本功能的大文件断点续传,其实能够扩展的地方还有很多,比如:

  1. 使用websocket,由服务器主动推送进度
  2. 用户选择文件,但还没有点击上传之前,实现一个类似预计算的功能
  3. 文件分片大小和并发数量是固定的,可以根据用户网速进行调整,比如当用户网速较快时,可以将多个切片通过可写流合并为一个切片上传,提高速度
  4. 上传前的类型校验等等

大家可以结合以上的不足,或者根据自身的需求进行扩展,希望本文能够帮到大家!!!

如果感觉文章不错的话,就请点个赞吧👍🏻 !!!

回复

我来回复
  • 暂无回复内容