This document explains the S3PrefetchingInputStream and the various components it uses.
This input stream implements prefetching and caching to improve read performance. A high-level overview of this feature was published in Pinterest Engineering’s blog post titled “Improving efficiency and reducing runtime using S3 read optimization”.
With prefetching, the input stream divides the remote file into blocks of a fixed size, associates buffers with these blocks, and then reads data into these buffers asynchronously. It may also cache these blocks.
| Property | Meaning | Default |
|---|---|---|
| fs.s3a.prefetch.enabled | Enable the prefetch input stream | false |
| fs.s3a.prefetch.block.size | Size of a block | 8M |
| fs.s3a.prefetch.block.count | Number of blocks to prefetch | 8 |
The default block size is 8MB, and the minimum allowed block size is 1 byte. Decreasing the block size increases the number of blocks that must be read for a file. A smaller block size may therefore hurt performance, because more prefetch requests are needed.
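Block size and block count are related by ceiling division. The Java sketch below shows the arithmetic. BlockLayout and its methods are hypothetical names used only for illustration. They are not part of the Hadoop API.

```java
// Hypothetical helper, not part of the Hadoop API: shows how a remote file of
// a given size maps onto fixed-size blocks.
final class BlockLayout {
    private final long fileSize;
    private final long blockSize;

    BlockLayout(long fileSize, long blockSize) {
        if (blockSize < 1) {
            // The text gives 1 byte as the minimum allowed block size.
            throw new IllegalArgumentException("block size must be at least 1 byte");
        }
        if (fileSize < 0) {
            throw new IllegalArgumentException("file size must not be negative");
        }
        this.fileSize = fileSize;
        this.blockSize = blockSize;
    }

    /** Number of blocks needed to cover the whole file (ceiling division). */
    long numBlocks() {
        return (fileSize + blockSize - 1) / blockSize;
    }

    /** Block that contains the given absolute offset in the file. */
    long blockNumber(long offset) {
        if (offset < 0 || offset >= fileSize) {
            throw new IllegalArgumentException("offset outside file: " + offset);
        }
        return offset / blockSize;
    }

    /** Size of a block; only the last block can be shorter than blockSize. */
    long blockLength(long blockNumber) {
        if (blockNumber < 0 || blockNumber >= numBlocks()) {
            throw new IllegalArgumentException("no such block: " + blockNumber);
        }
        return Math.min(blockSize, fileSize - blockNumber * blockSize);
    }
}
```

With the 8MB default, a 40MB file maps to 5 blocks, and offset 10MB falls in block 1. With a 1MB block size, the same file needs 40 blocks, which means 40 separate block reads.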
S3PrefetchingInputStream - When prefetching is enabled, S3AFileSystem returns an instance of this class as the input stream. Depending on the remote file size, it uses either S3InMemoryInputStream or S3CachingInputStream as the underlying input stream.
S3InMemoryInputStream - Underlying input stream used when the remote file size < configured block size. Reads the entire remote file into memory.
S3CachingInputStream - Underlying input stream used when the remote file size > configured block size. Uses asynchronous prefetching of blocks and caching to improve performance.
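The choice between the two underlying streams can be sketched as a size comparison. The descriptions above do not say which stream is used when the file size exactly equals the block size. This sketch assumes the in-memory stream in that case. StreamSelector and UnderlyingStream are hypothetical names.

```java
// Hypothetical sketch, not Hadoop code: which underlying stream a prefetching
// input stream would use for a file of a given size.
enum UnderlyingStream { IN_MEMORY, CACHING }

final class StreamSelector {
    private StreamSelector() {}

    static UnderlyingStream choose(long fileSize, long blockSize) {
        // Files no larger than one block are read entirely into memory.
        // Assumption: the "equal to block size" case goes to IN_MEMORY.
        if (fileSize <= blockSize) {
            return UnderlyingStream.IN_MEMORY;
        }
        // Larger files are read block by block with prefetching and caching.
        return UnderlyingStream.CACHING;
    }
}
```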
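BlockData - Holds information about the blocks in a remote file, such as the number of blocks, the block size and the state of each block.
BufferData - Holds the buffer and additional information about it, such as the number of the block it holds data for and the state of the buffer (for example BLANK or READY).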
CachingBlockManager - Implements reading data into the buffer, prefetching and caching.
BufferPool - Manages a fixed-size pool of buffers. CachingBlockManager uses it to acquire buffers.
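A fixed-size pool like the one BufferPool is described as managing can be sketched with a bounded queue of preallocated buffers. FixedBufferPool is a hypothetical class, not the Hadoop implementation. It leaves out block-number tracking and buffer states.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical fixed-size buffer pool, not the Hadoop BufferPool: all buffers
// are allocated up front and handed out and returned through a bounded queue.
final class FixedBufferPool {
    private final BlockingQueue<ByteBuffer> free;

    FixedBufferPool(int poolSize, int bufferSize) {
        free = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            free.add(ByteBuffer.allocate(bufferSize));
        }
    }

    /** Takes a buffer, blocking until one is released if none is free. */
    ByteBuffer acquire() throws InterruptedException {
        ByteBuffer buffer = free.take();
        buffer.clear(); // reset position and limit before reuse
        return buffer;
    }

    /** Takes a buffer if one is free, otherwise returns null immediately. */
    ByteBuffer tryAcquire() {
        ByteBuffer buffer = free.poll();
        if (buffer != null) {
            buffer.clear();
        }
        return buffer;
    }

    /** Returns a buffer to the pool so it can be reused for another block. */
    void release(ByteBuffer buffer) {
        if (!free.offer(buffer)) {
            throw new IllegalStateException("released more buffers than the pool holds");
        }
    }

    int available() {
        return free.size();
    }
}
```

Because the pool never grows, the number of blocks held in memory at once stays bounded, however many blocks the file has.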
S3File - Implements operations to interact with S3, such as opening and closing the input stream to the remote file in S3.
S3Reader - Implements reading from the stream opened by S3File. Reads from this input stream in blocks of 64KB.
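The 64KB chunked read can be illustrated with a plain java.io.InputStream standing in for the S3 object stream. ChunkedReader is a hypothetical name, not the Hadoop S3Reader. The sketch assumes the stream is already positioned at the start of the range to read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch, not the Hadoop S3Reader: copies `size` bytes from an
// already-opened stream into a ByteBuffer, 64KB at a time.
final class ChunkedReader {
    static final int CHUNK_SIZE = 64 * 1024;

    private ChunkedReader() {}

    /**
     * Reads exactly `size` bytes into `buffer`, which must have that much space
     * remaining. Returns the number of read() calls made on the stream.
     */
    static int readFully(InputStream in, ByteBuffer buffer, int size) throws IOException {
        byte[] chunk = new byte[CHUNK_SIZE];
        int remaining = size;
        int calls = 0;
        while (remaining > 0) {
            // Never ask for more than one chunk, or more than is still needed.
            int n = in.read(chunk, 0, Math.min(CHUNK_SIZE, remaining));
            calls++;
            if (n < 0) {
                throw new EOFException(remaining + " bytes still expected at end of stream");
            }
            buffer.put(chunk, 0, n);
            remaining -= n;
        }
        return calls;
    }
}
```

Reading 200,000 bytes from a ByteArrayInputStream takes four read() calls: three full 64KB chunks and a final 3,392-byte chunk.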
FilePosition - Provides functionality related to tracking the position in the file. Also gives access to the current buffer in use.
SingleFilePerBlockCache - Responsible for caching blocks to the local file system. Each cache block is stored on the local disk as a separate block file.
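The one-file-per-block layout can be sketched with java.nio.file. PerBlockFileCache is a hypothetical class, not the Hadoop SingleFilePerBlockCache. It keeps an in-memory map from block number to file and omits eviction and thread safety.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Hadoop SingleFilePerBlockCache: each cached
// block is written to its own file in a local directory.
final class PerBlockFileCache implements AutoCloseable {
    private final Path dir;
    private final Map<Integer, Path> files = new HashMap<>();

    PerBlockFileCache(Path dir) {
        this.dir = dir;
    }

    boolean contains(int blockNumber) {
        return files.containsKey(blockNumber);
    }

    /** Writes the remaining bytes of `data` to a new file for this block. */
    void put(int blockNumber, ByteBuffer data) throws IOException {
        Path file = Files.createTempFile(dir, "block-" + blockNumber + "-", ".bin");
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes); // copy without moving the caller's position
        Files.write(file, bytes);
        Path previous = files.put(blockNumber, file);
        if (previous != null) {
            Files.deleteIfExists(previous); // drop an older copy of the same block
        }
    }

    /** Reads a cached block back from its file. */
    ByteBuffer get(int blockNumber) throws IOException {
        Path file = files.get(blockNumber);
        if (file == null) {
            throw new IllegalArgumentException("block not cached: " + blockNumber);
        }
        return ByteBuffer.wrap(Files.readAllBytes(file));
    }

    /** Deletes every block file written by this cache. */
    @Override
    public void close() throws IOException {
        for (Path file : files.values()) {
            Files.deleteIfExists(file);
        }
        files.clear();
    }
}
```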
For a remote file of size 5MB with a block size of 8MB, the file is smaller than one block, so S3InMemoryInputStream is used.
If the caller makes the following read calls:
in.read(buffer, 0, 3MB); in.read(buffer, 0, 2MB);
When the first read is issued, there is no buffer in use yet. S3InMemoryInputStream gets the data in this remote file by calling the ensureCurrentBuffer() method, which ensures that a buffer with data is available to be read from.
The ensureCurrentBuffer() method then:
- Reads data into a buffer by calling S3Reader.read(ByteBuffer buffer, long offset, int size).
- S3Reader uses S3File to open an input stream to the remote file in S3 by making a getObject() request with range (0, filesize).
- S3Reader reads the entire remote file into the provided buffer and, once reading is complete, closes the S3 stream and frees all underlying resources.
- With the entire remote file now in a buffer, this buffer is set in FilePosition so it can be accessed by the input stream.

The read operation now just gets the required bytes from the buffer in FilePosition.
When the second read is issued, there is already a valid buffer that can be used. Nothing else needs to be done. The required bytes are read directly from this buffer.
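The load-once behaviour of the in-memory stream can be sketched as follows. WholeFileStream is hypothetical. The Supplier stands in for the single ranged getObject() request described above, and the method name ensureCurrentBuffer only mirrors the one in the text.

```java
import java.nio.ByteBuffer;
import java.util.function.Supplier;

// Hypothetical sketch, not Hadoop code: an in-memory stream that loads the whole
// remote file into one buffer on the first read and serves every later read
// from that buffer.
final class WholeFileStream {
    // Assumption: stands in for the single getObject() request over (0, filesize).
    private final Supplier<byte[]> loader;
    private ByteBuffer current; // null until the first read
    private int loads;          // how many times the remote file was fetched

    WholeFileStream(Supplier<byte[]> loader) {
        this.loader = loader;
    }

    // Mirrors the role of ensureCurrentBuffer(): fetch only if no buffer exists yet.
    private void ensureCurrentBuffer() {
        if (current == null) {
            current = ByteBuffer.wrap(loader.get());
            loads++;
        }
    }

    int read(byte[] dest, int off, int len) {
        if (len == 0) {
            return 0;
        }
        ensureCurrentBuffer();
        if (!current.hasRemaining()) {
            return -1; // end of file
        }
        int n = Math.min(len, current.remaining());
        current.get(dest, off, n);
        return n;
    }

    int loads() {
        return loads;
    }
}
```

With a 5MB file, reads of 3MB and then 2MB both succeed while the loader runs only once. A third read returns -1.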
For a remote file of size 40MB with a block size of 8MB, S3CachingInputStream is used.
If the caller makes the following calls:
in.read(buffer, 0, 5MB); in.read(buffer, 0, 8MB);
For the first read call, there is no valid buffer yet, so ensureCurrentBuffer() is called. For the first read(), the prefetch count is set to 1.
The current block (block 0) is read synchronously, while the block to be prefetched (block 1) is read asynchronously.
CachingBlockManager is responsible for getting buffers from the buffer pool and reading data into them. Acquiring a buffer from the pool works as follows:
Once CachingBlockManager has acquired a buffer, the buffer is returned if it is in the READY state, because data has already been read into it asynchronously by a prefetch. If its state is BLANK, data is read into it using S3Reader.read(ByteBuffer buffer, long offset, int size).
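The READY/BLANK decision can be sketched in isolation. BlockManagerSketch is hypothetical, not the Hadoop CachingBlockManager. It leaves out buffer pooling, the other buffer states and concurrency. A LongFunction stands in for reading one block through S3Reader.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical sketch, not the Hadoop CachingBlockManager: shows only the
// READY/BLANK decision made once a buffer for a block has been acquired.
final class BlockManagerSketch {
    enum State { BLANK, READY }

    static final class Buffer {
        final long blockNumber;
        State state = State.BLANK; // a newly acquired buffer holds no data
        byte[] data;

        Buffer(long blockNumber) {
            this.blockNumber = blockNumber;
        }
    }

    private final Map<Long, Buffer> buffers = new HashMap<>();
    // Assumption: stands in for S3Reader.read(...) fetching one whole block.
    private final LongFunction<byte[]> blockReader;
    private int synchronousReads;

    BlockManagerSketch(LongFunction<byte[]> blockReader) {
        this.blockReader = blockReader;
    }

    private Buffer acquire(long blockNumber) {
        return buffers.computeIfAbsent(blockNumber, n -> new Buffer(n));
    }

    /** Simulates a finished asynchronous prefetch: data is present, state is READY. */
    void completePrefetch(long blockNumber) {
        Buffer buffer = acquire(blockNumber);
        buffer.data = blockReader.apply(blockNumber);
        buffer.state = State.READY;
    }

    /** Returns a buffer holding the block's data, reading it now only if it is BLANK. */
    Buffer get(long blockNumber) {
        Buffer buffer = acquire(blockNumber);
        if (buffer.state == State.BLANK) {
            buffer.data = blockReader.apply(blockNumber);
            buffer.state = State.READY;
            synchronousReads++;
        }
        return buffer;
    }

    int synchronousReads() {
        return synchronousReads;
    }
}
```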
For the second read call, in.read(buffer, 0, 8MB), blocks are 8MB and only 5MB of block 0 has been read so far, so the first 3MB of the requested data is read from the current block 0. Once all data has been read from this block, S3CachingInputStream requests the next block (block 1). That block will already have been prefetched, so the stream can start reading from it directly. While reading from block 1, it also issues prefetch requests for the following blocks. The number of blocks to prefetch is determined by fs.s3a.prefetch.block.count.
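The split of the second read across blocks, and the blocks that would then be prefetched, can be computed as shown below. ReadPlan is hypothetical. The prefetch window is an assumption based on the description above: the blocks after the current one, capped by the prefetch count and by the last block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Hadoop code: splits a read into per-block pieces and
// lists the blocks that would be prefetched next.
final class ReadPlan {
    /** One piece of a read that falls inside a single block. */
    static final class Segment {
        final long block;
        final long offsetInBlock;
        final long length;

        Segment(long block, long offsetInBlock, long length) {
            this.block = block;
            this.offsetInBlock = offsetInBlock;
            this.length = length;
        }
    }

    private ReadPlan() {}

    /** Splits [position, position + length) into per-block segments, clipped to the file. */
    static List<Segment> segments(long position, long length, long blockSize, long fileSize) {
        List<Segment> result = new ArrayList<>();
        long end = Math.min(position + length, fileSize);
        long pos = position;
        while (pos < end) {
            long block = pos / blockSize;
            long offsetInBlock = pos - block * blockSize;
            // Take the rest of this block, or less if the read ends sooner.
            long n = Math.min(blockSize - offsetInBlock, end - pos);
            result.add(new Segment(block, offsetInBlock, n));
            pos += n;
        }
        return result;
    }

    /** Blocks after currentBlock to prefetch: at most prefetchCount, never past the last block. */
    static List<Long> prefetchWindow(long currentBlock, int prefetchCount, long numBlocks) {
        List<Long> result = new ArrayList<>();
        for (long b = currentBlock + 1; b < numBlocks && b <= currentBlock + prefetchCount; b++) {
            result.add(b);
        }
        return result;
    }
}
```

In the example, a read of 8MB at position 5MB yields 3MB from block 0 (at offset 5MB) and 5MB from block 1 (at offset 0). With a prefetch count of 8 and a 40MB file (5 blocks), reading block 1 would queue blocks 2, 3 and 4.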
S3CachingInputStream also caches prefetched blocks. This happens when read() is issued after a seek() outside the current block while the current block has not yet been fully read.
For example, consider the following calls:
in.read(buffer, 0, 5MB); in.seek(10MB); in.read(buffer, 0, 4MB); in.seek(2MB); in.read(buffer, 0, 4MB);
For the above read sequence, after seek(10MB) is called, block 0 has not been read completely, so the subsequent call to read() caches it, on the assumption that the caller will probably want to read from it again.
After seek(2MB) is called, the position is back inside block 0. The next read can then be satisfied from the locally cached block file, which is typically orders of magnitude faster than a network-based read.
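The cache-on-seek rule can be modelled as a small state machine. SeekCachePolicy is hypothetical and simplified. "Fully read" is approximated by counting the bytes consumed from a block, and each read is assumed to stay within a single block.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model, not Hadoop code: when a read lands in a different block
// while the previous block was only partly consumed, that block is marked as
// cached so a later re-read can be served locally.
final class SeekCachePolicy {
    private final long blockSize;
    private long currentBlock = -1; // block of the last read; -1 before any read
    private long bytesReadInBlock;  // bytes consumed from currentBlock so far
    private final Set<Long> cached = new HashSet<>();

    SeekCachePolicy(long blockSize) {
        this.blockSize = blockSize;
    }

    /** Records a read of `length` bytes at `position` (assumed to stay within one block). */
    void onRead(long position, long length) {
        long block = position / blockSize;
        if (block != currentBlock) {
            // Leaving a block that was not fully read: keep it for a likely re-read.
            if (currentBlock >= 0 && bytesReadInBlock < blockSize) {
                cached.add(currentBlock);
            }
            currentBlock = block;
            bytesReadInBlock = 0;
        }
        bytesReadInBlock += length;
    }

    boolean isCached(long block) {
        return cached.contains(block);
    }
}
```

Replaying the example sequence, the read at 10MB causes block 0 to be cached, so the later read at 2MB finds it locally. A block that was consumed in full before moving on is not cached.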
NB: seek() is implemented lazily. It only records the new position and does not otherwise change the internal state of the stream. Only when a read() is issued does the stream call the ensureCurrentBuffer() method and fetch a new block if required.
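Lazy seeking can be sketched as a stream whose seek() only stores a number. LazySeekStream is hypothetical, not Hadoop code. For brevity it models an unbounded file and counts block fetches instead of performing them.

```java
// Hypothetical sketch, not Hadoop code: seek() only records the target
// position; the block containing it is fetched on the next read().
final class LazySeekStream {
    private final long blockSize;
    private long position;         // logical position, updated by seek() and read()
    private long loadedBlock = -1; // block currently held in memory; -1 if none
    private int blockFetches;      // how many times a block had to be fetched

    LazySeekStream(long blockSize) {
        this.blockSize = blockSize;
    }

    void seek(long newPosition) {
        if (newPosition < 0) {
            throw new IllegalArgumentException("negative position: " + newPosition);
        }
        position = newPosition; // no I/O happens here
    }

    /** Consumes up to `len` bytes from the current block, fetching it only if needed. */
    long read(long len) {
        long block = position / blockSize;
        if (block != loadedBlock) {
            loadedBlock = block; // stands in for ensureCurrentBuffer() fetching the block
            blockFetches++;
        }
        long n = Math.min(len, (block + 1) * blockSize - position);
        position += n;
        return n;
    }

    long getPos() {
        return position;
    }

    int blockFetches() {
        return blockFetches;
    }
}
```

Several seek() calls in a row therefore cost nothing. Only the position in effect at the next read() determines which block is fetched.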