public abstract class JoinMasterTask extends Object implements IStepTask, IJoinMaster
Master providing efficient distributed evaluation of IRules. For
query, this task should be run by the client that wishes to materialize the
query results. For mutation, this task may be run by any client or service
since the data does not flow through the master for mutation.
For the first join dimension, the JoinMasterTask creates a
JoinTask per index partition that will be spanned by the
IAccessPath for the first IPredicate in the evaluation order
and feeds each JoinTask in the first join dimension with an
IAsynchronousIterator reading on a buffer containing a single empty
IBindingSet.
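The seeding step can be pictured with a minimal sketch. It models a binding set as a Map from variable name to value and a chunk as a List of binding sets; FirstDimensionSeed, newSeedIterator and seedsFor are hypothetical names, not part of the bigdata API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model, not the bigdata API: a binding set is a Map from
// variable name to bound value, and a chunk is a List of binding sets.
final class FirstDimensionSeed {

    /** A source holding exactly one chunk that contains one empty binding set. */
    static Iterator<List<Map<String, Object>>> newSeedIterator() {
        List<Map<String, Object>> chunk = new ArrayList<>();
        chunk.add(new HashMap<>()); // nothing is bound yet
        List<List<Map<String, Object>>> buffer = new ArrayList<>();
        buffer.add(chunk);
        return buffer.iterator();
    }

    /** One independent seed per first-dimension JoinTask (one per spanned partition). */
    static List<Iterator<List<Map<String, Object>>>> seedsFor(int partitionCount) {
        List<Iterator<List<Map<String, Object>>>> seeds = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            seeds.add(newSeedIterator());
        }
        return seeds;
    }
}
```

Each first-dimension task gets its own iterator, so one task draining its seed does not affect the others.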
Each JoinTask consumes IBindingSet chunks read from the
previous join dimension. For each IBindingSet chunk read, a new
IAccessPath is obtained. Elements are then read from that
IAccessPath in chunks. Given the IBindingSet used to obtain
the IAccessPath, a new IBindingSet is created for each
element in each chunk read from the IAccessPath. If the new
IBindingSet satisfies the constraint(s) on the IRule then it
will be output to the next join dimension. An IBindingSet is output
by placing it onto the UnsynchronizedArrayBuffer for the join
dimension. Periodically that UnsynchronizedArrayBuffer will overflow,
and a chunk of IBindingSets will be placed onto the
IBlockingBuffer from which the next join dimension will read its
IBindingSet chunks.
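A minimal, single-threaded sketch of the loop just described, under the same hypothetical model (binding sets as Maps, chunks as Lists). ChunkBuffer stands in for UnsynchronizedArrayBuffer and a BlockingQueue stands in for the IBlockingBuffer; ChunkBuffer, JoinStep and the functional parameters are illustrative names, not the bigdata API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for UnsynchronizedArrayBuffer: owned by one thread,
// it accumulates elements and hands each full chunk to the shared queue.
final class ChunkBuffer<E> {
    private final int capacity;
    private final BlockingQueue<List<E>> sink; // stands in for the IBlockingBuffer
    private List<E> chunk = new ArrayList<>();

    ChunkBuffer(int capacity, BlockingQueue<List<E>> sink) {
        this.capacity = capacity;
        this.sink = sink;
    }

    void add(E e) throws InterruptedException {
        chunk.add(e);
        if (chunk.size() >= capacity) {
            flush(); // overflow: the next join dimension can now read this chunk
        }
    }

    void flush() throws InterruptedException {
        if (!chunk.isEmpty()) {
            sink.put(chunk);
            chunk = new ArrayList<>();
        }
    }
}

final class JoinStep {
    /**
     * One join dimension. accessPath plays the role of the IAccessPath obtained
     * for a binding set; bind returns the extended binding set, or null if the
     * element conflicts with an existing binding; constraint plays the role of
     * the rule's constraints.
     */
    static <E> void run(Iterable<List<Map<String, Object>>> sourceChunks,
                        Function<Map<String, Object>, List<E>> accessPath,
                        BiFunction<Map<String, Object>, E, Map<String, Object>> bind,
                        Predicate<Map<String, Object>> constraint,
                        ChunkBuffer<Map<String, Object>> out) throws InterruptedException {
        for (List<Map<String, Object>> chunk : sourceChunks) {
            for (Map<String, Object> bindingSet : chunk) {
                for (E element : accessPath.apply(bindingSet)) {
                    Map<String, Object> extended = bind.apply(bindingSet, element);
                    if (extended != null && constraint.test(extended)) {
                        out.add(extended);
                    }
                }
            }
        }
        out.flush(); // sources exhausted: emit the final partial chunk
    }
}
```

Because ChunkBuffer is touched only by the thread running its join dimension, it needs no locking; only the hand-off onto the shared queue is synchronized, mirroring the pairing of an unsynchronized buffer with a blocking one that the text describes.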
The last join dimension is slightly different. Its
UnsynchronizedArrayBuffer writes onto the
IJoinNexus.newQueryBuffer(),
IJoinNexus.newInsertBuffer(com.bigdata.relation.IMutableRelation), or
IJoinNexus.newDeleteBuffer(com.bigdata.relation.IMutableRelation)
depending on the ActionEnum.
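A sketch of how the last dimension's output target might be chosen. Action, SolutionSink and LastDimensionSinks are hypothetical stand-ins for ActionEnum and for the buffers returned by the IJoinNexus factory methods; the real buffers do considerably more (for example, writing onto an IMutableRelation).

```java
// Hypothetical mirror of the three ActionEnum cases named in the text.
enum Action { QUERY, INSERT, DELETE }

// Hypothetical stand-in for the IBuffer the last join dimension writes on.
interface SolutionSink<S> {
    void add(S solution);
}

final class LastDimensionSinks {
    /** Picks the terminal sink for the last join dimension from the action. */
    static <S> SolutionSink<S> sinkFor(Action action,
                                       SolutionSink<S> queryBuffer,
                                       SolutionSink<S> insertBuffer,
                                       SolutionSink<S> deleteBuffer) {
        switch (action) {
            case QUERY:
                return queryBuffer;  // solutions flow back to the client
            case INSERT:
                return insertBuffer; // solutions are written onto the relation
            case DELETE:
                return deleteBuffer; // solutions are removed from the relation
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }
}
```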
For each JoinTask, once its source iterator(s) have been exhausted
and the IAccessPath reading from the last source IBindingSet
has been exhausted, the JoinTask for that join dimension is done. It
flushes its UnsynchronizedArrayBuffer, closes its output IBuffer, and
waits for the downstream JoinTasks to report their RuleStats. Those
RuleStats are aggregated and passed back to its own caller in turn.
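The completion protocol can be sketched as follows, assuming a much-simplified TaskStats in place of RuleStats and a Runnable in place of flushing the UnsynchronizedArrayBuffer and closing the output IBuffer. TaskStats, JoinTaskCompletion and finish are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical, much-simplified stand-in for RuleStats.
final class TaskStats {
    long solutionsOut;

    TaskStats(long solutionsOut) {
        this.solutionsOut = solutionsOut;
    }

    void add(TaskStats other) {
        solutionsOut += other.solutionsOut;
    }
}

final class JoinTaskCompletion {
    /**
     * Runs once this task's sources and its last access path are exhausted:
     * flush and close the output, then wait for every downstream task and
     * fold its statistics into this task's own.
     */
    static TaskStats finish(TaskStats own, Runnable flushAndCloseOutput,
                            List<Future<TaskStats>> downstream)
            throws InterruptedException, ExecutionException {
        flushAndCloseOutput.run(); // lets downstream tasks see end-of-stream
        TaskStats total = new TaskStats(own.solutionsOut);
        for (Future<TaskStats> f : downstream) {
            total.add(f.get()); // blocks until that downstream task reports
        }
        return total; // handed back to this task's caller in turn
    }
}
```

Closing the output before waiting matters: a downstream task cannot finish, and so cannot report, until its source has been closed.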
Each join dimension is single-threaded. Coordination of resources is achieved
using the output buffer for each join dimension. This allows a source join
dimension to read ahead and forces the sink join dimension to process
IBindingSets a chunk at a time.
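A minimal sketch of that coordination, assuming two dimensions connected by a bounded java.util.concurrent.ArrayBlockingQueue standing in for the output buffer, plus a hypothetical end-of-stream marker. PipelinedDimensions and EOS are illustrative names. The queue capacity limits how far the source dimension can read ahead, and put() blocks the source whenever the sink falls behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class PipelinedDimensions {
    // Hypothetical end-of-stream marker, compared by identity.
    static final List<Integer> EOS = new ArrayList<>();

    /**
     * A source dimension (its own single thread) produces chunks into a bounded
     * queue; the sink dimension (the calling thread) consumes whole chunks.
     * Returns the number of elements the sink saw.
     */
    static int run(int chunks, int chunkSize, int capacity) throws InterruptedException {
        BlockingQueue<List<Integer>> buffer = new ArrayBlockingQueue<>(capacity);
        Thread source = new Thread(() -> {
            try {
                for (int c = 0; c < chunks; c++) {
                    List<Integer> chunk = new ArrayList<>();
                    for (int i = 0; i < chunkSize; i++) {
                        chunk.add(c * chunkSize + i);
                    }
                    buffer.put(chunk); // blocks once `capacity` chunks are waiting
                }
                buffer.put(EOS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.start();
        int seen = 0;
        for (List<Integer> chunk = buffer.take(); chunk != EOS; chunk = buffer.take()) {
            seen += chunk.size(); // the sink processes one chunk at a time
        }
        source.join();
        return seen;
    }
}
```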
The JoinMasterTask is responsible for the JoinTasks for the
first join dimension. Each JoinTask is responsible for the downstream
JoinTasks. If the JoinMasterTask is interrupted or cancelled,
then it interrupts or cancels the JoinTasks for the first join
dimension. If a JoinTask is interrupted or cancelled, then it must
cancel any JoinTasks that it has created for the next join
dimension.
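A sketch of the cancellation cascade, assuming each task keeps the Futures of the JoinTasks it created for the next join dimension. CancellationScope and InterruptibleBody are hypothetical names. Future.cancel(true) interrupts the running task, so a body blocked on its inputs exits with InterruptedException and cancels its own children in turn.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

// Hypothetical helper owned by one task: it remembers the Futures of the
// tasks it created for the next join dimension.
final class CancellationScope {

    @FunctionalInterface
    interface InterruptibleBody {
        void run() throws InterruptedException;
    }

    private final List<Future<?>> children = new CopyOnWriteArrayList<>();

    void register(Future<?> child) {
        children.add(child);
    }

    /** Cancels, with interrupt, every task created for the next join dimension. */
    void cancelChildren() {
        for (Future<?> child : children) {
            child.cancel(true);
        }
    }

    /**
     * Runs this task's body; if the body is interrupted (which is how
     * Future.cancel(true) reaches a blocked task), cancel the children first
     * and then rethrow.
     */
    void runGuarded(InterruptibleBody body) throws InterruptedException {
        try {
            body.run();
        } catch (InterruptedException e) {
            cancelChildren();
            throw e;
        }
    }
}
```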
Reading from a historical commit point is a hard requirement when computing
the fixed-point closure of a rule (set). Each round of closure MUST be
evaluated against the commit time reported by
IBigdataFederation.getLastCommitTime(), and that same commit time is used for
all rules in that round. This allows unisolated tasks to write the generated
solutions onto the indices. The requirement is strict because the JoinTasks
would otherwise wind up holding an exclusive lock on the ITx.UNISOLATED index
partitions, which would cause a deadlock when attempting to write the
generated solutions onto those index partitions. At the start of the next
round of closure, simply update the read-historical timestamp to the
then-current value of IBigdataFederation.getLastCommitTime().
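The round structure can be sketched as a simple driver, assuming a LongSupplier in place of IBigdataFederation.getLastCommitTime() and a hypothetical runRound function that evaluates every rule of one round against the given read timestamp, commits the new solutions, and returns how many it added. ClosureDriver is an illustrative name. A round that adds nothing means the fixed point has been reached.

```java
import java.util.function.LongSupplier;
import java.util.function.LongToIntFunction;

// Hypothetical driver for fixed-point closure. Every round reads from one
// historical commit point rather than from the unisolated index.
final class ClosureDriver {
    /**
     * @param lastCommitTime stands in for IBigdataFederation.getLastCommitTime()
     * @param runRound evaluates all rules of one round as of the given timestamp
     *                 and returns the number of new solutions it committed
     * @return the number of rounds executed, including the final empty round
     */
    static int closure(LongSupplier lastCommitTime, LongToIntFunction runRound) {
        int rounds = 0;
        while (true) {
            long readTimestamp = lastCommitTime.getAsLong(); // refreshed every round
            int added = runRound.applyAsInt(readTimestamp);
            rounds++;
            if (added == 0) {
                return rounds; // nothing new: fixed point
            }
        }
    }
}
```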
Queries that use ITx.READ_COMMITTED or ITx.UNISOLATED will
not generate deadlocks, but they are subject to abort from the
split/join/move of index partition(s) during query evaluation. This problem
WILL NOT arise if you instead read from the commit point reported by
IBigdataFederation.getLastCommitTime().
The JoinMasterTask must distribute
the JoinTasks such that they run inside of the
ConcurrencyManager on the various DataServices on which the
index partitions reside from which the IAccessPaths must read. This
allows the IAccessPath to read on the local index object and reduces
the message traffic to pulling chunks of IBindingSets from the source
JoinTasks.
For the JoinMasterTask and for each JoinTask, the fan out of
JoinTasks is determined by the #of index partitions that are spanned
by the IAccessPaths required to evaluate the IBindingSets for
the next join dimension. The source join dimension does not use the
IAccessPath to read on the index; it uses it only to discover the index
partitions to which the generated IBindingSets must be assigned. The
index partition spanned for a given IBindingSet is determined by
generating an as-bound IPredicate for the next join dimension,
instantiating (on the source join dimension) the IAccessPath that will
be used by the target join dimension, and then using a locator scan for the
fromKey and toKey of that IAccessPath. When the IPredicate is fully
bound, the IAccessPath will be restricted to a single index partition,
but we still need to know which index partition.
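A minimal model of the locator scan, assuming index partitions are identified by their inclusive left separator key in a sorted map, with the first partition's separator being the empty key so that every key is covered. Real keys are unsigned byte[] values; Strings are used here only to keep the sketch short, and Locator is a hypothetical name. The fully bound case is modelled by a toKey that is the immediate successor of fromKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical locator: left separator key (inclusive) -> partition id.
// Assumes a partition with separator "" exists, so floorKey never returns null.
final class Locator {
    private final NavigableMap<String, Integer> partitions = new TreeMap<>();

    void addPartition(String leftSeparator, int partitionId) {
        partitions.put(leftSeparator, partitionId);
    }

    /** Partitions spanned by the half-open key range [fromKey, toKey). */
    List<Integer> spanned(String fromKey, String toKey) {
        String first = partitions.floorKey(fromKey); // partition holding fromKey
        List<Integer> ids = new ArrayList<>();
        for (Map.Entry<String, Integer> e
                : partitions.subMap(first, true, toKey, false).entrySet()) {
            ids.add(e.getValue());
        }
        return ids;
    }
}
```

With separators "", "m" and "t", the range ["k", "p") spans partitions 0 and 1, while the fully bound key "q" (toKey "q\0") lands in partition 1 alone.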
The IBindingSet is written on an UnsynchronizedArrayBuffer
corresponding to the target index partition. The
UnsynchronizedArrayBuffer (together with the output IBuffer
for the IBindingSet chunks and the Future for the
JoinTask for that index partition) for the target index partition
exists in an LRU. If it falls off the end of the LRU, then the
UnsynchronizedArrayBuffer is flushed and the output IBuffer
is closed. The downstream JoinTask will eventually exhaust the
corresponding IAsynchronousIterator source.
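The LRU can be sketched with java.util.LinkedHashMap in access order, whose removeEldestEntry hook lets the eviction flush and close the evicted partition's output. PartitionOutput and OutputLru are hypothetical names; the pending and flushed lists merely stand in for the UnsynchronizedArrayBuffer and the output IBuffer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition output: pending stands in for the
// UnsynchronizedArrayBuffer, flushed for what reached the output IBuffer.
final class PartitionOutput {
    final int partitionId;
    final List<String> pending = new ArrayList<>();
    final List<String> flushed = new ArrayList<>();
    boolean closed;

    PartitionOutput(int partitionId) {
        this.partitionId = partitionId;
    }

    void flushAndClose() {
        flushed.addAll(pending);
        pending.clear();
        closed = true; // the downstream JoinTask will see its source exhausted
    }
}

// Access-ordered LRU: evicting an entry flushes and closes that output.
final class OutputLru extends LinkedHashMap<Integer, PartitionOutput> {
    private final int capacity;

    OutputLru(int capacity) {
        super(16, 0.75f, true); // true = access order, so lookups refresh recency
        this.capacity = capacity;
    }

    PartitionOutput outputFor(int partitionId) {
        return computeIfAbsent(partitionId, PartitionOutput::new);
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Integer, PartitionOutput> eldest) {
        if (size() > capacity) {
            eldest.getValue().flushAndClose();
            return true;
        }
        return false;
    }
}
```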
When the source join dimension and the sink join dimension have the same
IKeyOrder there will be an orderly progression through the indices
and each sink JoinTask can be safely closed once a JoinTask
is created on the DataService for the next index partition. However,
the IKeyOrders often differ, which can lead to a more scattered
assignment of output IBindingSets to index partitions. The LRU helps
to manage this fan out.
Fan out means that there may be N>1 JoinTasks for each join
dimension. For this reason, a QUERY ISlice must be applied by the
client reading on the IAsynchronousIterator returned by the
JoinMasterTask.
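A minimal sketch of a client-side slice, assuming the client sees the merged solution stream as a plain Iterator. ClientSlice is a hypothetical name; the point is only that the offset and limit are counted once, over all solutions, rather than separately by each of the N JoinTasks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical client-side slice over the merged output of all JoinTasks.
final class ClientSlice {
    /** Skips `offset` solutions, then returns at most `limit` solutions. */
    static <S> List<S> slice(Iterator<S> solutions, long offset, long limit) {
        List<S> out = new ArrayList<>();
        long skipped = 0;
        while (solutions.hasNext() && out.size() < limit) {
            S s = solutions.next();
            if (skipped < offset) {
                skipped++;
            } else {
                out.add(s);
            }
        }
        return out;
    }
}
```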
Fan out also implies a requirement for fan-in in order to reduce the scatter
of JoinTasks. Fan-in must aggregate the source JoinTasks such
that they target the same sink JoinTask instance for the same rule
execution instance, the same orderIndex (hence the same IPredicate),
and the same index partition. This means that a factory mechanism must be
used to either create a new JoinTask or return the existing
JoinTask on the DataService based on those identifying
properties. This must be done in a thread-safe manner, but contention should
be restricted to the case where the identifying properties are the same. The
factory must be given the IAsynchronousIterator reading
IBindingSet chunks from the source join dimension and the
JoinTask must not close (unless interrupted or cancelled) until all
of its source IAsynchronousIterators have been exhausted.
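One way to meet these requirements is sketched here, assuming a ConcurrentHashMap keyed by (master UUID, orderIndex, partition id). ConcurrentHashMap.compute runs the remapping function atomically for its key, so lookup, creation and source registration happen together, while callers with different keys normally proceed without contending. SinkKey, FanInSink and SinkFactory are hypothetical names, and removal of finished sinks is omitted.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical identity of a sink JoinTask: rule execution, join dimension, partition.
final class SinkKey {
    final UUID masterUUID;
    final int orderIndex;
    final int partitionId;

    SinkKey(UUID masterUUID, int orderIndex, int partitionId) {
        this.masterUUID = masterUUID;
        this.orderIndex = orderIndex;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SinkKey)) return false;
        SinkKey k = (SinkKey) o;
        return orderIndex == k.orderIndex && partitionId == k.partitionId
                && masterUUID.equals(k.masterUUID);
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterUUID, orderIndex, partitionId);
    }
}

// Hypothetical sink that stays open until every attached source is exhausted.
final class FanInSink {
    private int openSources;

    synchronized void attachSource() {
        openSources++;
    }

    /** Returns true when the last attached source has been exhausted. */
    synchronized boolean sourceExhausted() {
        return --openSources == 0;
    }
}

final class SinkFactory {
    private final ConcurrentMap<SinkKey, FanInSink> sinks = new ConcurrentHashMap<>();

    /** Returns the existing sink for the key, or creates one, and attaches a source. */
    FanInSink getOrCreate(SinkKey key) {
        return sinks.compute(key, (k, existing) -> {
            FanInSink sink = existing != null ? existing : new FanInSink();
            sink.attachSource(); // registered atomically with the lookup
            return sink;
        });
    }
}
```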
| Modifier and Type | Field and Description |
|---|---|
| protected static boolean | DEBUG: True iff the log level is DEBUG or less. |
| protected IJoinNexus | joinNexus |
| protected IJoinNexusFactory | joinNexusFactory |
| protected JoinStats[] | joinStats: Statistics on JoinTask behavior for each IPredicate in the tail of the rule. |
| protected static org.apache.log4j.Logger | log |
| protected UUID | masterUUID: The unique identifier for this JoinMasterTask instance. |
| protected int[] | order: The evaluation order. |
| protected IRule | rule |
| protected IRuleState | ruleState |
| protected RuleStats | ruleStats |
| protected IBuffer<ISolution[]> | solutionBuffer: From the ctor. |
| protected int | tailCount |
| Modifier | Constructor and Description |
|---|---|
| protected | JoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> solutionBuffer) |
| Modifier and Type | Method and Description |
|---|---|
| protected void | awaitAll(List<Future<Void>> futures, long timeout, TimeUnit unit): Make sure that each JoinTask completed successfully. |
| RuleStats | call(): Evaluate the rule. |
| protected void | combineJoinStats() |
| IBuffer<ISolution[]> | getSolutionBuffer(): Returns the buffer specified to the ctor (overridden for distributed joins). |
| UUID | getUUID(): Return a unique identifier for the JoinMasterTask instance. |
| protected ThickAsynchronousIterator<IBindingSet[]> | newBindingSetIterator(IBindingSet bindingSet): Return an IAsynchronousIterator that will read a single IBindingSet. |
| void | report(JoinStats joinStats): Aggregates the statistics for some join dimension. |
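The awaitAll contract documented for this class (wait for every JoinTask, regardless of outcome, so that each can report its JoinStats, then fail if any failed) can be sketched against plain java.util.concurrent types. This is not the bigdata implementation: AwaitAll is a hypothetical name, a single overall deadline is assumed for the timeout, and the bigdata ExecutionExceptions type is replaced by one ExecutionException that carries the remaining failures as suppressed exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitAll {
    /**
     * Waits on every future against one overall deadline. A failed task does not
     * stop the wait for the others; all failures are reported together at the end.
     */
    static void awaitAll(List<? extends Future<?>> futures, long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<Throwable> causes = new ArrayList<>();
        for (Future<?> f : futures) {
            long remaining = deadline - System.nanoTime();
            try {
                f.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
            } catch (ExecutionException e) {
                causes.add(e.getCause() != null ? e.getCause() : e); // keep waiting
            } catch (CancellationException e) {
                causes.add(e);
            }
        }
        if (!causes.isEmpty()) {
            ExecutionException failure = new ExecutionException(
                    causes.size() + " task(s) failed", causes.get(0));
            for (int i = 1; i < causes.size(); i++) {
                failure.addSuppressed(causes.get(i));
            }
            throw failure;
        }
    }
}
```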
protected static final org.apache.log4j.Logger log

protected static final boolean DEBUG
True iff the log level is DEBUG or less.

protected final IRule rule

protected final IJoinNexus joinNexus

protected final IJoinNexusFactory joinNexusFactory

protected final IBuffer<ISolution[]> solutionBuffer
The IBuffer on which the last join dimension writes the computed ISolutions.
Note: LocalJoinMasterTask always passes this along to the last LocalJoinTask.
Note: For a DistributedJoinMasterTask running a query, this buffer gets proxied and the DistributedJoinTasks all write on the proxy. However, the DistributedJoinMasterTask DOES NOT proxy this buffer for mutation, in order to keep all data from flowing through the master.

protected final int tailCount

protected final IRuleState ruleState

protected final int[] order
The evaluation order.

protected final RuleStats ruleStats

protected final JoinStats[] joinStats
Statistics on JoinTask behavior for each IPredicate in the tail of the rule. These statistics are reported by each JoinTask and then aggregated for each join dimension.
Note: The index into this array is the evaluation order of the predicate.
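A sketch of that aggregation, assuming a per-partition record carrying just two counters. PartitionJoinStats and JoinStatsAggregator are hypothetical names; as with joinStats, the slot is selected by the evaluation order of the predicate, and each of the N JoinTasks for a join dimension adds into the same slot. report is synchronized because JoinTasks may report concurrently.

```java
// Hypothetical, much-simplified statistics reported by one JoinTask
// (that is, by one index partition of one join dimension).
final class PartitionJoinStats {
    final int orderIndex; // evaluation order of the tail predicate
    final long bindingSetsIn;
    final long elementsRead;

    PartitionJoinStats(int orderIndex, long bindingSetsIn, long elementsRead) {
        this.orderIndex = orderIndex;
        this.bindingSetsIn = bindingSetsIn;
        this.elementsRead = elementsRead;
    }
}

// Hypothetical aggregator with one slot per join dimension, indexed by
// evaluation order.
final class JoinStatsAggregator {
    private final long[] bindingSetsIn;
    private final long[] elementsRead;

    JoinStatsAggregator(int tailCount) {
        bindingSetsIn = new long[tailCount];
        elementsRead = new long[tailCount];
    }

    /** Called once per JoinTask; several JoinTasks may report at the same time. */
    synchronized void report(PartitionJoinStats s) {
        bindingSetsIn[s.orderIndex] += s.bindingSetsIn;
        elementsRead[s.orderIndex] += s.elementsRead;
    }

    synchronized long bindingSetsIn(int orderIndex) {
        return bindingSetsIn[orderIndex];
    }

    synchronized long elementsRead(int orderIndex) {
        return elementsRead[orderIndex];
    }

    /** Rule-level total over all join dimensions. */
    synchronized long totalElementsRead() {
        long total = 0;
        for (long n : elementsRead) {
            total += n;
        }
        return total;
    }
}
```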
protected final UUID masterUUID
The unique identifier for this JoinMasterTask instance.

protected JoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> solutionBuffer)
Parameters:
rule - The rule to be executed.
joinNexus - The IJoinNexus.
solutionBuffer - The ISolution buffer.

public final UUID getUUID()
Return a unique identifier for the JoinMasterTask instance. This is used to concentrate all DistributedJoinTasks that target the same tail predicate and index partition onto the same DistributedJoinTask sink.
Specified by: getUUID in interface IJoinMaster

protected void awaitAll(List<Future<Void>> futures, long timeout, TimeUnit unit) throws ExecutionExceptions, InterruptedException, TimeoutException
Make sure that each JoinTask completed successfully.
Note: This waits until all JoinTasks complete, regardless of their outcome (or until the timeout expires), so that every JoinTask has the opportunity to report its JoinStats to the JoinMasterTask.
Parameters:
futures - The Future for each JoinTask that was created by the JoinMasterTask.
timeout - The timeout for awaiting those futures.
unit - The unit for that timeout.
Throws:
ExecutionExceptions - if one or more JoinTasks fail.
InterruptedException - if the JoinMasterTask itself was interrupted while awaiting its JoinTasks.
TimeoutException - if the timeout expires first.

protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator(IBindingSet bindingSet)
Return an IAsynchronousIterator that will read a single IBindingSet.
Parameters:
bindingSet - the binding set.

protected void combineJoinStats()
Aggregates the statistics reported by each JoinTask onto ruleStats.
There are N JoinTasks per IPredicate in the tail of the rule, where N is the #of index partitions on which we must read to evaluate the IRule for a given IPredicate in the tail (N is per IPredicate, not the same for each IPredicate).

public void report(JoinStats joinStats)
Aggregates the statistics for some join dimension.
Specified by: report in interface IJoinMaster
Parameters:
joinStats - Statistics for an index partition of some join dimension.

public IBuffer<ISolution[]> getSolutionBuffer() throws IOException
Returns the buffer specified to the ctor (overridden for distributed joins).
Specified by: getSolutionBuffer in interface IJoinMaster
Throws:
IOException

Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.