public abstract class WriteCacheService extends Object implements IWriteCache
A WriteCacheService is provisioned with some number of WriteCache buffers and a writer thread. Callers populate WriteCache instances. When they are full, they are transferred to a queue which is drained by the thread writing on the local disk. Hooks are provided to wait until the current write set has been written (e.g., at a commit point when the cached writes must be written through to the backing channel). This implementation supports high availability using a write replication pipeline.
A pool of WriteCache instances is used. Readers test all of the WriteCache instances using a shared ConcurrentMap and will immediately return the desired record, or null if the record is not in any of the WriteCache instances. Write caches remain available to readers until they need to be recycled as the current write cache (the one servicing new writes).
The WriteCacheService maintains a dirty list of WriteCache instances. A single thread handles writes onto the disk and onto the write replication pipeline (for HA). When the caller invokes flush() on the write cache service, the current write cache is transferred to the dirty list and flush() then waits until the write cache instances now on the dirty list have been serviced. In order to simplify the design and to provide boundary conditions for HA decision making, writers block during flush(boolean, long, TimeUnit).
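For example, a caller-facing interaction might look like the following sketch. It is hypothetical usage, assuming a concrete WriteCacheService subclass instance and an Adler32 record checksum (the checksum algorithm and the helper names are assumptions, not taken from this page):

```java
import java.nio.ByteBuffer;
import java.util.zip.Adler32;

final class WriteCacheServiceUsageSketch {

    // Buffer a record, read it back through the cache, and flush at a commit point.
    static void writeReadFlush(final WriteCacheService cache, final long fileOffset,
            final byte[] record) throws Exception {

        // Optional record-level checksum (Adler32 is an assumption here).
        final Adler32 adler = new Adler32();
        adler.update(record, 0, record.length);
        final int chk = (int) adler.getValue();

        // position()..limit() is copied into the cache; the caller's buffer may
        // be reused afterwards.
        final ByteBuffer data = ByteBuffer.wrap(record);
        if (!cache.write(fileOffset, data, chk)) {
            // false: not enough room left in the write cache for this record.
        }

        // Non-blocking test of all write cache buffers; null indicates a miss.
        // (Illustrative length; nbytes is normally decoded from the record's address.)
        final ByteBuffer cached = cache.read(fileOffset, record.length);

        // At a commit point, write the current write set through to the backing channel.
        cache.flush(true /* force */);
    }
}
```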
Instances of this class are used by both the RWStrategy and the WORMStrategy. These classes differ in how they allocate space on the backing file and in the concurrency which they permit for writers.

The WORMStrategy serializes all calls to #writeChk(long, ByteBuffer, int) since it must guarantee the precise offset at which each record is written onto the backing file. As a consequence of its design, each WriteCache is a single contiguous chunk of data and is transferred directly to a known offset on the disk. This append-only strategy makes for excellent transfer rates to the disk.

The RWStrategy only needs to serialize the decision making about the offset at which the records are allocated. Since the records may be allocated at any location in the backing file, each WriteCache results in a scattered write on the disk.

Both the WORMStrategy and the RWStrategy implementations also need to establish a read-write lock to prevent changes in the file extent from causing corrupt data for concurrent read or write operations on the file. See #writeChk(long, ByteBuffer, int) for more information on this issue (it is a workaround for a JVM bug).
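The exclusion between file-extent changes and record-level I/O can be sketched with a plain ReentrantReadWriteLock. This is a minimal illustration of the locking discipline only, not the actual RWStrategy/WORMStrategy code; the method names are hypothetical:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ExtentLockSketch {

    private final ReentrantReadWriteLock extensionLock = new ReentrantReadWriteLock();

    // Record-level reads and writes share the read lock, so they may run concurrently
    // with each other but never with a change in the file extent.
    void readOrWriteRecord() {
        extensionLock.readLock().lock();
        try {
            // ... read a record from / transfer a WriteCache buffer to the channel ...
        } finally {
            extensionLock.readLock().unlock();
        }
    }

    // Changing the file extent takes the write lock, making it mutually exclusive
    // with all record-level I/O on the backing file.
    void changeFileExtent(final long newExtent) {
        extensionLock.writeLock().lock();
        try {
            // ... extend/truncate the backing FileChannel, then force data and metadata ...
        } finally {
            extensionLock.writeLock().unlock();
        }
    }
}
```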
IAddressManager.getByteCount(long) must be the actual on-disk record length, not the size of the record when it reaches the application layer. This on-disk length is the adjusted size after optional compression and with the optional checksum. Applications which assume that lengthOf(addr) == byte[].length will break, but that's life.
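As a rough illustration of this contract (the 4-byte int checksum width is an assumption, not stated here):

```java
final class OnDiskLengthSketch {
    // getByteCount(addr) must report this on-disk length, not the application-level
    // byte[].length of the original record.
    static int onDiskLength(final int compressedPayloadLength, final boolean useChecksum) {
        return compressedPayloadLength + (useChecksum ? Integer.BYTES : 0);
    }
}
```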
Start with a hotCache AND a hotReserve. If a new reserve is needed, because the existing one is now used, try to compress the new readCache into the current hotCache; if it won't fit, then call resetWith and lose those writes, cycle again, moving the front hotCache to the readList, and compress that one.
LIMIT: If we begin with full caches with above-threshold hitCounts then the whole list will cycle around until we hit the original cache, which will contain records with zero hitCounts (for practical purposes ignoring any concurrent reads).
See Also:
WriteCache
Modifier and Type | Class and Description |
---|---|
static class | WriteCacheService.AsynchronousCloseException - An instance of this exception is thrown if a thread notices that the WriteCacheService was closed by a concurrent process. |
Modifier and Type | Field and Description |
---|---|
protected static org.apache.log4j.Logger | log |
Constructor and Description |
---|
WriteCacheService(int nwriteBuffers, int minCleanListSize, int nreadBuffers, boolean prefixWrites, int compactionThreshold, int hotCacheSize, int hotCacheThreshold, boolean useChecksum, long fileExtent, IReopenChannel<? extends Channel> opener, Quorum quorum, IBackingReader reader) - Allocates N buffers from the DirectBufferPool. |
Modifier and Type | Method and Description |
---|---|
ByteBuffer | _readFromCache(long offset, int nbytes) - Attempt to read a record from cache (either write cache or read cache depending on the service map state). |
String | addrDebugInfo(long paddr) - An array of writeCache actions is maintained that can be used to provide a breadcrumb of how that address has been written, saved, freed or removed. |
protected boolean | canCompact() - Return true iff we are allowed to compact buffers. |
boolean | clearWrite(long offset, int latchedAddr) - Called to check if a write has already been flushed. |
void | close() - Permanently take the IWriteCache out of service. |
void | debugAddrs(long offset, int length, char c) |
protected void | finalized() - Ensures that close() is eventually invoked so the buffers can be returned to the DirectBufferPool. |
void | flush(boolean force) - Flush the current write set through to the backing channel. |
boolean | flush(boolean force, long timeout, TimeUnit units) - Flush the writes to the backing channel, but do not force anything to the backing channel. |
CounterSet | getCounters() - Return the performance counters for the WriteCacheService. |
protected Quorum<HAPipelineGlue,QuorumMember<HAPipelineGlue>> | getQuorum() - The object which manages Quorum state changes on the behalf of this service. |
long | getSendCount() - Return the #of WriteCache blocks sent by the quorum leader to the first downstream follower. |
long | getSequence() - Return the then current write cache block sequence number. |
boolean | installReads(WriteCache cache) |
boolean | isPresent(long addr) - Return true iff the address is in the write cache at the moment at which the write cache is checked. |
abstract WriteCache | newWriteCache(IBufferAccess buf, boolean useChecksum, boolean bufferHasData, IReopenChannel<? extends Channel> opener, long fileExtent) - Factory for WriteCache implementations. |
protected Callable<Void> | newWriteTask() |
ByteBuffer | read(long offset, int nbytes) - This is a non-blocking query of all write cache buffers (current, clean and dirty). |
void | reset() - Reset the write cache, discarding any writes which have not been written through to the backing channel yet. |
void | resetAndClear() |
long | resetSequence() - Called from IBufferStrategy.commit() and reset() to reset the WriteCache sequence for HA synchronization. |
void | setExtent(long fileExtent) - Set the extent of the file on the current WriteCache. |
boolean | write(long offset, ByteBuffer data, int chk) - Write the record on the cache. |
boolean | write(long offset, ByteBuffer data, int chk, boolean useChecksum, int latchedAddr) - Write the record onto the cache. |
protected boolean | writeLargeRecord(long offset, ByteBuffer data, int chk, boolean useChecksum) - Write a record whose size (when combined with the optional checksum) is larger than the capacity of an individual WriteCache buffer. |
public WriteCacheService(int nwriteBuffers, int minCleanListSize, int nreadBuffers, boolean prefixWrites, int compactionThreshold, int hotCacheSize, int hotCacheThreshold, boolean useChecksum, long fileExtent, IReopenChannel<? extends Channel> opener, Quorum quorum, IBackingReader reader) throws InterruptedException
Allocates N buffers from the DirectBufferPool.

Parameters:
nwriteBuffers - The #of WriteCache buffers.
minCleanListSize - The maximum #of WriteCache buffers on the dirtyList before we start to evict WriteCache buffers to the disk -or- ZERO (0) to use a default value. maxDirtyListSize LTE nbuffers-4 such that we have at least: (1) for [current], (1) for [compactingCache], (1) for reserve and (1) buffer left available on the cleanList.
prefixWrites - When true, the WriteCacheService is supporting an RWS mode store and each WriteCache buffer will directly encode the fileOffset of each record written onto the WriteCache. When false, the WriteCacheService is supporting a WORM mode store and the WriteCache buffers contain the exact data to be written onto the backing store.
compactionThreshold - The minimum percentage of space that could be reclaimed before we will attempt to coalesce the records in a WriteCache buffer. When 100, compaction is explicitly disabled. Note: This is ignored for WORM mode backing stores since we can not compact the buffer in that mode.
useChecksum - true iff record level checksums are enabled.
fileExtent - The current extent of the backing file.
opener - The object which knows how to (re-)open the channel to which cached writes are flushed.
quorumManager - The object which manages Quorum state changes on the behalf of this service.
Throws:
InterruptedException
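The sizing constraint quoted for minCleanListSize can be expressed as a small calculation. This helper is illustrative only and is not the library's code:

```java
final class DirtyListSizing {
    // The constraint above caps the dirty-list threshold at nwriteBuffers - 4 so
    // that one buffer each remains for [current], [compactingCache], the reserve,
    // and the cleanList.
    static int effectiveThreshold(final int requested, final int nwriteBuffers) {
        return Math.min(requested, nwriteBuffers - 4);
    }
}
```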
protected Quorum<HAPipelineGlue,QuorumMember<HAPipelineGlue>> getQuorum()
The object which manages Quorum state changes on the behalf of this service.

protected boolean canCompact()

Return true iff we are allowed to compact buffers. The default implementation of the WriteCache is for a WORM and can never compact.

Note: This method is package private for access by WriteCacheService.

public long resetSequence()

Called from IBufferStrategy.commit() and reset() to reset the WriteCache sequence for HA synchronization. The return value winds up propagated to the IRootBlockView.getBlockSequence() field in the IRootBlockViews.

public long getSequence()

Return the then current write cache block sequence number.
public abstract WriteCache newWriteCache(IBufferAccess buf, boolean useChecksum, boolean bufferHasData, IReopenChannel<? extends Channel> opener, long fileExtent) throws InterruptedException
Factory for WriteCache implementations.

Parameters:
buf - The backing buffer (optional).
useChecksum - true iff record level checksums are enabled.
bufferHasData - true iff the buffer has data to be written onto the local persistence store (from a replicated write).
opener - The object which knows how to re-open the backing channel (required).
fileExtent - The then current extent of the backing file.
Returns:
A WriteCache wrapping that buffer and able to write on that channel.
Throws:
InterruptedException
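A hedged sketch of a concrete subclass supplying this factory is shown below. Imports from the Blazegraph packages are omitted, MyWriteCache is a hypothetical WriteCache implementation, and it is assumed that newWriteCache is the only abstract method to supply:

```java
// Sketch only: delegate the factory to a hypothetical WriteCache implementation.
public class MyWriteCacheService extends WriteCacheService {

    public MyWriteCacheService(final int nwriteBuffers, final int minCleanListSize,
            final int nreadBuffers, final boolean prefixWrites,
            final int compactionThreshold, final int hotCacheSize,
            final int hotCacheThreshold, final boolean useChecksum,
            final long fileExtent, final IReopenChannel<? extends Channel> opener,
            final Quorum quorum, final IBackingReader reader)
            throws InterruptedException {
        super(nwriteBuffers, minCleanListSize, nreadBuffers, prefixWrites,
                compactionThreshold, hotCacheSize, hotCacheThreshold, useChecksum,
                fileExtent, opener, quorum, reader);
    }

    @Override
    public WriteCache newWriteCache(final IBufferAccess buf, final boolean useChecksum,
            final boolean bufferHasData, final IReopenChannel<? extends Channel> opener,
            final long fileExtent) throws InterruptedException {
        // Wrap the (optional) backing buffer so it can be written on the channel
        // re-opened by the supplied opener.
        return new MyWriteCache(buf, useChecksum, bufferHasData, opener, fileExtent);
    }
}
```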
public void reset() throws InterruptedException
Reset the write cache, discarding any writes which have not been written through to the backing channel yet. The IAtomicStore level is responsible for ensuring that processes do not see old data after an abort. This is generally handled by re-loading the appropriate root block and reinitializing various things from that root block.

All dirty buffers are reset and transferred to the head of the clean list. The buffers on the clean list are NOT reset since they may contain valid cached reads (data which is known to be on the disk). We do not want to discard the read cache on reset().

Note: This approach deliberately does not cause any buffers belonging to the caller of #writeChk(long, ByteBuffer, int) to become part of the cleanList.

Note: You MUST set the file extent after resetting the WriteCacheService. This is necessary in order to ensure that the correct file extent is communicated along the write replication pipeline when high availability is enabled.

Note: reset() MUST NOT interrupt readers. It should only reset those aspects of the write cache state that are associated with writes. On the other hand, close() must close all buffers and must not permit readers to read from closed buffers.

Specified by:
reset in interface IWriteCache
Throws:
InterruptedException
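A minimal sketch of the reset protocol described above; currentExtent is a hypothetical value supplied by the owning IBufferStrategy:

```java
final class ResetProtocolSketch {
    static void abortWrites(final WriteCacheService cache, final long currentExtent)
            throws InterruptedException {

        // Discard all buffered writes that have not reached the backing channel.
        cache.reset();

        // MUST follow reset() so the correct extent is replicated over the HA pipeline.
        cache.setExtent(currentExtent);
    }
}
```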
public void resetAndClear() throws InterruptedException
Throws:
InterruptedException
public void close()
Permanently take the IWriteCache out of service. Dirty records are discarded, not flushed.

Specified by:
close in interface IWriteCache
protected void finalized() throws Throwable
Ensures that close() is eventually invoked so the buffers can be returned to the DirectBufferPool.

Throws:
Throwable
public void flush(boolean force) throws InterruptedException
Flush the current write set through to the backing channel.

Specified by:
flush in interface IWriteCache
Throws:
InterruptedException
public boolean flush(boolean force, long timeout, TimeUnit units) throws TimeoutException, InterruptedException
flush() is a blocking method. At most one flush() operation may run at a time. The current buffer is moved to the dirtyList while holding the ReentrantReadWriteLock.WriteLock and flush() then waits until the dirtyList becomes empty, at which point all dirty records have been written through to the backing file.

Note: Any exception thrown from this method MUST trigger error handling resulting in a high-level abort() and reset() of the WriteCacheService.

TODO flush() is currently designed to block concurrent writes() in order to give us clean decision boundaries for the HA write pipeline and also to simplify the internal locking design. Once we get HA worked out cleanly we should explore whether or not we can relax this constraint such that writes can run concurrently with flush(). That would have somewhat higher throughput since mutable B+Tree evictions would no longer cause concurrent tasks to block during the commit protocol or the file extent protocol. [Perhaps by associating each write set with a distinct sequence counter (that is incremented by both commit and abort)?]

TODO Flush should order ALL WriteCache instances on the dirtyList by their fileOffset and then evict them in that order. This reordering will maximize the opportunity for locality during the IOs. With a large write cache (multiple GBs) this reordering could substantially reduce the IOWait associated with flush() for a large update. Note: The reordering should only be performed by the leader in HA mode; the followers will receive the WriteCache blocks in the desired order and can just drop them onto the dirtyList.

Specified by:
flush in interface IWriteCache
Throws:
TimeoutException
InterruptedException - FIXME The [force] parameter is ignored and will be removed shortly.
See Also:
WriteTask, dirtyList, dirtyListEmpty
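A hedged usage sketch of the blocking flush (the timeout value is arbitrary):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FlushSketch {
    static void flushAtCommit(final WriteCacheService cache) throws InterruptedException {
        try {
            // Blocks writers, moves the current buffer to the dirtyList, and waits
            // until the dirtyList drains to the backing file (and the HA pipeline).
            cache.flush(true /* force */, 10, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            // The write set did not flush in time; per the note above, this must
            // trigger a high-level abort() and reset() of the WriteCacheService.
        }
    }
}
```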
public void setExtent(long fileExtent) throws IllegalStateException, InterruptedException
Set the extent of the file on the current WriteCache. The then current value of the extent will be communicated together with the rest of the WriteCache state if it is written onto another service using the write replication pipeline (HA only). The receiver will use the value read from the WriteCache message to adjust the extent of its backing file.

Note: Changes in the file extent for persistence store implementations MUST (a) be mutually exclusive with reads and writes on the backing file (due to a JVM bug); and (b) force the file data and the file metadata to the disk. Thus any change in the fileExtent MUST be followed by a flush(boolean, long, TimeUnit).

Note: You MUST set the file extent each time you invoke reset() so the WriteCacheService is always aware of the correct file extent.
public boolean write(long offset, ByteBuffer data, int chk) throws InterruptedException, IllegalStateException
Write the record on the cache.

Specified by:
write in interface IWriteCache
Parameters:
offset - The file offset of that record in the backing file.
data - The record. The bytes from the current Buffer.position() to the Buffer.limit() will be written and the Buffer.position() will be advanced to the Buffer.limit(). The caller may subsequently modify the contents of the buffer without changing the state of the cache (i.e., the data are copied into the cache).
chk - The checksum of the data (optional). When checksums are not enabled this should be ZERO (0). When checksums are enabled, #read(long) will validate the checksum before returning data.
Returns:
true iff the caller's record was transferred to the cache. When false, there is not enough room left in the write cache for this record.
Throws:
InterruptedException
IllegalStateException - If the buffer is closed.

public boolean write(long offset, ByteBuffer data, int chk, boolean useChecksum, int latchedAddr) throws InterruptedException, IllegalStateException
Write the record onto the cache. When integrating with the RWStrategy or the WORMStrategy there needs to be a read/write lock such that file extension is mutually exclusive with file read/write operations (due to a Sun bug). The caller can override #newWriteCache(ByteBuffer, IReopenChannel) to acquire the necessary lock (the read lock of a ReadWriteLock). This is even true when the record is too large for the cache since we delegate the write to a temporary WriteCache wrapping the caller's buffer.

Note: Any exception thrown from this method MUST trigger error handling resulting in a high-level abort() and reset() of the WriteCacheService.

Parameters:
latchedAddr - The latched address (RWStore only).
Returns:
true since the record is always accepted by the WriteCacheService (unless an exception is thrown).
Throws:
InterruptedException
IllegalStateException
See Also:
http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6371642
public void debugAddrs(long offset, int length, char c)
protected boolean writeLargeRecord(long offset, ByteBuffer data, int chk, boolean useChecksum) throws InterruptedException, IllegalStateException
Write a record whose size (when combined with the optional checksum) is larger than the capacity of an individual WriteCache buffer. This operation is synchronous (to protect the ByteBuffer from concurrent modification by the caller). It will block until the record has been written.

This implementation will write the record onto a sequence of WriteCache objects and wait until all of those objects have been written through to the backing file and the optional HA write pipeline. A checksum will be appended after the last chunk of the record. This strategy works for the WORM since the bytes will be laid out in a contiguous region on the disk.

Note: For the WORM, this code MUST NOT allow the writes to proceed out of order or the data will not be laid out correctly on the disk!

Note: The RW store MUST NOT permit individual allocations whose size on the disk is greater than the capacity of an individual WriteCache buffer (@todo Or is this Ok? Perhaps it is if the RW store holds a lock across the write for a large record? Maybe if we also add a low-level method for inserting an entry into the record map?)

Note: This method DOES NOT register the record with the shared serviceMap. Since the record spans multiple WriteCache objects it can not be directly recovered without reading it from the backing file.

The WORM does not have these limits on the allocation size, so it seems likely that breaking it up across multiple WriteCache buffer instances would have to be done inside of the WriteCacheService in order to prevent checksums from being interleaved with each WriteCache worth of data it emits for a large record. We can't raise this out of the WriteCacheService because the large record would not be replicated for HA.
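A rough sketch of the chunking idea described above; it is not the actual implementation, and the capacity value and the commented-out transfer steps are placeholders:

```java
import java.nio.ByteBuffer;

final class LargeRecordSketch {
    // Lay a large record out as contiguous slices, one WriteCache worth at a time,
    // in strict order so the WORM layout on disk is correct; the optional checksum
    // is appended only after the final slice.
    static void writeInChunks(final ByteBuffer data, final long fileOffset,
            final int cacheCapacity) {
        long offset = fileOffset;
        while (data.hasRemaining()) {
            final int n = Math.min(cacheCapacity, data.remaining());
            final ByteBuffer slice = data.duplicate();
            slice.limit(slice.position() + n);
            // ... write [slice] onto a WriteCache at [offset] and wait for it ...
            data.position(data.position() + n);
            offset += n;
        }
        // ... append the 4-byte checksum after the final slice (when enabled) ...
    }
}
```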
public boolean installReads(WriteCache cache) throws InterruptedException
Throws:
InterruptedException
public ByteBuffer read(long offset, int nbytes) throws InterruptedException, ChecksumError
This is a non-blocking query of all write cache buffers (current, clean and dirty). This implementation DOES NOT throw an IllegalStateException if the service is already closed NOR if there is an asynchronous close of the service. Instead it just returns null to indicate a cache miss.

Specified by:
read in interface IWriteCache
Parameters:
offset - The file offset of that record in the backing file.
nbytes - The length of the record (decoded from the address by the caller).
Returns:
null iff the record does not lie within the IWriteCache. When non-null, this will be a newly allocated exact fit mutable ByteBuffer backed by a Java byte[]. The buffer will be flipped to prepare for reading (the position will be zero and the limit will be the #of bytes read). The data DOES NOT include the bytes used to code the checksum even when checksums are enabled.
Throws:
InterruptedException
ChecksumError - if checksums are enabled and the checksum for the record could not be validated.

public ByteBuffer _readFromCache(long offset, int nbytes) throws ChecksumError, InterruptedException

Attempt to read a record from cache (either write cache or read cache depending on the service map state).

Throws:
ChecksumError
InterruptedException
public boolean clearWrite(long offset, int latchedAddr)
Called to check if a write has already been flushed. If dirty WriteCaches are flushed in order then it does not matter; however, if we want to be able to combine WriteCaches then it makes sense that there are no duplicate writes.

On reflection this is more likely needed since, for the RWStore, depending on session parameters, the same cached area could be overwritten. We could still maintain multiple writes but we would need a guarantee of order when retrieving data from the write cache (newest first).

So the question is whether it is better to keep the cache consistent or to constrain with read order.

Parameters:
offset - the address to check

public String addrDebugInfo(long paddr)
An array of writeCache actions is maintained that can be used to provide a breadcrumb of how that address has been written, saved, freed or removed. Write errors often show up as a checksum error, so the length of data written to the address can be crucial information in determining the root cause of any problem.

Parameters:
address - for which info requested

public boolean isPresent(long addr)
Return true iff the address is in the write cache at the moment at which the write cache is checked.

Note: Unless the caller is holding an appropriate lock across this operation, the result is NOT guaranteed to be correct at any time other than the moment when the cache was tested.
public CounterSet getCounters()
Return the performance counters for the WriteCacheService.

public long getSendCount()

Return the #of WriteCache blocks sent by the quorum leader to the first downstream follower.

Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.