public class ChunkedRunningQuery extends AbstractRunningQuery
IRunningQuery
implementation based on the assignment of
IChunkMessage
(s) to an operator task. Operators (other than those
with "operator-at-once" evaluation semantics) will typically executed
multiple times, consuming at least one IChunkMessage
each time they
are evaluated. IChunkMessage
s target a specific operator (bopId) and
shard (shardId). In scale-out, binding sets will be mapped across the target
access path and may be replicated to one or more nodes depending on the
distribution of the shards. This evaluation strategy is compatible with both
the Journal
(aka standalone) and the IBigdataFederation
(aka
clustered or scale-out).
Note: The challenge with this implementation is managing the amount of data
buffered on the JVM heap without introducing control structures which can
result in deadlock or starvation. This has been addressed to a large extent
by sharing a lock between this class and the per-operator input work queues
using modified version of the JSR 166 classes. For high volume operator at
once evaluation, we need to buffer the data on the native process heap using
the IMemoryManager
.
ERR_DUPLICATE_IDENTIFIER, ERR_NO_SUCH_BOP, ERR_NOT_CONTROLLER, ERR_NOT_PIPELINE_START, ERR_QUERY_DONE, lock
Constructor and Description |
---|
ChunkedRunningQuery(QueryEngine queryEngine,
UUID queryId,
boolean controller,
IQueryClient clientProxy,
PipelineOp query,
IChunkMessage<IBindingSet> realSource) |
Modifier and Type | Method and Description |
---|---|
protected boolean |
acceptChunk(IChunkMessage<IBindingSet> msg)
Make a chunk of binding sets available for consumption by the query.
|
protected boolean |
cancelRunningOperators(boolean mayInterruptIfRunning)
Cancel any running operators for this query on this node (internal API).
|
protected void |
consumeChunk()
Instruct the
IRunningQuery to consume an IChunkMessage
already on its input queue. |
protected IChunkHandler |
getChunkHandler()
Return the effective
IChunkHandler for this query. |
protected IChunkHandler |
getChunkHandler(QueryEngine queryEngine,
PipelineOp query)
Factory returns the effective
IChunkHandler for this query. |
protected Map<Integer,QueueStats> |
getQueueStats()
Return a summary of the work queue for the operators in this query
(non-blocking).
|
protected void |
haltOp(IHaltOpMessage msg)
Overridden to attempt to consume another chunk each time an operator
reports that it has halted evaluation.
|
protected void |
releaseAcceptedMessages()
Close the
IAsynchronousIterator for any IChunkMessage s
which have been accepted for this queue on this node (internal
API). |
addChild, cancel, cancelQueryOnPeers, checkDeadline, doLastPass, get, get, getAsThrownCause, getAttributes, getBOp, getBOpIndex, getCause, getChildren, getDeadline, getDoneTime, getElapsed, getFederation, getFuture, getLocalIndexManager, getMemoryManager, getQuery, getQueryBuffer, getQueryController, getQueryEngine, getQueryId, getRunningCount, getRunState, getStartedOnCount, getStartTime, getStaticAnalysisStats, getStats, getStats, halt, halt, isAtOnceReady, isCancelled, isController, isDone, isRootCauseInterrupt, iterator, newQueryBuffer, releaseNativeMemoryForOperator, releaseNativeMemoryForQuery, runStateString, setDeadline, setStaticAnalysisStats, startOp, startQuery, toString, tryGetRunState
public ChunkedRunningQuery(QueryEngine queryEngine, UUID queryId, boolean controller, IQueryClient clientProxy, PipelineOp query, IChunkMessage<IBindingSet> realSource)
queryEngine
- The QueryEngine
on which the query is running. In
scale-out, a query is typically instantiated on many
QueryEngine
s.queryId
- The identifier for that query.controller
- true
iff the QueryEngine
is the query
controller for this query (the QueryEngine
which will
coordinate the query evaluation).clientProxy
- The query controller. In standalone, this is the same as the
queryEngine. In scale-out, this is a proxy for the
query controller whenever the query is instantiated on a node
other than the query controller itself.query
- The query.saStats
- Statistics object containing static analysis statisticsIllegalArgumentException
- if any argument is null
.IllegalArgumentException
- if the readTimestamp is ITx.UNISOLATED
(queries may not read on the unisolated indices).IllegalArgumentException
- if the writeTimestamp is neither
ITx.UNISOLATED
nor a read-write transaction
identifier.protected boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Note: this is invoked by QueryEngine.acceptChunk(IChunkMessage)
acceptChunk
in class AbstractRunningQuery
msg
- The chunk.true
if the message was accepted.protected void consumeChunk()
IRunningQuery
to consume an IChunkMessage
already on its input queue..
Examines the input queue for each (bopId,partitionId). If there is work available and no task is currently running, then drain the work queue and submit a task to consume that work.
consumeChunk
in class AbstractRunningQuery
protected void haltOp(IHaltOpMessage msg)
haltOp
in class AbstractRunningQuery
msg
- The IHaltOpMessage
protected boolean cancelRunningOperators(boolean mayInterruptIfRunning)
AbstractRunningQuery
cancelRunningOperators
in class AbstractRunningQuery
true
if any operators were cancelled.protected void releaseAcceptedMessages()
AbstractRunningQuery
IAsynchronousIterator
for any IChunkMessage
s
which have been accepted for this queue on this node (internal
API).
Note: This must be invoked while holding a lock which is exclusive with
the lock used to hand off IChunkMessage
s to operator tasks
otherwise we could wind up invoking IAsynchronousIterator.close()
from on an IAsynchronousIterator
running in a different thread.
That would cause visibility problems in the close() semantics unless the
IAsynchronousIterator
is thread-safe for close (e.g., volatile
write, synchronized, etc.). The appropriate lock for this is
AbstractRunningQuery.lock
. This method is only invoked out of
AbstractRunningQuery.cancel(boolean)
which owns that lock.
releaseAcceptedMessages
in class AbstractRunningQuery
protected Map<Integer,QueueStats> getQueueStats()
Note: For a cluster, there is one work queue per (operator,shard) pair.
protected IChunkHandler getChunkHandler(QueryEngine queryEngine, PipelineOp query)
IChunkHandler
for this query.queryEngine
- query
- IChunkHandler
policy.QueryHints.QUERY_ENGINE_CHUNK_HANDLER
,
Vector query engine on native heap.
protected final IChunkHandler getChunkHandler()
IChunkHandler
for this query.Vector query engine on native heap.
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.