大文件异步分片上传到Seaweed服务器
大文件分片上传到服务器临时目录
主要过程
客户端把大文件分片上传, 服务器接收到文件后, 按照每段的序号和每段大小重新拼接成完整的临时文件. 然后再将临时文件上传到文件服务器(Seaweed).
大文件上传到临时目录
-
接受文件的类
/** * 文件传输对象 */ @ApiModel("大文件分片入参实体") @Data public class MultipartFileParam { @ApiModelProperty("文件传输任务ID") private String taskId; @ApiModelProperty("当前为第几分片") private int chunk; @ApiModelProperty("每个分块的大小") private long size; @ApiModelProperty("分片总数") private int chunkTotal; @ApiModelProperty("分块文件传输对象") private MultipartFile file; }
-
文件处理工具类
所有文件上传完毕后, 返回临时文件的存放路径, 否则返回空字符串
import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.reflect.Method; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.commons.io.FileUtils; import com.gato.cloud.sppc.common.bean.MultipartFileParam; public class FileUtil { private static final Logger logger = Logger.getLogger(FileUtil.class.getName()); private static String OS = System.getProperty("os.name").toLowerCase(); // 文件临时存放位置 private static final String URL_PROPERTIES_UNIX = "/opt/big_file_tmp"; private static final String URL_PROPERTIES_WIN = "c:\\big_file_tmp"; private static String basePath = URL_PROPERTIES_UNIX; static { try { if (isWindows()) { basePath = URL_PROPERTIES_WIN; } } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); throw new RuntimeException(e); } } // 第一步:获取RandomAccessFile,随机访问文件类的对象 // 第二步:调用RandomAccessFile的getChannel()方法,打开文件通道 FileChannel // 第三步:获取当前是第几个分块,计算文件的最后偏移量 // 第四步:获取当前文件分块的字节数组,用于获取文件字节长度 // 第五步:使用文件通道FileChannel类的 map()方法创建直接字节缓冲器 MappedByteBuffer // 第六步:将分块的字节数组放入到当前位置的缓冲区内 mappedByteBuffer.put(byte[] b); // 第七步:释放缓冲区 // 第八步:检查文件是否全部完成上传 public static String uploadFileByMappedByteBuffer(MultipartFileParam param) throws IOException { if (param.getTaskId() == null || "".equals(param.getTaskId())) { param.setTaskId(UUID.randomUUID().toString()); } /** * 1:原文件名改为UUID 2:创建临时文件,和源文件一个路径 3:如果文件路径不存在重新创建 */ String fileName = param.getFile().getOriginalFilename(); // fileName.substring(fileName.lastIndexOf(".")) 这个地方可以直接写死 写成你的上传路径 String tempFileName = param.getTaskId() + fileName.substring(fileName.lastIndexOf(".")) + "_tmp"; String filePath = basePath + "/original"; File fileDir = new File(filePath); if (!fileDir.exists()) { fileDir.mkdirs(); } File tempFile = new File(filePath, tempFileName); try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); // 第一步 FileChannel fileChannel = raf.getChannel()) // 第二步 { // 第三步 long offset = param.getChunk() * param.getSize(); // 第四步 byte[] fileData = param.getFile().getBytes(); // 第五步 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, offset, fileData.length); // 第六步 mappedByteBuffer.put(fileData); // 第七步 freedMappedByteBuffer(mappedByteBuffer); } catch (IOException e) { logger.log(Level.SEVERE, "上传大文件出错", e); } // 第八步 boolean isComplete = checkUploadStatus(param, fileName, filePath); if (isComplete) { String storeName = param.getTaskId() + fileName.substring(fileName.lastIndexOf(".")); boolean flag = renameFile(tempFile, storeName); if (flag) { // 返回临时文件存放路径 return filePath + "/" + storeName; } } return ""; } /** * 文件重命名 * * @param toBeRenamed * 将要修改名字的文件 * @param toFileNewName * 新的名字 * @return */ public static boolean renameFile(File toBeRenamed, String toFileNewName) { // 检查要重命名的文件是否存在,是否是文件 if (!toBeRenamed.exists() || toBeRenamed.isDirectory()) { return false; } String p = toBeRenamed.getParent(); File newFile = new File(p + File.separatorChar + toFileNewName); // 修改文件名 return toBeRenamed.renameTo(newFile); } /** * 检查文件上传进度 * * @return */ public static boolean checkUploadStatus(MultipartFileParam param, String fileName, String filePath) throws IOException { File confFile = new File(filePath, fileName + ".conf"); try (RandomAccessFile confAccessFile = new RandomAccessFile(confFile, "rw")) { // 设置文件长度 confAccessFile.setLength(param.getChunkTotal()); // 设置起始偏移量 confAccessFile.seek(param.getChunk()); // 将指定的一个字节写入文件中 127, confAccessFile.write(Byte.MAX_VALUE); } catch (IOException e) { logger.log(Level.SEVERE, "读取上传大文件临时文件异常", e); return false; } byte[] completeStatusList = FileUtils.readFileToByteArray(confFile); byte isComplete = Byte.MAX_VALUE; // 创建conf文件文件长度为总分片数, // 每上传一个分块即向conf文件中写入一个127, // 那么没上传的位置就是默认的0,已上传的就是Byte.MAX_VALUE 127 for (int i = 0; i < completeStatusList.length && isComplete == Byte.MAX_VALUE; i++) { // 按位与运算,将&两边的数转为二进制进行比较,有一个为0结果为0,全为1结果为1 eg.3&5 即 0000 0011 & 0000 0101 = 0000 0001 因此,3&5的值得1。 isComplete = (byte)(isComplete & completeStatusList[i]); System.out.println("check part " + i + " complete?:" + completeStatusList[i]); } if (isComplete == Byte.MAX_VALUE) { // 如果全部文件上传完成,删除conf文件 confFile.delete(); return true; } return false; } public static boolean isWindows() { return (OS.indexOf("win") >= 0); } // 在MappedByteBuffer释放后再对它进行读操作的话就会引发jvm crash,在并发情况下很容易发生 // 正在释放时另一个线程正开始读取,于是crash就发生了。 // 所以为了系统稳定性释放前一般需要检查是否还有线程在读或写 public static void freedMappedByteBuffer(final MappedByteBuffer mappedByteBuffer) { try { if (mappedByteBuffer == null) { return; } mappedByteBuffer.force(); AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Method getCleanerMethod = mappedByteBuffer.getClass().getMethod("cleaner", new Class[0]); // 可以访问private的权限 getCleanerMethod.setAccessible(true); // 在具有指定参数的 方法对象上调用此 方法对象表示的底层方法 sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(mappedByteBuffer, new Object[0]); cleaner.clean(); } catch (Exception e) { logger.log(Level.SEVERE, "clean MappedByteBuffer error!!!", e); } logger.info("clean MappedByteBuffer completed!!!"); return null; } }); } catch (Exception e) { e.printStackTrace(); } } }
临时文件上传到文件服务器(Seaweed)
/** * 文件系统上传文件的地址 */
@Value("${custom.fileTargetUrl}")
private String FILE_TARGET_URL ;
public String uploadFile(MultipartFileParam param) throws IOException {
String filePath = FileUtil.uploadFileByMappedByteBuffer(param);
if (StringUtils.isNotEmpty(filePath)) {
// 文件已经整合完了, 上传文件到服务器
String result = sendToFileStoreFromLocalPath(filePath);
String url = JSONObject.parseObject(result).get("fileUrl").toString();
// 文件上传完后, 删除临时文件
File temFile = new File(filePath);
temFile.delete();
return url;
}
return null;
}
/** * 获取本地文件,转存到文件系统 * @param filPath 本地文件地址 * @return */
public static String sendToFileStoreFromLocalPath(String filPath) {
if (filPath == null || "".equals(filPath)) {
return null;
}
HttpPost postRequest = new HttpPost(FILE_TARGET_URL);
try(
//获取本地文件输入流
InputStream inputStream = new FileInputStream(filPath);
CloseableHttpClient client = HttpClientBuilder.create().build()
) {
// 将流写入文件系统
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
builder.addBinaryBody("file", inputStream);
postRequest.setEntity(builder.build());
HttpResponse postResponse = client.execute(postRequest);
// 获取响应信息
String result = EntityUtils.toString(postResponse.getEntity());
postRequest.releaseConnection();
return result;
} catch (Exception e) {
log.info("获取本地文件,转存到文件系统出错:{}",e.getMessage(),e);
return null;
} finally {
postRequest.releaseConnection();
}
}
今天的文章大文件异步分片上传到Seaweed服务器分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/65507.html