大文件异步分片上传到Seaweed服务器

大文件异步分片上传到Seaweed服务器大文件异步分片上传到Seaweed服务器大文件分片上传到服务器临时目录主要过程客户端把大文件分片上传,服务器接收到文件后,按照每段的序号和每段大小重新拼接成完整的临时文件.然后再将临时文件上传到文件



大文件异步分片上传到Seaweed服务器

大文件分片上传到服务器临时目录

主要过程

客户端把大文件分片上传, 服务器接收到文件后, 按照每段的序号和每段大小重新拼接成完整的临时文件. 然后再将临时文件上传到文件服务器(Seaweed).

大文件上传到临时目录

  • 接受文件的类

    /** * 文件传输对象 */
    @ApiModel("大文件分片入参实体")
    @Data
    public class MultipartFileParam { 
         
        @ApiModelProperty("文件传输任务ID")
        private String taskId;
    
        @ApiModelProperty("当前为第几分片")
        private int chunk;
    
        @ApiModelProperty("每个分块的大小")
        private long size;
    
        @ApiModelProperty("分片总数")
        private int chunkTotal;
    
        @ApiModelProperty("分块文件传输对象")
        private MultipartFile file;
    }
    
  • 文件处理工具类

    所有文件上传完毕后, 返回临时文件的存放路径, 否则返回空字符串

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.lang.reflect.Method;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.util.UUID;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    import org.apache.commons.io.FileUtils;
    
    import com.gato.cloud.sppc.common.bean.MultipartFileParam;
    
    public class FileUtil { 
         
    
        private static final Logger logger = Logger.getLogger(FileUtil.class.getName());
    
        private static String OS = System.getProperty("os.name").toLowerCase();
        // 文件临时存放位置
        private static final String URL_PROPERTIES_UNIX = "/opt/big_file_tmp";
        private static final String URL_PROPERTIES_WIN = "c:\\big_file_tmp";
        private static String basePath = URL_PROPERTIES_UNIX;
        static { 
         
            try { 
         
                if (isWindows()) { 
         
                    basePath = URL_PROPERTIES_WIN;
                }
            } catch (Exception e) { 
         
                logger.log(Level.SEVERE, e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
    
        // 第一步:获取RandomAccessFile,随机访问文件类的对象
        // 第二步:调用RandomAccessFile的getChannel()方法,打开文件通道 FileChannel
        // 第三步:获取当前是第几个分块,计算文件的最后偏移量
        // 第四步:获取当前文件分块的字节数组,用于获取文件字节长度
        // 第五步:使用文件通道FileChannel类的 map()方法创建直接字节缓冲器 MappedByteBuffer
        // 第六步:将分块的字节数组放入到当前位置的缓冲区内 mappedByteBuffer.put(byte[] b);
        // 第七步:释放缓冲区
        // 第八步:检查文件是否全部完成上传
        public static String uploadFileByMappedByteBuffer(MultipartFileParam param) throws IOException { 
         
            if (param.getTaskId() == null || "".equals(param.getTaskId())) { 
         
                param.setTaskId(UUID.randomUUID().toString());
            }
            /** * 1:原文件名改为UUID 2:创建临时文件,和源文件一个路径 3:如果文件路径不存在重新创建 */
            String fileName = param.getFile().getOriginalFilename();
            // fileName.substring(fileName.lastIndexOf(".")) 这个地方可以直接写死 写成你的上传路径
            String tempFileName = param.getTaskId() + fileName.substring(fileName.lastIndexOf(".")) + "_tmp";
            String filePath = basePath + "/original";
            File fileDir = new File(filePath);
            if (!fileDir.exists()) { 
         
                fileDir.mkdirs();
            }
            File tempFile = new File(filePath, tempFileName);
            try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); // 第一步
                FileChannel fileChannel = raf.getChannel()) // 第二步
            { 
         
                // 第三步
                long offset = param.getChunk() * param.getSize();
                // 第四步
                byte[] fileData = param.getFile().getBytes();
                // 第五步
                MappedByteBuffer mappedByteBuffer =
                    fileChannel.map(FileChannel.MapMode.READ_WRITE, offset, fileData.length);
                // 第六步
                mappedByteBuffer.put(fileData);
                // 第七步
                freedMappedByteBuffer(mappedByteBuffer);
            } catch (IOException e) { 
         
                logger.log(Level.SEVERE, "上传大文件出错", e);
            }
            // 第八步
            boolean isComplete = checkUploadStatus(param, fileName, filePath);
            if (isComplete) { 
         
                String storeName = param.getTaskId() + fileName.substring(fileName.lastIndexOf("."));
                boolean flag = renameFile(tempFile, storeName);
                if (flag) { 
         
                    // 返回临时文件存放路径
                    return filePath + "/" + storeName;
                }
            }
            return "";
        }
    
        /** * 文件重命名 * * @param toBeRenamed * 将要修改名字的文件 * @param toFileNewName * 新的名字 * @return */
        public static boolean renameFile(File toBeRenamed, String toFileNewName) { 
         
            // 检查要重命名的文件是否存在,是否是文件
            if (!toBeRenamed.exists() || toBeRenamed.isDirectory()) { 
         
                return false;
            }
            String p = toBeRenamed.getParent();
            File newFile = new File(p + File.separatorChar + toFileNewName);
            // 修改文件名
            return toBeRenamed.renameTo(newFile);
        }
    
        /** * 检查文件上传进度 * * @return */
        public static boolean checkUploadStatus(MultipartFileParam param, String fileName, String filePath)
            throws IOException { 
         
            File confFile = new File(filePath, fileName + ".conf");
            try (RandomAccessFile confAccessFile = new RandomAccessFile(confFile, "rw")) { 
         
                // 设置文件长度
                confAccessFile.setLength(param.getChunkTotal());
                // 设置起始偏移量
                confAccessFile.seek(param.getChunk());
                // 将指定的一个字节写入文件中 127,
                confAccessFile.write(Byte.MAX_VALUE);
            } catch (IOException e) { 
         
                logger.log(Level.SEVERE, "读取上传大文件临时文件异常", e);
                return false;
            }
            byte[] completeStatusList = FileUtils.readFileToByteArray(confFile);
            byte isComplete = Byte.MAX_VALUE;
            // 创建conf文件文件长度为总分片数,
            // 每上传一个分块即向conf文件中写入一个127,
            // 那么没上传的位置就是默认的0,已上传的就是Byte.MAX_VALUE 127
            for (int i = 0; i < completeStatusList.length && isComplete == Byte.MAX_VALUE; i++) { 
         
                // 按位与运算,将&两边的数转为二进制进行比较,有一个为0结果为0,全为1结果为1 eg.3&5 即 0000 0011 & 0000 0101 = 0000 0001 因此,3&5的值得1。
                isComplete = (byte)(isComplete & completeStatusList[i]);
                System.out.println("check part " + i + " complete?:" + completeStatusList[i]);
            }
            if (isComplete == Byte.MAX_VALUE) { 
         
                // 如果全部文件上传完成,删除conf文件
                confFile.delete();
                return true;
            }
            return false;
        }
    
        public static boolean isWindows() { 
         
            return (OS.indexOf("win") >= 0);
        }
    
        // 在MappedByteBuffer释放后再对它进行读操作的话就会引发jvm crash,在并发情况下很容易发生
        // 正在释放时另一个线程正开始读取,于是crash就发生了。
        // 所以为了系统稳定性释放前一般需要检查是否还有线程在读或写
        public static void freedMappedByteBuffer(final MappedByteBuffer mappedByteBuffer) { 
         
            try { 
         
                if (mappedByteBuffer == null) { 
         
                    return;
                }
                mappedByteBuffer.force();
                AccessController.doPrivileged(new PrivilegedAction<Object>() { 
         
                    @Override
                    public Object run() { 
         
                        try { 
         
                            Method getCleanerMethod = mappedByteBuffer.getClass().getMethod("cleaner", new Class[0]);
                            // 可以访问private的权限
                            getCleanerMethod.setAccessible(true);
                            // 在具有指定参数的 方法对象上调用此 方法对象表示的底层方法
                            sun.misc.Cleaner cleaner =
                                (sun.misc.Cleaner)getCleanerMethod.invoke(mappedByteBuffer, new Object[0]);
                            cleaner.clean();
                        } catch (Exception e) { 
         
                            logger.log(Level.SEVERE, "clean MappedByteBuffer error!!!", e);
                        }
                        logger.info("clean MappedByteBuffer completed!!!");
                        return null;
                    }
                });
            } catch (Exception e) { 
         
                e.printStackTrace();
            }
        }
    }
    

临时文件上传到文件服务器(Seaweed)

     /** * 文件系统上传文件的地址 */
	@Value("${custom.fileTargetUrl}")
    private String FILE_TARGET_URL ;

	public String uploadFile(MultipartFileParam param) throws IOException { 
   
        String filePath = FileUtil.uploadFileByMappedByteBuffer(param);
        if (StringUtils.isNotEmpty(filePath)) { 
   
            // 文件已经整合完了, 上传文件到服务器
            String result = sendToFileStoreFromLocalPath(filePath);
            String url = JSONObject.parseObject(result).get("fileUrl").toString();
            // 文件上传完后, 删除临时文件
            File temFile = new File(filePath);
            temFile.delete();
            return url;
        }
        return null;
    }

	/** * 获取本地文件,转存到文件系统 * @param filPath 本地文件地址 * @return */
    public static String sendToFileStoreFromLocalPath(String filPath) { 
   
        if (filPath == null || "".equals(filPath)) { 
   
            return null;
        }
        HttpPost postRequest = new HttpPost(FILE_TARGET_URL);
        try(
                //获取本地文件输入流
                InputStream inputStream = new FileInputStream(filPath);
                CloseableHttpClient client = HttpClientBuilder.create().build()
            ) { 
   
            // 将流写入文件系统
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
            builder.addBinaryBody("file", inputStream);

            postRequest.setEntity(builder.build());

            HttpResponse postResponse = client.execute(postRequest);

            // 获取响应信息
            String result = EntityUtils.toString(postResponse.getEntity());
            postRequest.releaseConnection();
            return result;
        } catch (Exception e) { 
   
            log.info("获取本地文件,转存到文件系统出错:{}",e.getMessage(),e);
            return null;
        } finally { 
   
            postRequest.releaseConnection();
        }
    }

今天的文章大文件异步分片上传到Seaweed服务器分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/65507.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注