Skip to content

Nodejs 大文件分片上传 #123

Open
@TieMuZhen

Description

@TieMuZhen

一、步骤

  1. 前端:对大文件进行切片,并按 文件名-下标-token 的规则命名
  2. 前端:控制并发的分片数量
  3. 前端:分片上传完毕,通知后端进行合并操作
  4. 后端:接收切片,将分片从缓存目录中取出,重命名文件名-下标-token
  5. 后端:合并操作,根据下标获取对应切片 ,通过stream 进行合并

二、前端

文件通过 input 标签获取,得到的是 File 对象

  • Blob 对象 表示原始数据,提供了slice方法
  • File 继承了Blob 的功能

HTML:只需要input file标签

<!-- File picker: user selects the large file to be chunk-uploaded -->
<input type="file" name="f1" id="test">
<!-- Clicking this triggers submitUpload (wired up in the JS below) -->
<button id="upload_btn">确认上传</button>

JS:切割,控制并发,发送合并信息

// Endpoint used for both chunk uploads and the final merge request.
const UPLOAD_URL = "http://localhost:8081/upload";

// Start the chunked upload when the user confirms.
const onUploadClick = () => {
  const file = getElFile("input#test");
  submitUpload(UPLOAD_URL, file);
};
document.querySelector("#upload_btn").addEventListener("click", onUploadClick);

// 主体上传功能
async function submitUpload(url, file) {
  const CHUNKSIZE = 1 * 1024 * 1024; // 1M
  const TOKEN = Date.now();
  // 切割数组
  const chunkList = sliceFile(file, CHUNKSIZE);
  // 创建formdata 并上传
  console.log(file.name);
  let promiseList = createChunkPromiseList(chunkList, file.name, TOKEN);
  // 并发控制 上传
  await createLimitPromise(2, promiseList);
  // 单独发最后一个请求告诉后端分片上传完毕,并附带所有分片文件合并后总文件的信息
  let mergeFormData = new FormData();
  mergeFormData.append("type", "merge");
  mergeFormData.append("token", TOKEN);
  mergeFormData.append("chunkCount", chunkList.length);
  mergeFormData.append("fileName", file.name);
  let res = await axios.post(url, mergeFormData);

  console.log(res);
}

辅助函数

// 并发控制 
function createLimitPromise(limitNum, promiseListRaw) {
  let resArr = [];
  let handling = 0;
  let resolvedNum = 0;
  let promiseList = [...promiseListRaw]
  let runTime =  promiseListRaw.length

  return new Promise(resolve => {
    // 并发执行limitNum 次
    for (let i = 1; i <= limitNum; i++) {
      run();
    }

    function run() {
       if(!promiseList.length) return 
        handling += 1;
        console.log("cur handling:" + handling)
        handle(promiseList.shift())
          .then(res => {
            resArr.push(res);
          })
          .catch(e => {
            //ignore
            console.log("catch error");
          })
          .finally(() => {
            handling -= 1;
            resolvedNum += 1;
            console.log(`resolvedNum : ${resolvedNum}`);
            if(resolvedNum === runTime){
              resolve(resArr)
            }
            run();
          });
    }
    function handle(promise) {
      return new Promise((resolve, reject) => {
        promise.then(res => resolve(res)).catch(e => reject(e));
      });
    }
  });
}

// 分片二进制数据
function sliceFile(file, chunkSize) {
  let chunkList = [];
  let start = 0;
  let end = chunkSize;
  while (true) {
    let curChunk = file.slice(start, end);
    if (!curChunk.size) break;
    chunkList.push(curChunk);
    start += chunkSize;
    end = start + chunkSize;
  }
  return chunkList;
}

//获取HTML 中的file对象
function getElFile(selector) {
  return document.querySelector(selector).files[0];
}

//chunkList => formdata list => PromiseList
//切片数组 封装成 http 请求
function createChunkPromiseList(chunkList, name, TOKEN) {
  return chunkList
    .map((chunk, index) => {
      console.log(chunk);
      let formdata = new FormData();
      formdata.append("type", "upload");
      formdata.append("name", name);
      formdata.append("token", TOKEN);
      formdata.append("chunk", chunk);
      formdata.append("index", index);
      return formdata;
    })
    .map(formdata => {
      console.log(formdata.get("type"));
      return axios.post(UPLOAD_URL, formdata, axiosConfig);
    });
}

三、后端(基于koa)

注意:koa-bodyparser不能处理formdata数据,需要使用koa-body

// Directory where uploaded chunk files are cached before merging.
const uploadChunkPath = path.resolve(__dirname,'../data')
app.use(KoaBody({
  multipart:true, // enable multipart/form-data so file uploads are parsed
  formidable: {
    // Default save directory for uploads; if unset, files land in the OS temp dir.
    uploadDir: uploadChunkPath
  },
}))

基于Koa

//主体
router.post('/upload',ctx=>{
    if(ctx.request.body.type === 'merge'){
            const {token,chunkCount,fileName} = ctx.request.body
            mergeChunkFile(fileName,uploadChunkPath,chunkCount,token,'../data')    
            ctx.body =  'ok'
    }else if(ctx.request.body.type === 'upload'){
            const {index,token,name} = ctx.request.body
            const chunkFile = ctx.request.files.chunk
            const chunkName = chunkFile.path.split('/').pop()
            renameFile(uploadChunkPath,chunkName,`${name}-${index}-${token}`)
            ctx.body = 'upload chunk success'
    }else{
        ctx.body = "unkown type"
    }
})

merge 函数 (stream pipe)

/**
 *  
 * @param {String} fileName 生成文件的文件名
 * @param {String} chunkPath 缓存目录的路径
 * @param {String} fileToken 文件的token
 * @param {String} dataDir 可选,生成文件的相对路径
 * @returns {Boolean} 
 */
const mergeChunkFile = (fileName,chunkPath,chunkCount,fileToken,dataDir="./")=>{
    //如果chunkPath 不存在 则直接结束
    if(!fs.existsSync(chunkPath)) return 
    const dataPath = path.join(__dirname,dataDir,fileName);
    let writeStream = fs.createWriteStream(dataPath); 
    let mergedChunkNum = 0
    return mergeCore()
  	//闭包保存非递归数据
    function mergeCore(){
      	//结束标志为已合并数量大于总数(mergedChunkNum从0开始)
        if (mergedChunkNum >= chunkCount) return
        const curChunk = path.resolve(chunkPath,`${fileName}-${mergedChunkNum}-${fileToken}`)
        const curChunkReadStream = fs.createReadStream(curChunk);
        //将readStream 写入 writeStream
        curChunkReadStream.pipe(writeStream, { end: false }); //end = false 则可以连续给writeStream 写数据
        curChunkReadStream.on("end", () => {
            //readStream 传输结束 则 递归 进行下一个文件流的读写操作
            fs.unlinkSync(curChunk) //删除chunkFile
            mergedChunkNum += 1
            mergeCore();
        });
    }
}

辅助函数

//文件重命名
function renameFile(dir,oldName,newName){
    const oldPath = path.resolve(dir,oldName)
    const newPath = path.resolve(dir,newName)
    fs.renameSync(oldPath,newPath)
}

四、进度条

ProgressEvent:这个接口表示用于测量 HTTP请求 进度的事件

//XHR 
// Wire the same handler to both download and upload progress events.
xhr.onprogress = updateProgress;
xhr.upload.onprogress = updateProgress;

/**
 * Renders a ProgressEvent onto the progress bar element.
 * @param {ProgressEvent} event
 */
function updateProgress(event) {
  console.log(event);
  // lengthComputable is false when total size is unknown (no Content-Length).
  if (event.lengthComputable) {
    // Fix: keep the percentage numeric; the original compared the string
    // returned by toFixed() against the number 90, relying on coercion.
    const completedPercent = (event.loaded / event.total) * 100;
    const label = completedPercent.toFixed(2) + '%';
    progressSpan.style.width = label;
    progressSpan.innerHTML = label;
    if (completedPercent > 90) { // turn the bar green near completion
      progressSpan.classList.add('green');
    }
    console.log('已上传', completedPercent.toFixed(2));
  }
}
//axios

// axios reports upload progress through the onUploadProgress hook of the
// request config ("..." marks omitted, unrelated config fields).
const config = {
  ...
  onUploadProgress: progressEvent => {
    console.log(progressEvent);
  }
  ...
}

// The config is passed as the third argument of the request.
axios.post(url,data,config)

大文件切片上传的进度条存在限制,所以采用 已加载切片数 / 总切片数 * 100 的方式实现进度条。文件越大,切片越多,显示得越细致

//从limitPromise 函数中获取
let loadedLen = 0 
let fileChunkLen = 0 
//axios 配置
const axiosConfig = {
  onUploadProgress: progressEvent => {
    const curPercent =( loadedLen / fileChunkLen *100).toFixed(2)
    console.log(` percentage :${curPercent}%`)
  }
};

五、断点续传

  • 依赖服务端
    1. 服务端设置接口:获取当前缓存目录下的内容
    2. 前端进行比较,如果已经存在,则跳过切片
  • 依赖客户端
    1. 客户端在发送切片前,将切片打上标记spark-md5唯一标记,将已经上传的切片信息保存在本地
    2. 在重新上传的时候,对比本地的标记 ,如果相同就跳过

参考文献

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions