public class AsynchronousOverflowTask extends Object implements Callable<Object>
Build an IndexSegment from the writes buffered on the prior journal. This is done in order to clear the dependencies on the historical journals. If there are deleted tuples in the buffered writes, then they are propagated to the index segment.
Build an IndexSegment from the FusedView of the index partition. This is a compacting merge. Delete markers will not be present in the generated IndexSegment.
Processing is divided into two stages:
chooseTasks()
runTasks()
AbstractAtomicUpdateTask. The atomic update tasks use ITx.UNISOLATED operations on the live journal to make atomic updates to the index partition definitions and to the MetadataService and/or a remote data service where necessary.
Note: This task is invoked after an OverflowManager.overflow(). It is run on the ResourceManager's ExecutorService so that its execution is asynchronous with respect to the IConcurrencyManager.
While it does not require any locks for its own processing stages, it relies on the OverflowManager.overflowAllowed flag to disallow additional overflow operations until it has completed. The various actions taken by this task are submitted to the IConcurrencyManager so that they will obtain the appropriate locks as necessary on the named indices.
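The two-stage flow described above can be sketched roughly as follows. This is a minimal illustration, not the Blazegraph implementation: the class name TwoStageOverflowSketch, the use of an AtomicBoolean for the overflowAllowed flag, the plain ExecutorService standing in for the IConcurrencyManager, and the string-valued "build" tasks are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for an asynchronous overflow task; not the real class. */
class TwoStageOverflowSketch implements Callable<List<String>> {

    /** Assumed representation of the overflowAllowed flag (false while this task runs). */
    private final AtomicBoolean overflowAllowed;
    private final List<String> indexNames;
    /** Stand-in for the service that runs the chosen actions. */
    private final ExecutorService lockingService;

    TwoStageOverflowSketch(AtomicBoolean overflowAllowed, List<String> indexNames,
            ExecutorService lockingService) {
        this.overflowAllowed = overflowAllowed;
        this.indexNames = indexNames;
        this.lockingService = lockingService;
    }

    /** Stage 1: decide what to do for each named index (here: always a "build"). */
    List<Callable<String>> chooseTasks() {
        List<Callable<String>> tasks = new ArrayList<>();
        for (final String name : indexNames) {
            tasks.add(() -> "build:" + name);
        }
        return tasks;
    }

    /** Stage 2: submit all tasks, await completion, and check each future for errors. */
    List<String> runTasks(List<Callable<String>> tasks) throws Exception {
        List<String> results = new ArrayList<>();
        for (Future<String> f : lockingService.invokeAll(tasks)) {
            results.add(f.get()); // a failed task surfaces here as an ExecutionException
        }
        return results;
    }

    @Override
    public List<String> call() throws Exception {
        try {
            return runTasks(chooseTasks());
        } finally {
            // Overflow is permitted again only once post-processing has finished.
            overflowAllowed.set(true);
        }
    }
}
```

In this sketch the caller is expected to clear the flag before submitting the task; the finally block restores it even when a task fails.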
Modifier and Type | Field and Description |
---|---|
protected static org.apache.log4j.Logger | log |
Constructor and Description |
---|
AsynchronousOverflowTask(ResourceManager resourceManager, OverflowMetadata overflowMetadata) |
Modifier and Type | Method and Description |
---|---|
Object | call() Note: This task is interrupted by OverflowManager.shutdownNow(). |
protected List<AbstractTask> | chooseJoins() Scans the registered named indices and decides which ones (if any) are undercapacity and should be joined. |
protected List<AbstractTask> | chooseScatterSplits() Choose index partitions for scatter split operations. |
protected List<AbstractTask> | chooseSplitBuildOrMerge(boolean compactingMerge) For each index (partition) that has not been handled, decide whether we will: (1) split the index partition; (2) compacting merge: build an IndexSegment from the FusedView of the index partition; or (3) incremental build: build an IndexSegment from the writes absorbed by the mutable BTree on the old journal (this removes the dependency on the old journal as of its lastCommitTime). Note: Compacting merges are decided in two passes. |
protected List<AbstractTask> | chooseTasks(boolean forceCompactingMerges) Examine each named index on the old journal and decide what, if anything, to do with that index. |
protected ILoadBalancerService | getLoadBalancerService() Return the ILoadBalancerService if it can be discovered. |
protected static boolean | isNormalShutdown(ResourceManager resourceManager, Throwable t) These are all good indicators that the data service was shut down. |
protected void | putUsed(String name, String action) Deprecated. This is no longer valid as many index partitions are entered onto BOTH the buildQueue and the mergeQueue rather than exclusively being assigned one task or the other. |
protected <T> void | runTasks(List<AbstractTask<T>> tasks) Submit all tasks, await their completion, and check their futures for errors. |
protected <T> void | runTasksConcurrent(List<AbstractTask<T>> tasks) Runs the overflow tasks in parallel, cancelling any tasks which have not completed if we run out of time. |
protected <T> void | runTasksInSingleThread(List<AbstractTask<T>> tasks) Runs the overflow tasks one at a time, stopping when the journal needs to overflow again, when we run out of time, or when there are no more tasks to be executed. |
protected boolean | shouldMove(ILoadBalancerService loadBalancerService) Figure out if this data service is considered to be highly utilized, in which case the DS should shed some index partitions. |
public AsynchronousOverflowTask(ResourceManager resourceManager, OverflowMetadata overflowMetadata)
resourceManager -
overflowMetadata -
protected void putUsed(String name, String action)
name - The name of the index partition.
IllegalStateException - if the index partition was already used by some other operation.
protected List<AbstractTask> chooseScatterSplits()
protected List<AbstractTask> chooseJoins()
If the rightSibling of an undercapacity index partition is also local then a JoinIndexPartitionTask is created to join those index partitions and both index partitions will be marked as "used".
If the rightSibling of an undercapacity index partition is remote, then a MoveTask is created to move the undercapacity index partition to the remote data service and the undercapacity index partition will be marked as "used".
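The join-or-move rule above can be sketched as follows. This is only an illustration under stated assumptions: the Candidate type, the string task descriptions, and the rule that a partition already marked as "used" is skipped are hypothetical and are not taken from the Blazegraph source.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the join-or-move decision; not the real chooseJoins(). */
class ChooseJoinsSketch {

    /** Assumed description of an undercapacity partition and its right sibling. */
    static final class Candidate {
        final String name;
        final String rightSiblingName;
        final boolean rightSiblingIsLocal;

        Candidate(String name, String rightSiblingName, boolean rightSiblingIsLocal) {
            this.name = name;
            this.rightSiblingName = rightSiblingName;
            this.rightSiblingIsLocal = rightSiblingIsLocal;
        }
    }

    /** Names of index partitions already claimed by some task. */
    final Set<String> used = new LinkedHashSet<>();

    List<String> chooseJoins(List<Candidate> undercapacity) {
        List<String> tasks = new ArrayList<>();
        for (Candidate c : undercapacity) {
            if (used.contains(c.name)) {
                continue; // assumption: a partition is handled by at most one task
            }
            if (c.rightSiblingIsLocal) {
                if (used.contains(c.rightSiblingName)) {
                    continue;
                }
                // Local sibling: join, and mark BOTH partitions as used.
                tasks.add("JOIN " + c.name + " + " + c.rightSiblingName);
                used.add(c.name);
                used.add(c.rightSiblingName);
            } else {
                // Remote sibling: move the undercapacity partition, mark only it as used.
                tasks.add("MOVE " + c.name);
                used.add(c.name);
            }
        }
        return tasks;
    }
}
```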
protected ILoadBalancerService getLoadBalancerService()
Return the ILoadBalancerService if it can be discovered.
The ILoadBalancerService if it can be discovered and otherwise null.
protected boolean shouldMove(ILoadBalancerService loadBalancerService)
Note: We consult the load balancer service on this since it is able to put the load of this service into perspective by also considering the load on the other services in the federation.
loadBalancerService - The load balancer.
protected List<AbstractTask> chooseTasks(boolean forceCompactingMerges) throws Exception
LocalPartitionMetadata describes the key range partition and identifies the historical resources required to present a coherent view of that index partition.
Note: Overflow actions which define a new index partition (Split, Join, and Move) all require a phase (which is part of their atomic update tasks) in which they will block the application. This is necessary in order for them to "catch up" with buffered writes on the new journal - those writes need to be incorporated into the new index partition.
An incremental build is generally faster than a compacting merge because it only copies those writes that were buffered on the mutable BTree. However, an incremental build must copy ALL tuples, including deleted tuples, so it can do more work and does not cause the rangeCount() to be reduced since the deleted tuples are preserved.
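The difference in how the two operations treat deleted tuples can be illustrated with a toy model. In this sketch a sorted map stands in for an index, and a null value stands in for a delete marker; both choices, and the class and method names, are assumptions made for the example and do not reflect the actual BTree or IndexSegment APIs.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: a null value plays the role of a delete marker. */
class BuildVersusMergeSketch {

    /** Incremental build: copy every buffered write, delete markers included. */
    static TreeMap<String, String> incrementalBuild(Map<String, String> bufferedWrites) {
        return new TreeMap<>(bufferedWrites);
    }

    /** Compacting merge: overlay the buffered writes on the older data, then drop deleted tuples. */
    static TreeMap<String, String> compactingMerge(Map<String, String> olderSegment,
            Map<String, String> bufferedWrites) {
        TreeMap<String, String> view = new TreeMap<>(olderSegment);
        view.putAll(bufferedWrites);            // the most recent write for a key wins
        view.values().removeIf(v -> v == null); // delete markers are not carried forward
        return view;
    }
}
```

With an older segment {a, b, c} and buffered writes that delete b and insert d, the incremental build holds two entries (one of them a delete marker), while the compacting merge holds a, c, and d with no markers.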
MetadataIndex. If that rightSibling is local (same ResourceManager) then we will JOIN the index partitions. Otherwise we will MOVE the undercapacity index partition to the IDataService on which its rightSibling was found.
We can choose which index partitions to move fairly liberally. Cold index partitions are not consuming any CPU/RAM/IO resources and moving them to another host will not affect the utilization of either the source or the target host. Moving an index partition which is "hot for write" can impose a noticeable latency because the "hot for write" partition will have absorbed more writes on the journal while we are moving the data from the old view and we will need to move those writes as well. When we move those writes the index will be unavailable for write until it appears on the target data service. Therefore we generally choose to move "warm" index partitions since it will introduce less latency when we temporarily suspend writes on the index partition.
Indices typically have many commit points, and any one of them could become "hot for read". However, moving an index partition is not going to reduce the load on the old node for historical reads since we only move the current state of the index, not its history. Nodes that are hot for historical reads should be handled by increasing their replication count and reading from the secondary data services. Note that ITx.READ_COMMITTED and ITx.UNISOLATED both count against the "live" index - read-committed reads always track the most recent state of the index partition and would be moved if the index partition was moved.
Bottom line: if a node is hot for historical read then increase the replication count and read from failover services. If a node is hot for read-committed and unisolated operations then move one or more of the warm read-committed/unisolated index partitions to a node with less utilization.
Index partitions that get a lot of action are NOT candidates for moves unless the node itself is either overutilized, about to exhaust its DISK, or other nodes are at very low utilization. We always prefer to move the "warm" index partitions instead.
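The preference for "warm" index partitions can be sketched as a simple filter. Everything concrete here is an assumption for illustration: the Partition type, the use of a writes-per-second score, the two thresholds, and the allowHot switch (standing in for the cases where the node is overutilized, near DISK exhaustion, or other nodes are at very low utilization) are hypothetical and are not part of the documented API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of move-candidate selection; thresholds are illustrative. */
class MoveCandidateSketch {

    static final class Partition {
        final String name;
        final double writesPerSecond; // assumed activity score

        Partition(String name, double writesPerSecond) {
            this.name = name;
            this.writesPerSecond = writesPerSecond;
        }
    }

    static List<String> chooseMoveCandidates(List<Partition> partitions,
            double coldBelow, double hotAbove, boolean allowHot) {
        List<Partition> eligible = new ArrayList<>();
        for (Partition p : partitions) {
            boolean cold = p.writesPerSecond < coldBelow;
            boolean hot = p.writesPerSecond > hotAbove;
            if (cold) {
                continue; // moving a cold partition does not change utilization
            }
            if (hot && !allowHot) {
                continue; // suspending writes on a hot partition costs too much latency
            }
            eligible.add(p);
        }
        // Least active first, so warm partitions are preferred over hot ones.
        eligible.sort(Comparator.comparingDouble((Partition p) -> p.writesPerSecond));
        List<String> names = new ArrayList<>();
        for (Partition p : eligible) {
            names.add(p.name);
        }
        return names;
    }
}
```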
ILoadBalancerService; (b) temporary files SHOULD be purged; (c) it MAY choose to shed indices that are "hot for write" since that will slow down the rate at which the disk space is consumed; (d) index partitions may be aggressively moved off of the LDS; (e) the transaction service MAY reduce the retention period; and (f) as a last resort, the transaction service MAY invalidate read locks, which implies that read or read-write transactions will be aborted.
FIXME implement suggestions for handling cases when we are nearing DISK
exhaustion (aggressive release of resources, which should not depend on
asynchronous overflow but rather be part of a monitoring thread or an
inspection task run in each group commit).
FIXME read locks for read-committed operations. For example, queries to
the mds should use a read-historical tx so that overflow in the mds will
not cause the views to be discarded during data service overflow. In
fact, we can just create a read-historical transaction when we start data
service overflow and then pass it into the rest of the process, aborting
that tx when overflow is complete.
FIXME make the atomic update tasks truly atomic using full transactions and/or distributed locks and correcting actions.
Exception
protected List<AbstractTask> chooseSplitBuildOrMerge(boolean compactingMerge)
Compacting merge: build an IndexSegment from the FusedView of the index partition.
Incremental build: build an IndexSegment from the writes absorbed by the mutable BTree on the old journal (this removes the dependency on the old journal as of its lastCommitTime).
compactingMerge - When true a compacting merge will be performed for all index partitions.
Remove all support for splits from this method. Splits are decided by CompactingMergeTask. Run the split logic against the IndexSegment to choose the separatorKeys. Then submit the split task.
public Object call() throws Exception
Note: This task is interrupted by OverflowManager.shutdownNow(). Therefore it tests Thread.isInterrupted() and returns immediately if it has been interrupted.
protected <T> void runTasks(List<AbstractTask<T>> tasks) throws InterruptedException
InterruptedException
protected <T> void runTasksInSingleThread(List<AbstractTask<T>> tasks) throws InterruptedException
InterruptedException
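The method summary describes this method as running the overflow tasks one at a time and stopping when the journal needs to overflow again, when time runs out, or when no tasks remain. A minimal sketch of such a loop follows. The class name, the nanosecond timeout parameter, the BooleanSupplier used to ask whether overflow is needed, and the returned count are all assumptions for illustration; the interrupt check mirrors the Thread.isInterrupted() test mentioned for call().

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a sequential task runner; not the real method. */
class SingleThreadRunnerSketch {

    /** Returns the number of tasks that were run before a stop condition was met. */
    static <T> int runTasksInSingleThread(List<Callable<T>> tasks, long timeoutNanos,
            BooleanSupplier overflowNeeded) throws Exception {
        final long deadline = System.nanoTime() + timeoutNanos;
        int completed = 0;
        for (Callable<T> task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                break; // shutdown requested: stop, leaving the interrupt flag set
            }
            if (overflowNeeded.getAsBoolean()) {
                break; // the journal needs to overflow again
            }
            if (System.nanoTime() - deadline >= 0) {
                break; // out of time
            }
            task.call(); // a task failure propagates to the caller
            completed++;
        }
        return completed;
    }
}
```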
protected <T> void runTasksConcurrent(List<AbstractTask<T>> tasks) throws InterruptedException
tasks -
InterruptedException
OverflowManager.overflowTasksConcurrent
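The method summary describes this method as running the tasks in parallel and cancelling any that have not completed when time runs out. One way to get that behavior with the standard library is the timed form of ExecutorService.invokeAll, which cancels unfinished tasks when the timeout elapses. The sketch below assumes plain Callable tasks and a caller-supplied executor and timeout; it is not the Blazegraph implementation, and the returned count is added only to make the outcome visible.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a parallel task runner with a deadline. */
class ConcurrentRunnerSketch {

    /** Returns the number of tasks that completed normally within the timeout. */
    static <T> int runTasksConcurrent(ExecutorService service, List<Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException {
        // Blocks until all tasks finish or the timeout expires; unfinished tasks are cancelled.
        List<Future<T>> futures = service.invokeAll(tasks, timeout, unit);
        int ok = 0;
        for (Future<T> f : futures) {
            if (f.isCancelled()) {
                continue; // ran out of time before this task completed
            }
            try {
                f.get();
                ok++;
            } catch (ExecutionException e) {
                System.err.println("task failed: " + e.getCause());
            }
        }
        return ok;
    }
}
```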
protected static boolean isNormalShutdown(ResourceManager resourceManager, Throwable t)
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.