H - The generic type of the value returned by Callable.call() for the master.
E - The generic type of the elements in the chunks stored in the BlockingBuffer.
S - The generic type of the subtask implementation class.
L - The generic type of the locator object used to look up a subtask in the internal map (must be unique and must implement hashCode() and equals() per their contracts).
public abstract class AbstractMasterTask<H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractSubtask,L> extends AbstractHaltableProcess implements Callable<H>, IMasterTask<E,H>
BlockingBuffer and distributes those chunks to subtasks according to some abstraction which is not defined by this class.
KVO[] chunks, in which each KVO represents a tuple to be written on a scale-out index. The buffer is drained by a master task, which transfers chunks to sink tasks, each of which writes on a specific index partition.
The master is provisioned with a CTOR which is used to convert the KVOs into unsigned byte[] keys and byte[] values for writes on the scale-out index. The master may be provisioned with a mechanism to filter out duplicate tuples.
Writes on the index partitions are asynchronous with respect to the application. However, a KVOC / KVOLatch combination can be used to coordinate notification when some set of tuples of interest have been successfully written onto the scale-out index. This combination can also be used to pass back values from the write operation if they are assigned by side-effect onto the KVO.obj reference.
The asynchronous write implementation is divided into a master, with an input queue and a redirect queue, and sinks, each of which has an input queue and writes chunks onto a specific index partition for the scale-out index. The input queues for the master and the sinks are bounded. The redirect input queue is unbounded. The master and each sink are each assigned their own worker thread.
The master transfers chunks from its input queue to the sinks. It first polls the redirect queue for a chunk. If that queue is empty, then it polls a chunk from the input queue. If no chunks are available, it polls again. The master stops polling the input queue when the input queue is closed, but it continues to drain the redirect queue until all sinks are done or the master is canceled.
Note: The requirement for polling arises because: (a) we are not coordinating signals for the arrival of a chunk on the input or redirect queues; and (b) a chunk can be redirected at any time if there is an outstanding write by a sink on an index partition.
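The polling order described above can be sketched as follows. This is a minimal illustration built on java.util.concurrent queues, not the class's actual implementation: MasterPollSketch, nextChunk, the inputExhausted flag and the String[] chunk type are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; class and method names are hypothetical. */
class MasterPollSketch {

    /**
     * Returns the next chunk, preferring the redirect queue. Returns null when
     * nothing arrived within the timeout, in which case the caller polls again.
     */
    static String[] nextChunk(BlockingQueue<String[]> redirectQueue,
            BlockingQueue<String[]> inputQueue, boolean inputExhausted,
            long pollTimeoutNanos) throws InterruptedException {
        // Redirected chunks are served before new input.
        String[] chunk = redirectQueue.poll();
        if (chunk != null) {
            return chunk;
        }
        // After the input queue is closed and drained, only redirects remain.
        BlockingQueue<String[]> next = inputExhausted ? redirectQueue : inputQueue;
        // A bounded wait keeps the loop responsive to late redirects.
        return next.poll(pollTimeoutNanos, TimeUnit.NANOSECONDS);
    }

    /** Drains both queues and records the order in which chunks are seen. */
    static List<String> demo() {
        BlockingQueue<String[]> redirect = new LinkedBlockingQueue<>(); // unbounded
        BlockingQueue<String[]> input = new ArrayBlockingQueue<>(4);    // bounded
        input.add(new String[] {"in-1"});
        input.add(new String[] {"in-2"});
        redirect.add(new String[] {"redirected"});
        List<String> order = new ArrayList<>();
        try {
            String[] chunk;
            while ((chunk = nextChunk(redirect, input, false, 1_000_000L)) != null) {
                order.add(chunk[0]);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The redirected chunk is observed first even though it was enqueued last, which is the priority the paragraph describes. A timed poll is used here in place of a signal because, as the note says, chunk arrival on the two queues is not coordinated.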
The atomic decision to terminate the master is made using a lock. The lock is specific to the life cycle of the sinks. The lock is held when a sink is created. When a sink terminates, its last action is to grab the lock and signal the subtaskDone Condition. The master terminates when, while holding the lock, it observes that no sinks are running AND the redirect queue is empty. Since chunks are placed onto the redirect queue by sinks (and there are no sinks running) and by the master (which is not issuing a redirect since it is running its termination logic), these criteria are sufficient for termination. However, the sink must ensure that its buffer is closed before it terminates, even if it terminates by exception, so that an attempt to transfer a chunk to the sink will not block forever.
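A rough sketch of the termination test, assuming a ReentrantLock with a Condition named subtaskDone as in the description. The counter runningSinks, the class TerminationSketch and its method names are hypothetical; the real class tracks sinks in its own data structures.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative sketch only; names other than subtaskDone are hypothetical. */
class TerminationSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition subtaskDone = lock.newCondition();
    private final Queue<String[]> redirectQueue = new ConcurrentLinkedQueue<>();
    private int runningSinks = 0; // guarded by lock

    /** The lock is held while a sink is created. */
    void sinkStarted() {
        lock.lock();
        try {
            runningSinks++;
        } finally {
            lock.unlock();
        }
    }

    /** Last action of a sink: grab the lock and signal subtaskDone. */
    void sinkFinished() {
        lock.lock();
        try {
            runningSinks--;
            subtaskDone.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits up to the timeout for running sinks to finish, then reports whether
     * the master may terminate: no sinks running AND the redirect queue empty.
     */
    boolean awaitTermination(long timeoutNanos) throws InterruptedException {
        lock.lock();
        try {
            long remaining = timeoutNanos;
            while (runningSinks > 0 && remaining > 0) {
                remaining = subtaskDone.awaitNanos(remaining);
            }
            return runningSinks == 0 && redirectQueue.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        TerminationSketch master = new TerminationSketch();
        master.sinkStarted();
        master.redirectQueue.add(new String[] {"redirected"});
        master.sinkFinished();
        try {
            boolean before = master.awaitTermination(1_000_000L);
            master.redirectQueue.poll(); // the master handles the redirect
            boolean after = master.awaitTermination(1_000_000L);
            return before + "," + after;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the first check fails because a redirected chunk is still queued even though no sink is running; only after the master consumes it do both criteria hold.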
Once the master is holding a chunk, it splits the chunk into a set of dense chunks correlated with the locators of the index partitions on which those chunks will be written. The split operation is NOT atomic, but it is consistent in the following sense. If a Split is identified based on old information, then the chunk will be directed to an index partition which no longer exists. An attempt to write on that index partition will result in a stale locator exception, which is handled.
Once the chunk has been split, the split chunks are transferred to the appropriate sink(s). Since the master is not holding any locks, a blocking put() may be used to transfer the chunk to the sink.
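To make the split step concrete, here is one way a sorted chunk could be cut into dense ranges, one per partition. It assumes integer keys and a TreeMap from each partition's inclusive lower bound to a locator name; the Split class below is a hypothetical stand-in and is not the library's Split type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative sketch only; this Split class is a hypothetical stand-in. */
class SplitSketch {

    /** A contiguous [fromIndex, toIndex) range of the chunk bound to a locator. */
    static final class Split {
        final String locator;
        final int fromIndex;
        final int toIndex;

        Split(String locator, int fromIndex, int toIndex) {
            this.locator = locator;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        public String toString() {
            return locator + "[" + fromIndex + "," + toIndex + ")";
        }
    }

    /**
     * Splits a chunk of sorted keys into dense ranges, one per partition.
     * The map must contain a lower bound that is LTE every key in the chunk.
     */
    static List<Split> split(int[] sortedKeys, TreeMap<Integer, String> partitions) {
        List<Split> splits = new ArrayList<>();
        int from = 0;
        while (from < sortedKeys.length) {
            // The partition spanning a key has the greatest lower bound LTE the key.
            String locator = partitions.floorEntry(sortedKeys[from]).getValue();
            int to = from + 1;
            while (to < sortedKeys.length
                    && partitions.floorEntry(sortedKeys[to]).getValue().equals(locator)) {
                to++;
            }
            splits.add(new Split(locator, from, to));
            from = to;
        }
        return splits;
    }

    static String demo() {
        TreeMap<Integer, String> partitions = new TreeMap<>();
        partitions.put(Integer.MIN_VALUE, "p0");
        partitions.put(10, "p1");
        partitions.put(20, "p2");
        return split(new int[] {1, 5, 12, 25, 30}, partitions).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each resulting range maps naturally onto a call of the form addToOutputBuffer(locator, a, fromIndex, toIndex, reopen). If the partition map is stale, a range may name a partition that no longer exists, which is the case the stale locator handling covers.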
The sink drains chunks from its input queue. If the input queue is empty and the idle timeout expires before a chunk is transferred to the input queue, then the sink will be asynchronously closed and an IdleTimeoutException will be set on the input queue. If the master attempts to transfer a chunk to the sink's input queue after the idle timeout, then an exception wrapping the idle timeout exception will be thrown. The master handles the wrapped idle timeout exception by re-opening the sink and retrying the transfer of the chunk to the (new) sink's input queue. After the sink closes its input queue by an idle timeout, it will continue to drain the input queue until it is empty, at which point the sink will terminate (this handles the case where the master concurrently transferred a chunk to the sink's input queue before it was closed by the idle timeout).
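The reopen-and-retry behavior can be illustrated with a small single-threaded model. Everything here is an assumption made for the example: SinkBuffer and IdleTimeout are hypothetical stand-ins for the sink's input queue and for IdleTimeoutException, and the closed queue is assumed to reject writes with an IllegalStateException that wraps the close cause.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Illustrative sketch only; all types here are hypothetical stand-ins. */
class ReopenSketch {

    /** Stand-in for the exception set on an idle sink's input queue. */
    static class IdleTimeout extends RuntimeException {
    }

    /** Stand-in for a sink's closable input queue. */
    static class SinkBuffer {
        final Queue<String[]> chunks = new ArrayDeque<>();
        private RuntimeException closeCause; // null while the buffer is open

        void close(RuntimeException cause) {
            closeCause = cause;
        }

        void add(String[] chunk) {
            if (closeCause != null) {
                // Writes on a closed buffer fail with the close cause wrapped.
                throw new IllegalStateException("buffer closed", closeCause);
            }
            chunks.add(chunk);
        }
    }

    final Map<String, SinkBuffer> sinks = new HashMap<>();
    int reopenCount = 0;

    /** Master side: transfer a chunk, re-opening the sink after an idle timeout. */
    void transfer(String locator, String[] chunk) {
        SinkBuffer sink = sinks.computeIfAbsent(locator, k -> new SinkBuffer());
        try {
            sink.add(chunk);
        } catch (IllegalStateException e) {
            if (!(e.getCause() instanceof IdleTimeout)) {
                throw e; // some other failure: not handled here
            }
            // "Re-opening" creates a new buffer (and, in the real class, a new sink).
            SinkBuffer fresh = new SinkBuffer();
            sinks.put(locator, fresh);
            reopenCount++;
            fresh.add(chunk); // retry the transfer on the new sink
        }
    }

    static String demo() {
        ReopenSketch master = new ReopenSketch();
        master.transfer("p1", new String[] {"a"});
        master.sinks.get("p1").close(new IdleTimeout()); // the sink went idle
        master.transfer("p1", new String[] {"b"});
        return master.reopenCount + ":" + master.sinks.get("p1").chunks.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The old buffer still holds chunk "a" after it is closed, mirroring the rule that an idle sink keeps draining its queue until empty before it terminates.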
The sink combines chunks drained from its input queue until the target chunk size for a write is achieved or until the chunk timeout for the sink is exceeded. The sink then writes on the index partition corresponding to its locator. This write occurs in the thread assigned to the sink and the sink will block during the write request.
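A minimal sketch of the combining step, assuming the sink's input queue behaves like a java.util.concurrent.BlockingQueue. The names CombineSketch, nextWrite, targetChunkSize and chunkTimeoutNanos are illustrative, and the policy for a chunk that overshoots the target (it is kept whole here) is an assumption of the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; class and method names are hypothetical. */
class CombineSketch {

    /**
     * Combines chunks until the target size is reached or the chunk timeout
     * expires. The result may exceed the target when the last chunk overshoots.
     */
    static List<String> nextWrite(BlockingQueue<String[]> inputQueue,
            int targetChunkSize, long chunkTimeoutNanos) throws InterruptedException {
        List<String> combined = new ArrayList<>();
        final long deadline = System.nanoTime() + chunkTimeoutNanos;
        while (combined.size() < targetChunkSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // chunk timeout exceeded: write what we have
            }
            String[] chunk = inputQueue.poll(remaining, TimeUnit.NANOSECONDS);
            if (chunk == null) {
                break; // nothing arrived before the deadline
            }
            combined.addAll(Arrays.asList(chunk));
        }
        return combined;
    }

    static List<List<String>> demo() {
        BlockingQueue<String[]> input = new ArrayBlockingQueue<>(8); // bounded
        input.add(new String[] {"a", "b"});
        input.add(new String[] {"c"});
        input.add(new String[] {"d", "e"});
        List<List<String>> writes = new ArrayList<>();
        try {
            long timeout = TimeUnit.MILLISECONDS.toNanos(50);
            writes.add(nextWrite(input, 3, timeout)); // reaches the target size
            writes.add(nextWrite(input, 3, timeout)); // ends on the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writes;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The first write is cut at the target size, the second is released by the timeout with fewer elements than the target.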
If a stale locator exception is received by the sink in response to a write, it will: (a) notify the client of the stale locator exception; (b) close the input queue, setting the stale locator exception as the cause; (c) place the chunk for that write onto the master's (unbounded) redirect queue; and (d) drain its input queue and transfer the chunks to the master's redirect queue.
If the master attempts to transfer a chunk to the input queue for a sink which has closed its input queue in response to a stale locator exception, then an exception will be thrown with the stale locator exception as the inner cause. The master will trap that exception and place the chunk on the redirect queue instead.
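Steps (b) through (d) and the master's fallback can be modeled in a few lines. This is a single-threaded sketch under stated assumptions: StaleLocator is a hypothetical stand-in for the stale locator exception, a String stands in for a chunk, and step (a), notifying the client, is omitted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative sketch only; all names here are hypothetical stand-ins. */
class StaleLocatorSketch {

    /** Stand-in for the stale locator exception. */
    static class StaleLocator extends RuntimeException {
    }

    final Queue<String> redirectQueue = new LinkedBlockingQueue<>(); // unbounded
    final Queue<String> sinkInput = new ArrayDeque<>();
    RuntimeException sinkCloseCause; // null while the sink's queue is open

    /** Adds a chunk to the sink's input queue, failing if it was closed. */
    void addToSink(String chunk) {
        if (sinkCloseCause != null) {
            throw new IllegalStateException("sink closed", sinkCloseCause);
        }
        sinkInput.add(chunk);
    }

    /** Sink side, after a write failed with a stale locator. */
    void onStaleLocator(String failedWrite) {
        sinkCloseCause = new StaleLocator(); // (b) close the queue with the cause
        redirectQueue.add(failedWrite);      // (c) redirect the failed chunk
        String chunk;
        while ((chunk = sinkInput.poll()) != null) {
            redirectQueue.add(chunk);        // (d) drain the rest to the master
        }
    }

    /** Master side: trap the wrapped stale locator and redirect the chunk. */
    void transfer(String chunk) {
        try {
            addToSink(chunk);
        } catch (IllegalStateException e) {
            if (!(e.getCause() instanceof StaleLocator)) {
                throw e;
            }
            redirectQueue.add(chunk);
        }
    }

    static String demo() {
        StaleLocatorSketch s = new StaleLocatorSketch();
        s.transfer("c1");
        s.transfer("c2");
        String inFlight = s.sinkInput.poll(); // the sink takes c1 for a write
        s.onStaleLocator(inFlight);           // the write fails
        s.transfer("c3");                     // arrives after the queue closed
        return s.redirectQueue.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

All three chunks end up on the redirect queue, so none is lost: the master will split them again against current locator information.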
If the sink RMI is successful, the sink will invoke the optional result handler and touch each tuple in the chunk using KVO#done(). These protocols can be used to pass results from asynchronous writes back to the application.
See Also: ISplitter
Modifier and Type | Field and Description |
---|---|
protected BlockingBuffer<E[]> | buffer - The top-level buffer on which the application is writing. |
protected static org.apache.log4j.Logger | log |
protected long | sinkIdleTimeoutNanos - The timeout in nanoseconds before closing an idle output sink. |
protected long | sinkPollTimeoutNanos - The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk. |
protected IAsynchronousIterator<E[]> | src - The iterator draining the buffer. |
H | stats - Statistics for this (and perhaps other) masters. |
Constructor and Description |
---|
AbstractMasterTask(H stats, BlockingBuffer<E[]> buffer, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos) |
Modifier and Type | Method and Description |
---|---|
protected void | addToOutputBuffer(L locator, E[] a, int fromIndex, int toIndex, boolean reopen) - Resolves the output buffer onto which the split must be written and adds the data to that output buffer. |
H | call() |
BlockingBuffer<E[]> | getBuffer() - The top-level buffer on which the application is writing. |
int | getRedirectQueueSize() - The #of chunks on the master's redirectQueue. |
protected S | getSink(L locator, boolean reopen) - Return the sink for the locator. |
H | getStats() - The statistics. |
protected abstract void | handleChunk(E[] chunk, boolean reopen) - Handle the next chunk of elements from the buffer. |
void | mapOperationOverSubtasks(SubtaskOp<S> op) - Maps an operation across the subtasks. |
protected void | moveSinkToFinishedQueueAtomically(L locator, AbstractSubtask sink) - Transfer a sink from sinks to finishedSubtaskQueue. |
protected abstract S | newSubtask(L locator, BlockingBuffer<E[]> out) - Factory for a new subtask. |
protected abstract BlockingBuffer<E[]> | newSubtaskBuffer() - Factory for a new buffer for a subtask. |
protected boolean | nothingPending() - Extension hook for implementations where the clients accept work for asynchronous processing and notify the master as work items completed successfully or fail. |
protected void | notifySubtaskDone(AbstractSubtask subtask) - Notify the master that a subtask is done. |
protected void | redirectChunk(E[] chunk) - Places a chunk onto the master's redirectQueue. |
protected abstract void | submitSubtask(FutureTask<? extends AbstractSubtaskStats> subtask) - Submit the subtask to an Executor. |
protected void | willShutdown() - Extension hook invoked when the master's buffer is exhausted by awaitAll(). |
Methods inherited from class AbstractHaltableProcess: halt, halted
protected static final transient org.apache.log4j.Logger log
protected final BlockingBuffer<E[]> buffer
protected final IAsynchronousIterator<E[]> src
public final H extends AbstractMasterStats<L,? extends AbstractSubtaskStats> stats
protected final long sinkIdleTimeoutNanos
protected final long sinkPollTimeoutNanos
The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk. If this value is too large then the sink will block for noticeable lengths of time and will be less responsive to interrupts. Something in the tens of milliseconds is appropriate.
public AbstractMasterTask(H stats, BlockingBuffer<E[]> buffer, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos)
stats - Statistics for the master.
buffer - The buffer on which data is written by the application and from which it is drained by the master.
sinkIdleTimeoutNanos - The time in nanoseconds after which an idle sink will be closed. Any buffered writes are flushed when the sink is closed. This must be greater than or equal to the sinkChunkTimeout; otherwise the sink will decide that it is idle when it was just waiting for enough data to prepare a full chunk.
sinkPollTimeoutNanos - The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk. If this value is too large then the sink will block for noticeable lengths of time and will be less responsive to interrupts. Something in the tens of milliseconds is appropriate.
public final int getRedirectQueueSize()
protected final void redirectChunk(E[] chunk) throws InterruptedException
chunk - The chunk.
InterruptedException
public BlockingBuffer<E[]> getBuffer()
getBuffer in interface IMasterTask<E,H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>>
public void mapOperationOverSubtasks(SubtaskOp<S> op) throws InterruptedException, ExecutionException
op - The operation, which should be lightweight.
InterruptedException
ExecutionException - if a subtask throws an exception.
protected void notifySubtaskDone(AbstractSubtask subtask) throws InterruptedException
finishedSubtaskQueue queue. The master polls that queue in call() and awaitAll() and checks the Future of each finished subtask using drainFutures(). If a Future reports an error, then the master is halted. This is how we ensure that all subtasks complete normally.
subtask - The subtask.
InterruptedException
public H getStats()
getStats in interface IMasterTask<E,H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>>
public H call() throws Exception
call in interface Callable<H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>>
Exception
protected abstract void handleChunk(E[] chunk, boolean reopen) throws InterruptedException
Handle the next chunk of elements from the buffer.
chunk - A chunk.
reopen - When false it is an error if the output buffer has been closed. When true the output buffer will be (re-)opened as necessary. This will be false when invoked by call() (since the output buffers are not closed until the master's buffer is closed) and should be true if you are handling redirects.
InterruptedException
protected boolean nothingPending()
AbstractMasterTask will not terminate unless this method returns true when queried while holding the lock. A true return indicates that there are no pending work items. The default implementation returns true.
The work performed by the client must be idempotent (it must be safe to re-perform the operation). Pending work items may be in an unknown state: the master may submit the same work item to multiple clients (where that is permitted by the locator semantics), the client task may fail before the work item is complete, and a failed client can cause work items associated with that client to be posted to another client.
To handle master termination, the pending set must track outstanding work items. Those work items should be removed from the pending set as soon as any client has successfully completed that work item (since the work is idempotent).
To handle client failure, the subtask must track the pending set for its client. If the client dies, then the subtask must handle the client by placing all pending work items for that client (including any in the chunk for the current request) onto the redirectQueue.
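One possible bookkeeping scheme for the two pending sets is sketched here. It is a single-threaded illustration only: PendingSetSketch and its method names other than nothingPending are hypothetical, work items are plain strings, and a real extension would have to guard this state so that nothingPending() gives a consistent answer while the master holds its lock.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Illustrative sketch only; not thread-safe, names are hypothetical. */
class PendingSetSketch {
    /** Work items not yet completed by any client (master termination). */
    final Set<String> masterPending = new LinkedHashSet<>();
    /** Work items outstanding per client (client failure). */
    final Map<String, Set<String>> pendingByClient = new HashMap<>();
    final Queue<String> redirectQueue = new ArrayDeque<>();

    void submit(String client, String item) {
        masterPending.add(item);
        pendingByClient.computeIfAbsent(client, k -> new LinkedHashSet<>()).add(item);
    }

    /** Any one success retires the item, since the work is idempotent. */
    void success(String client, String item) {
        masterPending.remove(item);
        Set<String> items = pendingByClient.get(client);
        if (items != null) {
            items.remove(item);
        }
    }

    /** Redirect whatever the dead client still owed and nobody else finished. */
    void clientDied(String client) {
        Set<String> lost = pendingByClient.remove(client);
        if (lost == null) {
            return;
        }
        for (String item : lost) {
            if (masterPending.contains(item)) {
                redirectQueue.add(item);
            }
        }
    }

    boolean nothingPending() {
        return masterPending.isEmpty();
    }

    static String demo() {
        PendingSetSketch m = new PendingSetSketch();
        m.submit("c1", "a");
        m.submit("c1", "b");
        m.submit("c2", "b"); // the same item may go to several clients
        m.success("c2", "b");
        m.clientDied("c1");
        String afterFailure = m.redirectQueue + " " + m.nothingPending();
        m.submit("c3", m.redirectQueue.poll()); // the master re-dispatches "a"
        m.success("c3", "a");
        return afterFailure + " " + m.nothingPending();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only item "a" is redirected when client c1 dies, because "b" was already completed by c2. The master-level set becomes empty only after the re-dispatched item succeeds.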
protected void willShutdown() throws InterruptedException
Extension hook invoked when the master's buffer is exhausted by awaitAll(). The default implementation is a NOP.
InterruptedException
protected S getSink(L locator, boolean reopen) throws InterruptedException
newSubtaskBuffer() and newSubtask(Object, BlockingBuffer).
Note: The caller is single threaded since this is invoked from the master's thread. This code depends on that assumption.
locator - The locator (unique subtask key).
reopen - true IFF a closed buffer should be re-opened (in fact, this causes a new buffer to be created and the new buffer will be drained by a new IndexPartitionWriteTask).
IllegalArgumentException - if the argument is null.
InterruptedException - if interrupted.
RuntimeException - if AbstractHaltableProcess.halted()
protected abstract BlockingBuffer<E[]> newSubtaskBuffer()
protected abstract S newSubtask(L locator, BlockingBuffer<E[]> out)
locator - The unique key for the subtask.
out - The BlockingBuffer on which the master will write for that subtask.
protected abstract void submitSubtask(FutureTask<? extends AbstractSubtaskStats> subtask)
Submit the subtask to an Executor.
subtask - The FutureTask used to execute the subtask.
protected void moveSinkToFinishedQueueAtomically(L locator, AbstractSubtask sink) throws InterruptedException
Transfer a sink from sinks to finishedSubtaskQueue. The entry for the locator is removed from sinks atomically IFF the given sink is associated with the given locator in that map.
This is done atomically using the lock. This is invoked both by AbstractSubtask.call() (when it is preparing to exit call()) and by getSink(Object, boolean) (when it discovers that the sink for a locator is closed, but not yet finished with its work).
Note: The sink MUST be atomically transferred from sinks to finishedSubtaskQueue and awaitAll() MUST verify that both are empty in order for the termination condition to be atomic.
locator - The locator.
sink - The sink.
InterruptedException
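The conditional, lock-protected transfer might look roughly like this. The sketch relies on Map.remove(key, value), which removes an entry only when the key is currently mapped to that value. Queuing the sink only when the removal succeeded is a choice made for this example (it avoids queuing the same sink twice when both callers invoke the method) and is not confirmed by the description above; MoveSinkSketch and allDone are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative sketch only; sinks are modeled as plain strings. */
class MoveSinkSketch {
    final ReentrantLock lock = new ReentrantLock();
    final Map<String, String> sinks = new HashMap<>();
    final Queue<String> finishedSubtaskQueue = new ArrayDeque<>();

    /** Returns true iff the sink was still mapped to the locator and was moved. */
    boolean moveSinkToFinishedQueue(String locator, String sink) {
        lock.lock();
        try {
            // Removes the entry only if locator is still mapped to this sink.
            boolean removed = sinks.remove(locator, sink);
            if (removed) {
                // Assumption of this sketch: queue the sink once, on removal.
                finishedSubtaskQueue.add(sink);
            }
            return removed;
        } finally {
            lock.unlock();
        }
    }

    /** Termination test: both collections must be empty under the same lock. */
    boolean allDone() {
        lock.lock();
        try {
            return sinks.isEmpty() && finishedSubtaskQueue.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        MoveSinkSketch m = new MoveSinkSketch();
        m.sinks.put("p1", "sink-1");
        // getSink() finds sink-1 closed, moves it, then installs a replacement.
        boolean first = m.moveSinkToFinishedQueue("p1", "sink-1");
        m.sinks.put("p1", "sink-2");
        // sink-1 later exits call() and tries again; the replacement is untouched.
        boolean second = m.moveSinkToFinishedQueue("p1", "sink-1");
        return first + "," + second + "," + m.finishedSubtaskQueue.size()
                + "," + m.sinks.get("p1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the move and the emptiness test take the same lock, a sink is always visible in exactly one of the two collections, which is what makes the termination condition atomic.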
protected void addToOutputBuffer(L locator, E[] a, int fromIndex, int toIndex, boolean reopen) throws InterruptedException
split - The Split identifies both the tuples to be dispatched and the PartitionLocator on which they must be written.
a - The array of tuples. Only those tuples addressed by the split will be written onto the output buffer.
reopen - true IFF a closed buffer should be re-opened (in fact, this causes a new buffer to be created and the new buffer will be drained by a new AbstractSubtask).
InterruptedException - if the thread is interrupted.
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.