Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


multithreading - How to read a file using multiple threads in Java when a high-throughput (3 GB/s) file system is available

I understand that on a normal spinning-disk system, reading a file with multiple threads is inefficient.

This is a different case: I have a high-throughput file system available that provides read speeds of up to 3 GB/s, along with 196 CPU cores and 2 TB of RAM.

A single-threaded Java program reads the file at 85-100 MB/s at most, so there is room to do better than a single thread. I have to read files as large as 1 TB, and I have enough RAM to load them.

Currently I use the following or something similar, but need to write something with multi-threading to get better throughput:

Java 7 Files: 50 MB/s

List<String> lines = Files.readAllLines(Paths.get(path), encoding);

Java commons-io: 48 MB/s

List<String> lines = FileUtils.readLines(new File("/path/to/file.txt"), "utf-8");

The same with guava: 45 MB/s

List<String> lines = Files.readLines(new File("/path/to/file.txt"), Charset.forName("utf-8"));

Java Scanner Class: Very Slow

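Scanner s = new Scanner(new File("filepath"));
ArrayList<String> list = new ArrayList<String>();
// hasNextLine()/nextLine() read whole lines; hasNext()/next() would split on whitespace instead
while (s.hasNextLine()){
    list.add(s.nextLine());
}
s.close();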

I want to load the file and build the same ArrayList, with the lines in their original order, as fast as possible.

There is another question that reads similarly, but it is actually different: that question discusses systems where multi-threaded file I/O cannot physically be efficient. Thanks to technological advances, we now have systems designed for high-throughput I/O, so the limiting factor is the CPU and software, which can be overcome by multi-threading the I/O.

The other question does not answer how to write code to multi-thread I/O.
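Before adding threads, it may help to check how fast one thread can pull raw bytes when no decoding or line splitting is involved. The following is a minimal sketch of such a baseline; the class name `RawReadBaseline` and the `bufferSize` parameter are hypothetical and chosen only for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: reads a file through one reused buffer without decoding it.
class RawReadBaseline {

    /** Reads the whole file sequentially and returns the number of bytes read. */
    static long countBytes(Path file, int bufferSize) throws IOException {
        long total = 0;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            // A direct buffer is an assumption; a heap buffer works the same way here.
            ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize);
            int n;
            while ((n = channel.read(buf)) >= 0) {
                total += n;
                buf.clear(); // discard the bytes; only the transfer itself is of interest
            }
        }
        return total;
    }
}
```

Timing a call to `countBytes` with `System.nanoTime()` and comparing it against the `readAllLines` figures gives a rough idea of how much of the time goes into decoding and building strings rather than into I/O.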


1 Reply


Here is a solution for reading a single file with multiple threads.

Divide the file into N chunks, read each chunk in its own thread, then merge the chunks in order. Beware of lines that cross chunk boundaries. This is the basic idea suggested by user slaks.
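One way to deal with lines that cross chunk boundaries is to move each boundary forward to the start of the next line before handing out the chunks. The sketch below illustrates that idea; the class `ChunkBoundaries` and its `split` method are hypothetical names, and it assumes lines end in `'\n'`. Because the byte 0x0A never occurs inside a multi-byte UTF-8 sequence, cutting just after a newline also avoids splitting a character.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes line-aligned chunk offsets for a file.
class ChunkBoundaries {

    /**
     * Returns chunk start offsets followed by the file size as a sentinel,
     * so chunk i covers bytes [offsets.get(i), offsets.get(i + 1)).
     * Every start after the first is moved forward to just past the next '\n'.
     */
    static List<Long> split(FileChannel channel, int chunks) throws IOException {
        long size = channel.size();
        long nominal = Math.max(1, size / chunks); // never 0, even for tiny files
        List<Long> offsets = new ArrayList<>();
        offsets.add(0L);
        ByteBuffer one = ByteBuffer.allocate(1);
        for (int i = 1; i < chunks; i++) {
            long last = offsets.get(offsets.size() - 1);
            long pos = Math.max(i * nominal, last);
            // Scan forward one byte at a time to the end of the current line.
            // This is simple rather than fast; it runs only once per boundary.
            while (pos < size) {
                one.clear();
                int n = channel.read(one, pos);
                if (n < 0) { pos = size; break; }
                if (n == 0) continue;
                pos++;
                if (one.get(0) == '\n') break;
            }
            if (pos >= size) break;          // the remaining data belongs to the previous chunk
            if (pos > last) offsets.add(pos);
        }
        offsets.add(size);
        return offsets;
    }
}
```

With long lines or a small file, this can return fewer chunks than requested, which is harmless: the caller simply iterates over adjacent pairs of offsets.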

Benchmark of the implementation below, reading a single 20 GB file with multiple threads:

1 Thread : 50 seconds : 400 MB/s

2 Threads: 30 seconds : 666 MB/s

4 Threads: 20 seconds : 1 GB/s

8 Threads: 60 seconds : 333 MB/s

Equivalent Java7 readAllLines() : 400 seconds : 50 MB/s
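The throughput figures above are consistent with decimal units (20 GB = 20,000,000,000 bytes, 1 MB = 1,000,000 bytes). As a small sketch of how such numbers can be derived from a timed run, the hypothetical helper below converts a byte count and an elapsed time into MB/s; the class and method names are illustrative only.

```java
// Hypothetical helper for turning a timed read into a throughput figure.
class Throughput {

    /** Decimal megabytes per second (1 MB = 1,000,000 bytes). */
    static double mbPerSecond(long bytes, long elapsedNanos) {
        return (bytes / 1e6) / (elapsedNanos / 1e9);
    }

    /** Runs the given work once and reports its throughput for the stated byte count. */
    static double measure(long bytes, Runnable work) {
        long start = System.nanoTime();
        work.run();
        return mbPerSecond(bytes, System.nanoTime() - start);
    }
}
```

For example, `Throughput.mbPerSecond(20_000_000_000L, 50_000_000_000L)` corresponds to the single-thread row: 20 GB in 50 seconds is 400 MB/s. A single run like this is only a rough measurement, since JIT warm-up and the operating system's page cache can change the result considerably between runs.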

Note: This may only work on systems designed for high-throughput I/O, not on typical personal computers.

package filereadtests;

import java.io.*;
import static java.lang.Math.toIntExact;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
    try
    {
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

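        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM; a single read() may return fewer bytes than requested,
        //so keep reading until the buffer is full or end of file is reached
        long position = _startLocation;
        while (buff.hasRemaining())
        {
            int bytes_read = _channel.read(buff, position);
            if (bytes_read < 0)
            {
                break;
            }
            position += bytes_read;
        }

        //chunk to String; a chunk boundary can fall inside a line or a multi-byte UTF-8
        //character, and this string is discarded when run() returns
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));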

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

    } catch (Exception e)
    {
        e.printStackTrace();
    }
}

//args[0] is path to read file
//args[1] is the size of the thread pool; try different values to find the sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
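    //file_size/threads; at least 1 byte so the loop below always makes progress,
    //even when the file has fewer bytes than there are threads
    long chunk_size = Math.max(1, remaining_size / Integer.parseInt(args[1]));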

    //Max allocation size allowed is ~2GB
    if (chunk_size > (Integer.MAX_VALUE - 5))
    {
        chunk_size = (Integer.MAX_VALUE - 5);
    }

    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down
    executor.shutdown();

    //Wait for all threads to finish; this blocks instead of spinning in a busy loop
    executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    System.out.println("Finished all threads");
    fileInputStream.close();
}

}
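The listing above reads the chunks in parallel but does not keep the decoded text, so the merge step is left open. One possible way to merge in order is to submit each chunk as a `Callable` and collect the results with `ExecutorService.invokeAll`, which returns its futures in the same order as the submitted tasks. The sketch below assumes the caller supplies a list of line-aligned byte offsets ending with the file size (the `offsets` parameter, like the class name `OrderedChunkReader`, is hypothetical), that lines end in `'\n'`, and that each chunk is smaller than 2 GB.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: parallel chunk reads merged back in file order.
class OrderedChunkReader {

    /** Reads bytes [start, end) fully and splits them into lines. */
    static List<String> readLines(FileChannel channel, long start, long end) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(Math.toIntExact(end - start));
        long pos = start;
        while (buf.hasRemaining()) {          // a single read may be short
            int n = channel.read(buf, pos);
            if (n < 0) break;
            pos += n;
        }
        String text = new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
        if (text.isEmpty()) return new ArrayList<>();
        // Limit -1 keeps blank lines; the empty piece after a final '\n' is dropped.
        String[] parts = text.split("\n", -1);
        int count = text.endsWith("\n") ? parts.length - 1 : parts.length;
        return Arrays.asList(parts).subList(0, count);
    }

    /** Reads all chunks in parallel and concatenates their lines in file order. */
    static List<String> readAll(Path file, List<Long> offsets, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (int i = 0; i + 1 < offsets.size(); i++) {
                final long start = offsets.get(i);
                final long end = offsets.get(i + 1);
                tasks.add(() -> readLines(channel, start, end));
            }
            List<String> lines = new ArrayList<>();
            // invokeAll waits for every task and preserves the task order.
            for (Future<List<String>> chunk : pool.invokeAll(tasks)) {
                lines.addAll(chunk.get());
            }
            return lines;
        } finally {
            pool.shutdown();
        }
    }
}
```

Positional reads on a shared `FileChannel` are safe to issue from several threads, which is why one channel can serve all tasks. Note that this sketch holds every chunk's lines in memory before merging, so it trades memory for simplicity.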
