public class RDFJoinNexus extends AbstractJoinNexus implements IJoinNexus
IProgram execution support for the RDF DB.
The rules have potential parallelism when performing closure. Each join has potential parallelism as well for subqueries. We could even define a PARALLEL iterator flag and have parallelism across index partitions for a read-historical iterator since the data service locators are immutable for historical reads.
Rule-level parallelism (for fix point closure of a rule set) and join subquery-level parallelism could be distributed to available workers in a cluster. In a similar way, high-level queries could be distributed to workers in a cluster for evaluation. Such distribution would increase the practical parallelism beyond what a single machine can support, as long as the total parallelism does not overload the cluster.
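A minimal single-JVM sketch of rule-level parallelism for fix point closure. Everything here is hypothetical (`ParallelClosure`, `closure`, `transitive`): rules are modeled as plain functions from a snapshot of the facts to the facts they derive, and facts are strings such as `a>b`. This is not the Blazegraph `IProgram` API, and a cluster deployment would submit the rule tasks to remote workers rather than to a local `ExecutorService`.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

/** Hypothetical sketch: run all rules of a round in parallel until a fix point is reached. */
public class ParallelClosure {

    static <T> Set<T> closure(Set<T> initial, List<Function<Set<T>, Set<T>>> rules,
                              ExecutorService pool) {
        Set<T> facts = new HashSet<>(initial);
        while (true) {
            // Every rule in a round reads the same immutable snapshot.
            final Set<T> snapshot = Collections.unmodifiableSet(new HashSet<>(facts));
            List<Future<Set<T>>> futures = new ArrayList<>();
            for (Function<Set<T>, Set<T>> rule : rules) {
                futures.add(pool.submit(() -> rule.apply(snapshot)));
            }
            boolean changed = false;
            for (Future<Set<T>> f : futures) {
                try {
                    changed |= facts.addAll(f.get()); // merge on the calling thread only
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            if (!changed) {
                return facts; // no rule derived anything new: fix point
            }
        }
    }

    /** Example rule: derive "a>c" from "a>b" and "b>c" (transitive closure). */
    static Set<String> transitive(Set<String> facts) {
        Set<String> out = new HashSet<>();
        for (String p : facts) {
            String[] ab = p.split(">");
            for (String q : facts) {
                String[] bc = q.split(">");
                if (ab[1].equals(bc[0])) {
                    out.add(ab[0] + ">" + bc[1]);
                }
            }
        }
        return out;
    }
}
```

Merging results on the caller's thread keeps the fact set itself unsynchronized; only the rule evaluations run concurrently.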
There is a pragmatic limit on the #of concurrent threads for a single host. When those threads target a blocking queue, thread contention becomes very high and throughput drops dramatically. We can reduce this problem by allocating a distinct UnsynchronizedArrayBuffer to each task. The task collects a 'chunk' in its UnsynchronizedArrayBuffer. When full, the buffer propagates onto a thread-safe buffer of chunks, which either flushes onto an IMutableRelation (mutation) or feeds an IAsynchronousIterator (high-level query). It is the chunks themselves that accumulate in this thread-safe buffer, so each add() on that buffer may cause the thread to yield, but the return for yielding is an entire chunk in the buffer, not just a single element.
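The chunking pattern can be sketched as follows. The names are hypothetical and this is not the Blazegraph `UnsynchronizedArrayBuffer` API: each task owns a private `ChunkBuffer` that it fills without locking, and only whole chunks are handed to a shared `BlockingQueue`, so there is one contended hand-off per chunk rather than one per element.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch of per-task chunk buffering onto a shared, thread-safe queue. */
public class ChunkedHandoff {

    static final class ChunkBuffer<E> {
        private final int capacity;
        private final BlockingQueue<List<E>> target;
        private List<E> chunk;

        ChunkBuffer(int capacity, BlockingQueue<List<E>> target) {
            this.capacity = capacity;
            this.target = target;
            this.chunk = new ArrayList<>(capacity);
        }

        /** No locking: only the owning task ever calls this. */
        void add(E e) {
            chunk.add(e);
            if (chunk.size() == capacity) {
                flush();
            }
        }

        /** Hands the whole chunk to the shared queue; may block, but once per chunk. */
        void flush() {
            if (chunk.isEmpty()) {
                return;
            }
            try {
                target.put(chunk);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
            chunk = new ArrayList<>(capacity);
        }
    }

    /** Runs {@code tasks} producers and returns {chunksSeen, elementsSeen}. */
    static int[] run(int tasks, int perTask, int chunkSize) {
        BlockingQueue<List<Integer>> shared = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int t = 0; t < tasks; t++) {
            pool.execute(() -> {
                ChunkBuffer<Integer> buf = new ChunkBuffer<>(chunkSize, shared);
                for (int i = 0; i < perTask; i++) {
                    buf.add(i);
                }
                buf.flush(); // push the final, partially filled chunk
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("producers timed out");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        int chunks = 0;
        int elements = 0;
        for (List<Integer> c : shared) {
            chunks++;
            elements += c.size();
        }
        return new int[] {chunks, elements};
    }
}
```

With 4 tasks of 1050 elements and a chunk size of 100, each task hands off 10 full chunks plus one partial chunk of 50.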
There is one high-level buffer factory corresponding to each of the kinds of ActionEnum: AbstractJoinNexus.newQueryBuffer(); newInsertBuffer(IMutableRelation); and AbstractJoinNexus.newDeleteBuffer(IMutableRelation). In addition, there is one for UnsynchronizedArrayBuffers: a buffer that is NOT thread-safe and that is designed to store a single chunk of elements, e.g., in an array E[N].
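One way to picture "one buffer factory per action" is a dispatch on the action kind that returns a chunk-level sink. This is a hypothetical sketch: `Action` stands in for ActionEnum, the relation is modeled as a concurrent `Set`, and the query side is a `BlockingQueue` of chunks standing in for an IAsynchronousIterator.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

/** Hypothetical sketch: one chunk-level buffer factory per action kind. */
public class BufferFactories {

    enum Action { QUERY, INSERT, DELETE } // stand-in for ActionEnum

    static <E> Consumer<List<E>> newBuffer(Action action, Set<E> relation,
                                           BlockingQueue<List<E>> queryOut) {
        switch (action) {
            case QUERY:
                return queryOut::add;       // feed chunks to an asynchronous consumer
            case INSERT:
                return relation::addAll;    // mutation: insert the whole chunk
            case DELETE:
                return relation::removeAll; // mutation: delete the whole chunk
            default:
                throw new AssertionError(action);
        }
    }
}
```

Each sink receives whole chunks, so any synchronization cost is paid per chunk, which matches the chunk-oriented design described above.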
Modifier and Type | Class and Description |
---|---|
static class | RDFJoinNexus.InsertSPOAndJustificationBuffer<E> Buffer writes on IMutableRelation#insert(IChunkedIterator) when it is flushed. |
Modifier and Type | Field and Description |
---|---|
protected static org.apache.log4j.Logger | log |
Fields inherited from class AbstractJoinNexus: chunkCapacity, chunkOfChunksCapacity, filter, indexManager, planFactory, readTimestamp, resourceLocator, solutionFlags
Fields inherited from interface IJoinNexus: ALL, BINDINGS, ELEMENT, RULE
Constructor and Description |
---|
RDFJoinNexus(RDFJoinNexusFactory joinNexusFactory, IIndexManager indexManager) |
Modifier and Type | Method and Description |
---|---|
IConstant | fakeBinding(IPredicate pred, Var var) Return a 'fake' binding for the given variable in the specified predicate. |
IRuleStatisticsFactory | getRuleStatisticsFactory() The factory for rule statistics objects. |
IAccessPath | getTailAccessPath(IRelation relation, IPredicate predicate) When backchain is true and the tail predicate is reading on the SPORelation, then the IAccessPath is wrapped so that the iterator will visit the backchained inferences as well. |
ISortKeyBuilder<IBindingSet> | newBindingSetSortKeyBuilder(IRule rule) FIXME unit tests for DISTINCT with a head and ELEMENT, with bindings and a head, with bindings but no head, and with a head but no bindings (error). |
IBuffer<ISolution[]> | newInsertBuffer(IMutableRelation relation) Overridden to handle justifications when using truth maintenance. |
protected ISortKeyBuilder<?> | newSortKeyBuilder(IPredicate<?> head) Return the ISortKeyBuilder used to impose DISTINCT on the solutions generated by a query. |
Methods inherited from class AbstractJoinNexus: bind, bind, forceSerialExecution, getAction, getBindingSetSerializer, getChunkCapacity, getChunkOfChunksCapacity, getFullyBufferedReadThreshold, getHeadRelationView, getIndexManager, getJoinNexusFactory, getMaxParallelSubqueries, getPlanFactory, getProperty, getProperty, getRangeCountFactory, getReadTimestamp, getRelationLocator, getRuleTaskFactory, getSolutionFilter, getSolutionSerializer, getTailRelationView, getWriteTimestamp, isEmptyProgram, locatorScan, newBindingSet, newDeleteBuffer, newQueryBuffer, newSolution, newUnsynchronizedBuffer, runDistributedProgram, runLocalProgram, runMutation, runProgram, runQuery, solutionFlags
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface IJoinNexus: bind, bind, forceSerialExecution, getAction, getBindingSetSerializer, getChunkCapacity, getChunkOfChunksCapacity, getFullyBufferedReadThreshold, getHeadRelationView, getIndexManager, getJoinNexusFactory, getMaxParallelSubqueries, getPlanFactory, getProperty, getProperty, getRangeCountFactory, getReadTimestamp, getRuleTaskFactory, getSolutionFilter, getSolutionSerializer, getTailRelationView, getWriteTimestamp, locatorScan, newBindingSet, newDeleteBuffer, newQueryBuffer, newSolution, newUnsynchronizedBuffer, runMutation, runQuery, solutionFlags
public RDFJoinNexus(RDFJoinNexusFactory joinNexusFactory, IIndexManager indexManager)
Parameters:
joinNexusFactory - The object used to create this instance and which can be used to create other instances as necessary for distributed rule execution.
indexManager - The object used to resolve indices, relations, etc.
public IRuleStatisticsFactory getRuleStatisticsFactory()
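Description copied from interface: IJoinNexus
The factory for rule statistics objects.
Specified by: getRuleStatisticsFactory in interface IJoinNexus
Overrides: getRuleStatisticsFactory in class AbstractJoinNexus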
public IAccessPath getTailAccessPath(IRelation relation, IPredicate predicate)
When backchain is true and the tail predicate is reading on the SPORelation, then the IAccessPath is wrapped so that the iterator will visit the backchained inferences as well. On the other hand, if IPredicate.getPartitionId() is defined (not -1), then the returned access path will be for the specified shard using the data service local index manager (AbstractJoinNexus.indexManager MUST be the data service local index manager for this case) and expanders WILL NOT be applied (they require a view of the total relation, not just a shard).
Specified by: getTailAccessPath in interface IJoinNexus
Overrides: getTailAccessPath in class AbstractJoinNexus
Parameters:
relation - The relation.
predicate - The predicate. When IPredicate.getPartitionId() is set, the returned IAccessPath MUST read on the identified local index partition (directly, not via RMI).
See Also:
InferenceEngine, BackchainAccessPath
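The precedence between these cases can be sketched as a small decision function. This is an illustration under assumptions: `Choice`, `choose`, and `UNPARTITIONED` are hypothetical, and checking the shard case first reflects our reading that expanders (including the backchaining wrapper) are never applied when a single shard is named.

```java
/** Hypothetical sketch of how the tail access path cases are distinguished. */
public class TailAccessPathChoice {

    enum Choice { SHARD_LOCAL, BACKCHAIN_WRAPPED, PLAIN }

    static final int UNPARTITIONED = -1; // partition id when no shard is named

    static Choice choose(int partitionId, boolean backchain, boolean readsSPORelation) {
        if (partitionId != UNPARTITIONED) {
            // Read the named shard through the data service's local index manager (no RMI).
            // No expanders: they need a view of the whole relation, not one shard.
            return Choice.SHARD_LOCAL;
        }
        if (backchain && readsSPORelation) {
            // Wrap the access path so its iterator also visits backchained inferences.
            return Choice.BACKCHAIN_WRAPPED;
        }
        return Choice.PLAIN;
    }
}
```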
public IConstant fakeBinding(IPredicate pred, Var var)
Description copied from interface: IJoinNexus
Return a 'fake' binding for the given variable in the specified predicate. … IKeyOrder associated with the IAccessPath that will be used to evaluate the predicate when it appears in the tail of an IRule for a given IEvaluationPlan.
Specified by: fakeBinding in interface IJoinNexus
Parameters:
pred - The predicate.
var - A variable appearing in that predicate.
public ISortKeyBuilder<IBindingSet> newBindingSetSortKeyBuilder(IRule rule)
FIXME unit tests for DISTINCT with a head and ELEMENT, with bindings and a head, with bindings but no head, and with a head but no bindings (error). See AbstractJoinNexus.runQuery(IStep).
FIXME unit tests for SORT with and without DISTINCT and with the various combinations used in the unit tests for DISTINCT. Note that SORT, unlike DISTINCT, requires that all solutions are materialized before any solutions can be returned to the caller. A lot of optimization can be done for SORT implementations, including merge sort of large blocks (à la map/reduce), using compressed sort keys or word sort keys with 2nd stage disambiguation, etc.
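The block merge sort idea can be sketched in memory. This is a hypothetical illustration (`BlockMergeSort` is not a Blazegraph class): every solution is first materialized into fixed-size blocks that are sorted independently (the part that could run in parallel or spill to disk), and the sorted blocks are then combined with a k-way merge driven by a `PriorityQueue`. Nothing can be emitted until phase 1 has consumed the whole input, which is exactly why SORT, unlike DISTINCT, cannot stream.

```java
import java.util.*;

/** Hypothetical sketch: SORT via sorted blocks plus a k-way merge. */
public class BlockMergeSort {

    static <T> List<T> sort(Iterator<T> solutions, int blockSize, Comparator<? super T> cmp) {
        // Phase 1: materialize every solution into independently sorted blocks.
        List<List<T>> blocks = new ArrayList<>();
        List<T> block = new ArrayList<>(blockSize);
        while (solutions.hasNext()) {
            block.add(solutions.next());
            if (block.size() == blockSize) {
                block.sort(cmp);
                blocks.add(block);
                block = new ArrayList<>(blockSize);
            }
        }
        if (!block.isEmpty()) {
            block.sort(cmp);
            blocks.add(block);
        }

        // Phase 2: k-way merge; heap entries are {blockIndex, positionInBlock}.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> cmp.compare(blocks.get(a[0]).get(a[1]), blocks.get(b[0]).get(b[1])));
        for (int i = 0; i < blocks.size(); i++) {
            heap.add(new int[] {i, 0});
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<T> src = blocks.get(top[0]);
            out.add(src.get(top[1]));
            if (top[1] + 1 < src.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }
}
```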
FIXME Add property for sort {ascending,descending,none} to IRule. The sort order can also be specified in terms of a sequence of variables. The choice of the variable order should be applied here.
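A sketch of how a sort direction and a variable order could combine into one comparator. All names are hypothetical: binding sets are modeled as variable-name to string maps (every variable is assumed bound), `Direction` stands in for the proposed {ascending,descending,none} property, and values are compared with a `java.text.Collator`, which is where collator settings such as those in the FIXME that follows could be supplied.

```java
import java.text.Collator;
import java.util.*;

/** Hypothetical sketch: comparator over binding sets from a variable order and direction. */
public class VariableOrderSort {

    enum Direction { ASCENDING, DESCENDING, NONE }

    static Optional<Comparator<Map<String, String>>> comparator(
            List<String> varOrder, Direction dir, Collator collator) {
        if (dir == Direction.NONE) {
            return Optional.empty(); // caller skips the sort entirely
        }
        Comparator<Map<String, String>> c = (a, b) -> 0;
        for (String v : varOrder) {
            // The 1st variable is the primary key, the 2nd breaks ties, and so on.
            c = c.thenComparing(m -> m.get(v), collator);
        }
        return Optional.of(dir == Direction.DESCENDING ? c.reversed() : c);
    }

    /** Builds a binding set from alternating name/value arguments. */
    static Map<String, String> bindings(String... nameValuePairs) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < nameValuePairs.length; i += 2) {
            m.put(nameValuePairs[i], nameValuePairs[i + 1]);
        }
        return m;
    }
}
```

With an English collator, "apple" sorts before "Banana", whereas plain `String.compareTo` would put "Banana" first because 'B' has a lower code point than 'a'.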
FIXME The properties that govern the Unicode collator for the generated sort keys should be configured by the RDFJoinNexusFactory. In particular, Unicode should be handled however it is handled for the LexiconRelation.
Specified by: newBindingSetSortKeyBuilder in interface IJoinNexus
Parameters:
rule - The rule that will determine the order imposed among the bound variables (which variable is 1st, 2nd, 3rd, etc.).
protected ISortKeyBuilder<?> newSortKeyBuilder(IPredicate<?> head)
Description copied from class: AbstractJoinNexus
Return the ISortKeyBuilder used to impose DISTINCT on the solutions generated by a query.
Overrides: newSortKeyBuilder in class AbstractJoinNexus
Parameters:
head - The head of the rule.
Returns:
The ISortKeyBuilder.
public IBuffer<ISolution[]> newInsertBuffer(IMutableRelation relation)
Overridden to handle justifications when using truth maintenance.
Return a thread-safe buffer onto which chunks of computed ISolutions will be written. When the buffer is flushed, the chunked ISolutions will be inserted into the IMutableRelation.
Note: AbstractJoinNexus.getSolutionFilter() is applied by AbstractJoinNexus.newUnsynchronizedBuffer(IBuffer, int) and NOT by the buffer returned by this method.
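The insert buffer with justifications might take roughly the following shape. This is a hypothetical sketch, not the Blazegraph `InsertSPOAndJustificationBuffer`: statements are plain strings, `Solution` pairs an entailed head with the tail statements that support it, and justifications are recorded only when truth maintenance is enabled. Consistent with the note above, no solution filter is applied inside the buffer.

```java
import java.util.*;

/** Hypothetical sketch of an insert buffer that also records justifications. */
public class InsertWithJustifications {

    /** An entailed statement (head) plus the tail statements that support it. */
    static final class Solution {
        final String head;
        final List<String> tail;

        Solution(String head, List<String> tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    static final class InsertBuffer {
        private final Set<String> statements;
        private final Map<String, List<List<String>>> justifications;
        private final boolean truthMaintenance;
        private final List<Solution[]> pending = new ArrayList<>();

        InsertBuffer(Set<String> statements,
                     Map<String, List<List<String>>> justifications,
                     boolean truthMaintenance) {
            this.statements = statements;
            this.justifications = justifications;
            this.truthMaintenance = truthMaintenance;
        }

        /** Thread-safe; accepts whole chunks. No solution filter is applied here. */
        synchronized void add(Solution[] chunk) {
            pending.add(chunk);
        }

        /** Writes all buffered chunks and returns how many statements were new. */
        synchronized long flush() {
            long inserted = 0;
            for (Solution[] chunk : pending) {
                for (Solution s : chunk) {
                    if (statements.add(s.head)) {
                        inserted++;
                    }
                    if (truthMaintenance) {
                        // Keep every justification, even for a statement already present.
                        justifications.computeIfAbsent(s.head, k -> new ArrayList<>()).add(s.tail);
                    }
                }
            }
            pending.clear();
            return inserted;
        }
    }
}
```

Recording a justification even when the head statement already exists matters for truth maintenance, since a statement stays entailed as long as any one of its justifications still holds.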
Specified by: newInsertBuffer in interface IJoinNexus
Overrides: newInsertBuffer in class AbstractJoinNexus
Parameters:
relation - The relation.
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.