public abstract class JoinMasterTask extends Object implements IStepTask, IJoinMaster
IRules. For
query, this task should be run by the client that wishes to materialize the
query results. For mutation, this task may be run by any client or service
since the data does not flow through the master for mutation.
For the first join dimension, the JoinMasterTask creates a JoinTask per index
partition that will be spanned by the IAccessPath for the first IPredicate in
the evaluation order and feeds each JoinTask in the first join dimension with
an IAsynchronousIterator reading on a buffer containing a single empty
IBindingSet.
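The seeding step above can be pictured with a minimal sketch. It assumes plain Java collections as stand-ins: a Map plays the role of an IBindingSet and a BlockingQueue of chunks plays the role of the buffer behind the IAsynchronousIterator. The names SeedSketch and newSeedBuffer are hypothetical and are not part of the bigdata API.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins: Map = binding set, BlockingQueue of chunks = source buffer.
class SeedSketch {

    /** Returns a buffer holding exactly one chunk with one empty binding set. */
    static BlockingQueue<List<Map<String, Object>>> newSeedBuffer() {
        BlockingQueue<List<Map<String, Object>>> buffer = new LinkedBlockingQueue<>();
        Map<String, Object> emptyBindingSet = Collections.emptyMap();
        buffer.add(Collections.singletonList(emptyBindingSet));
        return buffer;
    }

    public static void main(String[] args) {
        // One seed buffer per index partition spanned by the first predicate
        // (three partitions is an arbitrary choice for this illustration).
        for (int partition = 0; partition < 3; partition++) {
            System.out.println("partition " + partition + " seed: " + newSeedBuffer().peek());
        }
    }
}
```

The single empty binding set gives the first join dimension exactly one source solution to extend, so it behaves like every later dimension.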
Each JoinTask consumes IBindingSet chunks read from the previous join
dimension. For each IBindingSet chunk read, a new IAccessPath is obtained.
Elements are then read from that IAccessPath in chunks. Given the IBindingSet
used to obtain the IAccessPath, a new IBindingSet is created for each element
in each chunk read from the IAccessPath. If the new IBindingSet satisfies the
constraint(s) on the IRule then it will be output to the next join dimension.
An IBindingSet is output by placing it onto the UnsynchronizedArrayBuffer for
the join dimension. Periodically that UnsynchronizedArrayBuffer will overflow,
and a chunk of IBindingSets will be placed onto the IBlockingBuffer from which
the next join dimension will read its IBindingSet chunks.
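The per-dimension loop described above could look roughly like the following sketch. Everything here is an assumption made for illustration: a Map stands in for an IBindingSet, a List of strings for the elements visited by an IAccessPath, an ArrayList for the UnsynchronizedArrayBuffer, and a BlockingQueue for the IBlockingBuffer. The class name JoinDimensionSketch and the chunk capacity of 2 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical stand-ins, not the bigdata API.
class JoinDimensionSketch {
    static final int CHUNK_CAPACITY = 2;

    // Unsynchronized buffer: safe only because one thread runs this dimension.
    final List<Map<String, Object>> unsyncBuffer = new ArrayList<>();
    // Blocking buffer from which the next join dimension reads whole chunks.
    final BlockingQueue<List<Map<String, Object>>> sink = new LinkedBlockingQueue<>();

    /** Joins one source chunk against the elements visited by the access path. */
    void consume(List<Map<String, Object>> sourceChunk, String var,
                 List<String> accessPath, Predicate<Map<String, Object>> constraint) {
        for (Map<String, Object> source : sourceChunk) {
            for (String element : accessPath) {
                // New binding set = source bindings plus the binding for this element.
                Map<String, Object> extended = new HashMap<>(source);
                extended.put(var, element);
                if (constraint.test(extended)) {
                    output(extended);
                }
            }
        }
    }

    void output(Map<String, Object> bindingSet) {
        unsyncBuffer.add(bindingSet);
        if (unsyncBuffer.size() >= CHUNK_CAPACITY) {
            flush(); // overflow: hand a whole chunk to the next dimension
        }
    }

    void flush() {
        if (!unsyncBuffer.isEmpty()) {
            sink.add(new ArrayList<>(unsyncBuffer));
            unsyncBuffer.clear();
        }
    }

    public static void main(String[] args) {
        JoinDimensionSketch dim = new JoinDimensionSketch();
        List<Map<String, Object>> seed = new ArrayList<>();
        seed.add(new HashMap<>());
        dim.consume(seed, "x", Arrays.asList("a", "b", "c"), b -> true);
        dim.flush(); // end of input: push the final partial chunk
        System.out.println(dim.sink);
    }
}
```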
The last join dimension is slightly different. Its UnsynchronizedArrayBuffer
writes onto the IJoinNexus.newQueryBuffer(),
IJoinNexus.newInsertBuffer(com.bigdata.relation.IMutableRelation), or
IJoinNexus.newDeleteBuffer(com.bigdata.relation.IMutableRelation) depending on
the ActionEnum.
For each JoinTask, once its source iterator(s) have been exhausted and the
IAccessPath reading from the last source IBindingSet has been exhausted, the
JoinTask for that join dimension is done. It will then flush its
UnsynchronizedArrayBuffer, close its output IBuffer, and wait for the
downstream JoinTasks to report their RuleStats. Those RuleStats are aggregated
and passed back to its caller in turn.
Each join dimension is single-threaded. Coordination of resources is achieved
using the output buffer for each join dimension. This allows a source join
dimension to read ahead and forces the sink join dimension to process chunks
of IBindingSets at a time.
The JoinMasterTask is responsible for the JoinTasks for the first join
dimension. Each JoinTask is responsible for the downstream JoinTasks. If the
JoinMasterTask is interrupted or cancelled, then it interrupts or cancels the
JoinTasks for the first join dimension. If a JoinTask is interrupted or
cancelled then it must cancel any JoinTasks which it has created for the next
join dimension.
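A minimal sketch of that cancellation cascade follows. It assumes each task keeps a list of the tasks it created for the next join dimension, and it uses a CompletableFuture as a stand-in for the Future of a submitted task. The class CancelCascadeSketch and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for a master or join task, not the bigdata API.
class CancelCascadeSketch {
    final String name;
    // Stand-in for the Future obtained when a real task is submitted.
    final Future<Void> future = new CompletableFuture<>();
    // Tasks this task created for the next join dimension.
    final List<CancelCascadeSketch> downstream = new ArrayList<>();

    CancelCascadeSketch(String name) {
        this.name = name;
    }

    /** Creates and registers a task for the next join dimension. */
    CancelCascadeSketch newDownstream(String name) {
        CancelCascadeSketch task = new CancelCascadeSketch(name);
        downstream.add(task);
        return task;
    }

    /** Cancels this task and, recursively, every task it created. */
    void cancel() {
        future.cancel(true);
        for (CancelCascadeSketch task : downstream) {
            task.cancel();
        }
    }

    public static void main(String[] args) {
        CancelCascadeSketch master = new CancelCascadeSketch("master");
        CancelCascadeSketch first = master.newDownstream("dim0");
        CancelCascadeSketch second = first.newDownstream("dim1");
        master.cancel();
        System.out.println(second.name + " cancelled: " + second.future.isCancelled());
    }
}
```

Each task only needs to know about the tasks it created, which matches the division of responsibility described above.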
Reading against a read-historical commit time is a hard requirement when
computing the fix point closure of a rule (set). Each round of closure MUST be
evaluated against the commit time reported by
IBigdataFederation.getLastCommitTime(), and that commit time is applied for
all rules in that round. This allows unisolated tasks to write the generated
solutions onto the indices. This is a strong requirement since the JoinTasks
will otherwise wind up holding an exclusive lock on the ITx.UNISOLATED index
partitions, which would cause a deadlock when attempting to write the
generated solutions onto the index partitions. At the start of the next round
of closure, simply update the read-historical timestamp to the then current
value of IBigdataFederation.getLastCommitTime().
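The round structure can be illustrated with a toy fixed-point computation. This is only a sketch under stated assumptions: a copied Set stands in for the read-historical view at the last commit time, a second Set stands in for the unisolated indices, and the single rule is transitive closure over edges encoded as "from>to" strings. The name ClosureRoundsSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: each round reads a fixed snapshot and writes to the live set.
class ClosureRoundsSketch {

    /** Computes the transitive closure of "from>to" edges, one snapshot per round. */
    static Set<String> closure(Set<String> committed) {
        Set<String> live = new HashSet<>(committed); // stand-in for the unisolated view
        boolean changed = true;
        while (changed) {
            // Stand-in for re-reading the last commit time at the start of a round:
            // the read view is fixed for every rule evaluated in this round.
            Set<String> snapshot = new HashSet<>(live);
            changed = false;
            for (String e1 : snapshot) {
                for (String e2 : snapshot) {
                    String[] a = e1.split(">");
                    String[] b = e2.split(">");
                    // Readers never touch 'live', so writes cannot conflict with reads.
                    if (a[1].equals(b[0]) && live.add(a[0] + ">" + b[1])) {
                        changed = true;
                    }
                }
            }
        }
        return live;
    }

    public static void main(String[] args) {
        Set<String> edges = new HashSet<>(Arrays.asList("a>b", "b>c", "c>d"));
        System.out.println(closure(edges));
    }
}
```

Because the joins iterate only over the snapshot, solutions written during a round become visible to readers in the next round, not the current one.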
Queries that use ITx.READ_COMMITTED or ITx.UNISOLATED will not generate
deadlocks, but they are subject to abort from the split/join/move of index
partition(s) during query evaluation. This problem WILL NOT arise if you read
instead from the IBigdataFederation.getLastCommitTime().
The JoinMasterTask must distribute the JoinTasks such that they run inside of
the ConcurrencyManager on the various DataServices on which the index
partitions reside from which the IAccessPaths must read. This allows the
IAccessPath to read on the local index object and reduces the message traffic
to pulling chunks of IBindingSets from the source JoinTasks.
For the JoinMasterTask and for each JoinTask, the fan out of JoinTasks is
determined by the #of index partitions that are spanned by the IAccessPaths
required to evaluate the IBindingSets for the next join dimension. The
IAccessPath will not be used by the source join dimension to read on the
index, merely to discover the index partitions to which the generated
IBindingSets must be assigned. The index partition spanned for a given
IBindingSet is determined by generating an as-bound IPredicate for the next
join dimension, instantiating the IAccessPath on the source join dimension
that will be used by the target join dimension, and then using a locator scan
for the fromKey and toKey for that IAccessPath. In the case where the
IPredicate is fully bound, the IAccessPath will be restricted to a single
index partition, but we still need to know which index partition.
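The locator scan can be sketched with a sorted map from each partition's left separator key to a partition identifier. This is an illustration only: the class LocatorScanSketch, the use of String keys, and the half-open range [fromKey, toKey) are assumptions, not the bigdata metadata index API. The sketch also assumes the first partition's separator is the empty string, so every key falls in some partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a partition locator index, not the bigdata API.
class LocatorScanSketch {
    // Maps each partition's left separator key to a partition id.
    final TreeMap<String, Integer> locators = new TreeMap<>();

    /** Returns the partition ids spanned by the half-open key range [fromKey, toKey). */
    List<Integer> spanned(String fromKey, String toKey) {
        List<Integer> ids = new ArrayList<>();
        // The partition containing fromKey has the greatest separator <= fromKey.
        String first = locators.floorKey(fromKey);
        if (first == null) {
            return ids; // no partition covers fromKey
        }
        for (Map.Entry<String, Integer> e : locators.tailMap(first, true).entrySet()) {
            // Later partitions are spanned only if they start before toKey.
            if (!e.getKey().equals(first) && e.getKey().compareTo(toKey) >= 0) {
                break;
            }
            ids.add(e.getValue());
        }
        return ids;
    }

    public static void main(String[] args) {
        LocatorScanSketch scan = new LocatorScanSketch();
        scan.locators.put("", 0);
        scan.locators.put("g", 1);
        scan.locators.put("p", 2);
        System.out.println(scan.spanned("c", "k"));
        // A fully bound predicate yields a one-key range and a single partition.
        System.out.println(scan.spanned("h", "h\u0000"));
    }
}
```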
The IBindingSet is written on an UnsynchronizedArrayBuffer corresponding to
the target index partition. The UnsynchronizedArrayBuffer (together with the
output IBuffer for the IBindingSet chunks and the Future for the JoinTask for
that index partition) for the target index partition exists in an LRU. If it
falls off of the end of the LRU, then the UnsynchronizedArrayBuffer is flushed
and the output IBuffer is closed. The downstream JoinTask will eventually
exhaust the corresponding IAsynchronousIterator source.
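One way to picture the LRU of per-partition sinks is an access-ordered LinkedHashMap whose eviction hook flushes and closes the evicted sink. The sketch below is an assumption about how such a cache could be built, not the actual implementation. SinkLruSketch, Sink, and flushAndClose are hypothetical names, and strings stand in for IBindingSets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical LRU of per-partition sinks, not the bigdata API.
class SinkLruSketch {

    /** Stand-in for the unsynchronized buffer plus its output buffer. */
    static class Sink {
        final List<String> buffer = new ArrayList<>();
        final List<String> flushed = new ArrayList<>();
        boolean closed;

        void flushAndClose() {
            flushed.addAll(buffer);
            buffer.clear();
            closed = true; // the downstream task will see its source end
        }
    }

    final List<Integer> evicted = new ArrayList<>();
    final LinkedHashMap<Integer, Sink> lru;

    SinkLruSketch(final int capacity) {
        // accessOrder=true: iteration order runs from least to most recently used.
        this.lru = new LinkedHashMap<Integer, Sink>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Sink> eldest) {
                if (size() > capacity) {
                    eldest.getValue().flushAndClose();
                    evicted.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    /** Writes a binding set onto the sink for the target index partition. */
    void write(int partitionId, String bindingSet) {
        Sink sink = lru.get(partitionId); // get() marks the entry as recently used
        if (sink == null) {
            sink = new Sink();
            lru.put(partitionId, sink); // may evict the least recently used sink
        }
        sink.buffer.add(bindingSet);
    }

    public static void main(String[] args) {
        SinkLruSketch sinks = new SinkLruSketch(2);
        sinks.write(1, "a");
        sinks.write(2, "b");
        sinks.write(1, "c");
        sinks.write(3, "d");
        System.out.println("evicted partitions: " + sinks.evicted);
    }
}
```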
When the source join dimension and the sink join dimension have the same
IKeyOrder there will be an orderly progression through the indices and each
sink JoinTask can be safely closed once a JoinTask is created on the
DataService for the next index partition. However, the IKeyOrders often
differ, which can lead to more scattered assignment of output IBindingSets to
index partitions. The LRU helps to manage this fan out.
Fan out means that there may be N>1 JoinTasks for each join dimension. For
this reason, a QUERY ISlice must be applied by the client reading on the
IAsynchronousIterator returned by the JoinMasterTask.
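Because solutions arrive from several tasks, no single task can count toward an offset or limit on its own. A minimal sketch of applying the slice on the client side is shown below. The class ClientSliceSketch and the offset and limit parameters are hypothetical, and a plain Iterator stands in for the IAsynchronousIterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical client-side slice over the merged solution stream.
class ClientSliceSketch {

    /** Skips the first 'offset' solutions, then returns at most 'limit' solutions. */
    static <T> List<T> slice(Iterator<T> solutions, long offset, long limit) {
        List<T> out = new ArrayList<>();
        long seen = 0;
        while (out.size() < limit && solutions.hasNext()) {
            T solution = solutions.next();
            if (seen++ >= offset) {
                out.add(solution);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<Integer> merged = Arrays.asList(1, 2, 3, 4, 5, 6).iterator();
        System.out.println(slice(merged, 2, 3));
    }
}
```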
Fan out also implies a requirement for fan-in in order to reduce the scatter
of JoinTasks. Fan-in must aggregate the source JoinTasks such that they target
the same sink JoinTask instance for the same rule execution instance, the same
orderIndex (hence the same IPredicate), and the same index partition. This
means that a factory mechanism must be used to either create a new JoinTask or
return the existing JoinTask on the DataService based on those identifying
properties. This must be done in a thread-safe manner, but contention should
be restricted to the case where the identifying properties are the same. The
factory must be given the IAsynchronousIterator reading IBindingSet chunks
from the source join dimension and the JoinTask must not close (unless
interrupted or cancelled) until all of its source IAsynchronousIterators have
been exhausted.
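A factory with these properties could be sketched with a ConcurrentHashMap keyed on the identifying properties, so that contention is limited to callers that ask for the same key. This is a sketch under stated assumptions, not the actual factory: JoinTaskFactorySketch, SinkTask, and the source counter are hypothetical, and the counter stands in for tracking the source IAsynchronousIterators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fan-in factory, not the bigdata API.
class JoinTaskFactorySketch {

    /** Stand-in for a sink join task that tracks its open sources. */
    static final class SinkTask {
        final AtomicInteger openSources = new AtomicInteger();

        /** Returns true when the last source is exhausted and the task may close. */
        boolean sourceExhausted() {
            return openSources.decrementAndGet() == 0;
        }
    }

    final ConcurrentMap<List<Object>, SinkTask> tasks = new ConcurrentHashMap<>();

    /** Returns the sink for the key, creating it if needed, and registers one source. */
    SinkTask getOrCreate(UUID masterUUID, int orderIndex, int partitionId) {
        List<Object> key = Arrays.asList(masterUUID, orderIndex, partitionId);
        // compute() is atomic per key, so lookup, creation, and source
        // registration cannot interleave with another caller for the same key.
        return tasks.compute(key, (k, existing) -> {
            SinkTask task = (existing != null) ? existing : new SinkTask();
            task.openSources.incrementAndGet();
            return task;
        });
    }

    public static void main(String[] args) {
        JoinTaskFactorySketch factory = new JoinTaskFactorySketch();
        UUID master = UUID.randomUUID();
        SinkTask a = factory.getOrCreate(master, 1, 7);
        SinkTask b = factory.getOrCreate(master, 1, 7);
        System.out.println("same sink: " + (a == b) + ", sources: " + a.openSources.get());
    }
}
```

Keying on the master's UUID is what keeps concurrent executions of different rules from sharing a sink by accident.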
Modifier and Type | Field and Description |
---|---|
protected static boolean | DEBUG: True iff the log level is DEBUG or less. |
protected IJoinNexus | joinNexus |
protected IJoinNexusFactory | joinNexusFactory |
protected JoinStats[] | joinStats: Statistics on JoinTask behavior for each IPredicate in the tail of the rule. |
protected static org.apache.log4j.Logger | log |
protected UUID | masterUUID: The unique identifier for this JoinMasterTask instance. |
protected int[] | order: The evaluation order. |
protected IRule | rule |
protected IRuleState | ruleState |
protected RuleStats | ruleStats |
protected IBuffer<ISolution[]> | solutionBuffer: From the ctor. |
protected int | tailCount |
Modifier | Constructor and Description |
---|---|
protected | JoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> solutionBuffer) |
Modifier and Type | Method and Description |
---|---|
protected void | awaitAll(List<Future<Void>> futures, long timeout, TimeUnit unit): Make sure that each JoinTask completed successfully. |
RuleStats | call(): Evaluate the rule. |
protected void | combineJoinStats() |
IBuffer<ISolution[]> | getSolutionBuffer(): Returns the buffer specified to the ctor (overridden for distributed joins). |
UUID | getUUID(): Return a unique identifier for the JoinMasterTask instance. |
protected ThickAsynchronousIterator<IBindingSet[]> | newBindingSetIterator(IBindingSet bindingSet): Return an IAsynchronousIterator that will read a single IBindingSet. |
void | report(JoinStats joinStats): Aggregates the statistics for some join dimension. |
protected static final org.apache.log4j.Logger log
protected static final boolean DEBUG
True iff the log level is DEBUG or less.
protected final IRule rule
protected final IJoinNexus joinNexus
protected final IJoinNexusFactory joinNexusFactory
protected final IBuffer<ISolution[]> solutionBuffer
IBuffer on which the last join dimension writes the computed ISolutions.
Note: LocalJoinMasterTask always passes this along to the last LocalJoinTask.
Note: For a DistributedJoinMasterTask running a Query this gets proxied and
the DistributedJoinTasks all write on the proxy. However, the
DistributedJoinMasterTask DOES NOT proxy this for mutation in order to keep
all data from flowing through the master.
protected final int tailCount
protected final IRuleState ruleState
protected final int[] order
protected final RuleStats ruleStats
protected final JoinStats[] joinStats
Statistics on JoinTask behavior for each IPredicate in the tail of the rule.
These statistics are reported by each JoinTask and then aggregated for each
join dimension.
Note: The index into this array is the evaluation order of the predicate.
protected final UUID masterUUID
The unique identifier for this JoinMasterTask instance.
protected JoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> solutionBuffer)
rule - The rule to be executed.
joinNexus - The IJoinNexus.
solutionBuffer - The ISolution buffer.
public final UUID getUUID()
IJoinMaster
Return a unique identifier for the JoinMasterTask instance. This is used to
concentrate all DistributedJoinTasks that target the same tail predicate and
index partition onto the same DistributedJoinTask sink.
getUUID in interface IJoinMaster
protected void awaitAll(List<Future<Void>> futures, long timeout, TimeUnit unit) throws ExecutionExceptions, InterruptedException, TimeoutException
Make sure that each JoinTask completed successfully.
Note: This waits until all JoinTasks complete, regardless of their outcome (or
until the timeout expires), so that all JoinTasks have the opportunity to
report their JoinStats to the JoinMasterTask.
futures - The Future for each JoinTask that was created by the JoinMasterTask.
timeout - The timeout for awaiting those futures.
unit - The unit for that timeout.
ExecutionExceptions - if one or more JoinTasks fail.
InterruptedException - if the JoinMasterTask itself was interrupted while awaiting its JoinTasks.
TimeoutException - if the timeout expires first.
protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator(IBindingSet bindingSet)
Return an IAsynchronousIterator that will read a single IBindingSet.
bindingSet - the binding set.
protected void combineJoinStats()
JoinTask onto ruleStats.
There are N JoinTasks per IPredicate in the tail of the rule, where N is the
#of index partitions on which we must read to evaluate the IRule for a given
IPredicate in the tail (N is per IPredicate, not the same for each
IPredicate).
public void report(JoinStats joinStats)
report in interface IJoinMaster
joinStats - Statistics for an index partition of some join dimension.
public IBuffer<ISolution[]> getSolutionBuffer() throws IOException
getSolutionBuffer in interface IJoinMaster
IOException
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.