
What is the method of Java multi-threaded breakpoint copying?

WBOY
2023-04-13 17:31:19

Details

I use the java.util.Timer class to implement the breakpoint function. At regular intervals, a timer task records the copy progress of each thread.

Introduction to the Timer class:

A facility for threads to schedule tasks for future execution in a background thread. Tasks may be scheduled for one-time execution, or for repeated execution at regular intervals.

As the API documentation shows, Timer can run a task either once or repeatedly at a regular interval. (Note that this is java.util.Timer, not a Timer class from a javax package.)

This class has several scheduling methods, which are not covered here; interested readers can look them up. Only the one method we need is introduced here.

public void schedule(TimerTask task, long delay, long period)

Schedules the specified task for repeated fixed-delay execution, beginning after the specified delay. Subsequent executions take place at approximately regular intervals separated by the specified period.

Use this method to record the copy progress information of each thread at a fixed time interval.
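
As a minimal sketch of how this schedule overload behaves, the example below runs a task a fixed number of times and then cancels the timer from inside the task. The class name TimerScheduleDemo and the method runRepeated are hypothetical names used only for illustration; they are not part of the article's program.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's code.
class TimerScheduleDemo {

    /** Runs a repeating task until it has fired `times` times, then cancels the timer. */
    static int runRepeated(int times, long periodMillis) {
        Timer timer = new Timer();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        // First execution after 0 ms, then repeated with a fixed delay of periodMillis.
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (runs.incrementAndGet() >= times) {
                    timer.cancel();    // no further executions are scheduled after this call
                    done.countDown();
                }
            }
        }, 0L, periodMillis);

        try {
            done.await(5, TimeUnit.SECONDS);   // wait for the last run, with a safety timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runRepeated(3, 50L));
    }
}
```

Calling timer.cancel() matters because the timer's background thread is not a daemon thread by default, so a timer that is never cancelled can keep the JVM from exiting.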

Code part

Timer task class

package dragon.local;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class RecordTask extends TimerTask {
    public static final String filename = "breakPointRecord.txt";
    private Timer timer;
    private List<FileCopyThread> copyThreads;
    private String outputPath;

    public RecordTask(Timer timer, List<FileCopyThread> copyThreads, String outputPath) {
        this.timer = timer;
        this.copyThreads = copyThreads;
        this.outputPath = outputPath;
    }

    @Override
    public void run() {
        try {
            this.breakPointRecord();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void breakPointRecord() throws FileNotFoundException, IOException {
        int aliveThreadNum = 0;  // number of threads still alive
        // Do not append: only the latest record is needed.
        File recordFile = new File(outputPath, filename);
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(recordFile))) {
            // Writing each thread's position as text would require parsing it back,
            // which is tedious, so the positions are serialized as an object instead.
            long[] curlen = new long[copyThreads.size()];
            int index = 0;
            for (FileCopyThread copyThread : copyThreads) {
                if (copyThread.isAlive()) {
                    aliveThreadNum++;
                }
                curlen[index++] = copyThread.getCurlen();
                System.out.println(index + " curlen: " + copyThread.getCurlen());
            }
            // Create the Record object and serialize it.
            oos.writeObject(new Record(curlen));
        }
        // When all threads have died, cancel the timer and delete the record file.
        // (If every thread has died, the file has been copied completely.)
        if (aliveThreadNum == 0) {
            timer.cancel();
            recordFile.delete();
        }
        System.out.println("Alive threads: " + aliveThreadNum);
    }
}

Description:

if (aliveThreadNum == 0) {
    timer.cancel();
    recordFile.delete();
}

If all threads have ended, the program has finished normally.

The record file is deleted at this point. The record file serves as a flag: if it exists, the program did not finish normally, and breakpoint copying is performed the next time the program starts.

Note: IO exceptions during copying are not handled here. If a thread throws an IO exception, that thread also ends, so it is counted as finished. IO exceptions are relatively rare when copying local files, so I did not handle this case; if the data were downloaded over a network, the program would need to be improved.
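
One possible way to close this gap, sketched here under assumptions and not taken from the article's program, is to let each worker remember whether it ended because of an IOException, and to delete the record file only when every worker has ended without failing. The names FailureAwareCopy, Worker, hasFailed, safeToDeleteRecord and runAll are all hypothetical, and the IO error is simulated with a flag.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of the article's code.
class FailureAwareCopy {

    /** Worker that remembers whether it ended because of an IOException. */
    static class Worker extends Thread {
        private final boolean simulateError;   // stands in for a real I/O failure
        private volatile boolean failed;       // read by another thread after the worker ends

        Worker(boolean simulateError) {
            this.simulateError = simulateError;
        }

        @Override
        public void run() {
            try {
                if (simulateError) {
                    throw new IOException("simulated read error");
                }
                // A real worker would copy its segment here.
            } catch (IOException e) {
                failed = true;
            }
        }

        boolean hasFailed() {
            return failed;
        }
    }

    /** The record may be deleted only if every worker has ended and none has failed. */
    static boolean safeToDeleteRecord(List<Worker> workers) {
        for (Worker w : workers) {
            if (w.isAlive() || w.hasFailed()) {
                return false;
            }
        }
        return true;
    }

    /** Starts one worker per flag, waits for all of them, and reports the decision. */
    static boolean runAll(boolean... simulateErrors) {
        List<Worker> workers = new ArrayList<>();
        for (boolean simulateError : simulateErrors) {
            Worker w = new Worker(simulateError);
            w.start();
            workers.add(w);
        }
        for (Worker w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return safeToDeleteRecord(workers);
    }

    public static void main(String[] args) {
        System.out.println(runAll(false, false));
        System.out.println(runAll(false, true));
    }
}
```

With a check like this, a failed thread leaves the record file in place, so the next start would resume from the recorded positions instead of treating the copy as complete.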

Record information class

Writing each thread's progress to the file in sequence would require parsing the values back after reading, which is tedious. Instead, we use Java's serialization mechanism directly; manipulating objects directly is sometimes more convenient. Note: the array index identifies the thread, and each element holds that thread's position.

package dragon.local;

import java.io.Serializable;

public class Record implements Serializable {
    /**
     * Serialization id
     */
    private static final long serialVersionUID = 1L;
    private long[] curlen;

    public Record(long[] curlen) {
        this.curlen = curlen;
    }

    public long[] getCurlen() {
        return this.curlen;
    }
}
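
The serialization round trip can be sketched in isolation as follows. To stay self-contained, the example uses a hypothetical nested class Progress that mirrors Record, and it writes to an in-memory byte array instead of a record file; RecordRoundTrip, write and read are likewise illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical sketch, not part of the article's code.
class RecordRoundTrip {

    /** Stand-in for the article's Record class: one position per thread. */
    static class Progress implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long[] curlen;

        Progress(long[] curlen) {
            this.curlen = curlen;
        }

        long[] getCurlen() {
            return curlen;
        }
    }

    /** Serializes the object into a byte array. */
    static byte[] write(Progress progress) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(progress);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the object back; no manual parsing of the positions is needed. */
    static Progress read(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Progress) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Progress restored = read(write(new Progress(new long[] {10L, 20L, 30L, 40L})));
        System.out.println(Arrays.toString(restored.getCurlen()));
    }
}
```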

Copy thread class

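package dragon.local;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileCopyThread extends Thread {
    private int index;
    private long position;
    private long size;
    private File targetFile;
    private File outputFile;
    private volatile long curlen;      // bytes copied so far; volatile because the timer thread reads it

    public FileCopyThread(int index, long position, long size, File targetFile, File outputFile) {
        this.index = index;
        this.position = position;
        this.size = size;
        this.targetFile = targetFile;
        this.outputFile = outputFile;
        this.curlen = 0L;
    }

    @Override
    public void run() {
        try (
            BufferedInputStream bis = new BufferedInputStream(new FileInputStream(targetFile));
            RandomAccessFile raf = new RandomAccessFile(outputFile, "rw")) {
            // Skip the bytes that this thread does not read. A stream can only skip forward,
            // and skip() may skip fewer bytes than requested, so loop until the start is reached.
            long toSkip = position;
            while (toSkip > 0) {
                long n = bis.skip(toSkip);
                if (n > 0) {
                    toSkip -= n;
                } else if (bis.read() != -1) {
                    toSkip--;
                } else {
                    break;  // reached the end of the source file
                }
            }
            raf.seek(position);  // move to the write position; without this the output is wrong, and the error is hard to track down
            int hasRead = 0;
            byte[] b = new byte[1024];
            /**
             * Each thread reads only part of the data, so -1 cannot be the only loop exit condition.
             * The loop ends when the thread has written all the bytes it is responsible for,
             * or when the end of the file is reached (the last thread reads to the end of the file).
             * The read length is capped so that the thread never copies more than size bytes.
             */
            while (curlen < size
                    && (hasRead = bis.read(b, 0, (int) Math.min(b.length, size - curlen))) != -1) {
                raf.write(b, 0, hasRead);
                curlen += (long) hasRead;
                // Force the program to stop.
                //if (curlen > 17_000_000) {
                //    System.exit(0);
                //}
            }

            System.out.println(index + " " + position + " " + curlen + " " + size);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public long getCurlen() {   // current absolute position, recorded so that copying can resume from it if necessary
        return position + this.curlen;
    }
}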

The following commented-out code is used to test breakpoint copying. Adjust the condition in the if statement to suit the size of the file you are copying. To test, first uncomment the code and run the program (the program exits before the file is fully copied), then comment the code out again and rerun the program; the file is then copied successfully.

// Force the program to stop.
//if (curlen > 17_000_000) {
//    System.exit(0);
//}

Copy utility class

package dragon.local;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;

/**
 * Design:
 * Get the size of the target file and preallocate the output file to that size (which is beneficial),
 * then split the file into n parts and copy them with n threads at the same time (n is 4 here).
 *
 * Further extension:
 * Add breakpoint copying, so that after the program is interrupted
 * copying can resume from the last position, reducing unnecessary repeated work.
 */
public class FileCopyUtil {
    // Constant: the number of copy threads.
    private static final int THREAD_NUM = 4;

    private FileCopyUtil() {}

    /**
     * @param targetPath path of the target file (the file to copy)
     * @param outputPath path of the directory that receives the copy
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void transferFile(String targetPath, String outputPath) throws IOException, ClassNotFoundException {
        File targetFile = new File(targetPath);
        File outputFilePath = new File(outputPath);
        if (!targetFile.exists() || targetFile.isDirectory()) {   // the target file does not exist or is a directory
            throw new FileNotFoundException("Target file does not exist: " + targetPath);
        }
        if (!outputFilePath.exists()) {     // try to create the output directory if it is missing; fail if that is not possible
            if (!outputFilePath.mkdir()) {
                throw new FileNotFoundException("Cannot create output directory: " + outputPath);
            }
        }

        long len = targetFile.length();

        File outputFile = new File(outputFilePath, "copy" + targetFile.getName());
        createOutputFile(outputFile, len);    // create the output file and set its size

        // Create the Timer object.
        Timer timer = new Timer();

        // Starting position of each copy thread.
        long[] position = new long[FileCopyUtil.THREAD_NUM];
        // Number of bytes in each thread's segment.
        long size = len / FileCopyUtil.THREAD_NUM + 1;
        // List that holds the copy threads.
        List<FileCopyThread> copyThreads = new ArrayList<>();
        Record record = getRecord(outputPath);

        for (int i = 0; i < FileCopyUtil.THREAD_NUM; i++) {
            // If a record file already exists, use the recorded positions; otherwise this is a new copy.
            position[i] = record == null ? i*size : record.getCurlen()[i];
            // Copy only up to the end of this thread's segment, so a resumed thread does not recopy the next segment.
            long remaining = Math.max(0L, Math.min((i + 1) * size, len) - position[i]);
            FileCopyThread copyThread = new FileCopyThread(i, position[i], remaining, targetFile, outputFile);
            copyThread.start();     // start the copy thread
            copyThreads.add(copyThread);   // add the copy thread to the list
        }

        timer.schedule(new RecordTask(timer, copyThreads, outputPath), 0L, 100L);  // start the timer immediately; record the positions every 100 milliseconds
        System.out.println("Started!");
    }

    // Create the output file and set its size.
    private static void createOutputFile(File file, long length) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(length);
        }
    }

    // Read the recorded positions, if a record file exists.
    private static Record getRecord(String outputPath) throws FileNotFoundException, IOException, ClassNotFoundException {
        File recordFile = new File(outputPath, RecordTask.filename);
        if (recordFile.exists()) {
            try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(recordFile))) {
                return (Record) ois.readObject();
            }
        }
        return null;
    }
}

Description: Whether to resume from a breakpoint is decided by whether a record file exists in the output directory.

private static Record getRecord(String outputPath) throws FileNotFoundException, IOException, ClassNotFoundException {
    File recordFile = new File(outputPath, RecordTask.filename);
    if (recordFile.exists()) {
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(recordFile))) {
            return (Record) ois.readObject();
        }
    }
    return null;
}

Resuming from a breakpoint turns out to be very simple. It works the same as a fresh copy, except that each thread starts from its recorded position.

// If a record file already exists, use the recorded positions; otherwise this is a new copy.
position[i] = record == null ? i*size : record.getCurlen()[i];
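
The position calculation can be sketched as a small standalone function. Besides the start position, this sketch also computes how many bytes remain in each thread's segment, so that a resumed thread would stop at the end of its own segment; that remaining-length step is an assumption added for illustration. The names SegmentPlan and plan are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch, not part of the article's code.
class SegmentPlan {

    /**
     * Returns one {start, remaining} pair per thread.
     * recorded may be null (fresh copy) or hold one absolute position per thread (resume).
     */
    static long[][] plan(long fileLength, int threads, long[] recorded) {
        long size = fileLength / threads + 1;   // same segment size as in the article
        long[][] plan = new long[threads][2];
        for (int i = 0; i < threads; i++) {
            long segmentStart = i * size;
            long segmentEnd = Math.min(segmentStart + size, fileLength);
            long start = recorded == null ? segmentStart : recorded[i];
            plan[i][0] = start;
            plan[i][1] = Math.max(0L, segmentEnd - start);   // 0 if the segment is already done
        }
        return plan;
    }

    public static void main(String[] args) {
        // Fresh copy of a 100-byte file with 4 threads.
        System.out.println(Arrays.deepToString(plan(100L, 4, null)));
        // Resume: thread 0 is finished, thread 1 stopped at byte 40.
        System.out.println(Arrays.deepToString(plan(100L, 4, new long[] {26L, 40L, 52L, 100L})));
    }
}
```

For a 100-byte file and 4 threads the segment size is 26, so a fresh copy starts the threads at 0, 26, 52 and 78, and the last thread has only 22 bytes to copy.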

