public abstract class AbstractRunningQuery extends Object implements IRunningQuery
Abstract base class for IRunningQuery implementations. The
purpose of this class is to isolate aspects common to different designs for
managing resources for a running query and make it easier to realize
different strategies for managing the resources allocated to a running query.
There are common requirements for an IRunningQuery, but a variety of
ways in which those requirements can be met. Among the common requirements
is a means to manage tradeoffs in the allocation of various resources to the
operators in each query. Some of the more important tradeoffs are the #of
threads to allocate to each operator (threads bound IO for Java 6 since we
are using a synchronous IO model) and the amount of RAM allocated to each
operator (including RAM on the JVM heap and RAM on the native Java process
heap). If the #of threads is too restrictive, then queries will progress
slowly due to insufficient IO-level parallelism. If the query buffers too
much data on the JVM heap, then it can cause GC overhead problems that can
drastically reduce the responsiveness and throughput of the JVM. Data can be
moved off of the JVM heap onto the Java process heap by serializing it into
direct ByteBuffers. This can be very efficient in
combination with hash joins, at the expense of increasing the latency to the
first result when compared with pipelined evaluation.
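The off-heap buffering mentioned above can be illustrated with a minimal sketch. It assumes nothing about how this class actually serializes solutions: OffHeapLongBuffer and its methods are hypothetical names, and plain long values stand in for binding sets. Only java.nio.ByteBuffer.allocateDirect() is a real API here.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class OffHeapLongBuffer {

    private final ByteBuffer buf;

    OffHeapLongBuffer(int capacityInLongs) {
        // allocateDirect() reserves the backing memory outside of the JVM heap.
        this.buf = ByteBuffer.allocateDirect(capacityInLongs * Long.BYTES);
    }

    /** Serialize one value into the buffer; returns false if the buffer is full. */
    boolean add(long value) {
        if (buf.remaining() < Long.BYTES) {
            return false;
        }
        buf.putLong(value); // relative put: advances the write position
        return true;
    }

    /** The #of values written so far. */
    int size() {
        return buf.position() / Long.BYTES;
    }

    /** Absolute read; does not move the write position. */
    long get(int index) {
        if (index < 0 || index >= size()) {
            throw new IndexOutOfBoundsException("index=" + index);
        }
        return buf.getLong(index * Long.BYTES);
    }

    boolean isDirect() {
        return buf.isDirect();
    }

    public static void main(String[] args) {
        OffHeapLongBuffer b = new OffHeapLongBuffer(1024);
        for (long v = 0; b.add(v); v++) {
            // keep adding until the buffer reports that it is full
        }
        System.out.println("buffered " + b.size() + " values, direct=" + b.isDirect());
    }
}
```

Only the serialized bytes live outside the JVM heap; the small ByteBuffer object that refers to them is still an ordinary heap object, so the garbage collector sees one small object instead of many buffered values.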
Modifier and Type | Field and Description |
---|---|
protected static String | ERR_DUPLICATE_IDENTIFIER - Error message used when two operators have the same BOp.Annotations#BOP_ID. |
protected static String | ERR_NO_SUCH_BOP - Error message used when no operator can be found for a given BOp.Annotations#BOP_ID. |
protected static String | ERR_NOT_CONTROLLER - Error message used when an operation which must be performed on the query controller is attempted on some other IQueryPeer. |
protected static String | ERR_NOT_PIPELINE_START - Error message used when a request is addressed to an operator other than the head of the pipeline in a context where the request must be addressed to the operator at the head of the pipeline (e.g., when presenting the initial binding sets to get the query moving). |
protected static String | ERR_QUERY_DONE - Error message used when a request is made after the query has stopped executing. |
protected ReentrantLock | lock - A lock guarding various state changes. |
Constructor and Description |
---|
AbstractRunningQuery(QueryEngine queryEngine, UUID queryId, boolean controller, IQueryClient clientProxy, PipelineOp query, IChunkMessage<IBindingSet> realSource) |
Modifier and Type | Method and Description |
---|---|
protected abstract boolean | acceptChunk(IChunkMessage<IBindingSet> msg) - Make a chunk of binding sets available for consumption by the query. |
boolean | addChild(IRunningQuery childQuery) - Attach a child query. |
boolean | cancel(boolean mayInterruptIfRunning) |
protected boolean | cancelQueryOnPeers(Throwable cause, Set<UUID> startedOn) - Cancel the query on each node where it is known to be running. |
protected abstract boolean | cancelRunningOperators(boolean mayInterruptIfRunning) - Cancel any running operators for this query on this node (internal API). |
protected void | checkDeadline() - If the query deadline has expired, then halt the query. |
protected abstract void | consumeChunk() - Instruct the IRunningQuery to consume an IChunkMessage already on its input queue. |
protected void | doLastPass(int bopId, Set doneOn) - Queue empty IChunkMessages to trigger the last evaluation pass for an operator which can not be re-triggered by any upstream operator or by IChunkMessages which have already been buffered. |
Void | get() |
Void | get(long arg0, TimeUnit arg1) |
Throwable | getAsThrownCause() - Return the first cause regardless of whether it is indicative of normal termination and null iff no cause has been set. |
IQueryAttributes | getAttributes() - Return an interface which allows attribute values to be associated with an IQueryContext. |
BOp | getBOp(int bopId) - Return the BOp having the specified id. |
Map<Integer,BOp> | getBOpIndex() - Return an unmodifiable index from BOp.Annotations#BOP_ID to BOp. |
Throwable | getCause() - Return the first Throwable which caused this process to halt, but only for abnormal termination. |
IRunningQuery[] | getChildren() - Report a snapshot of the known (declared) child IRunningQuerys for this IRunningQuery and (recursively) for any children of this IRunningQuery. |
long | getDeadline() - Return the query deadline in milliseconds (the time at which it will terminate regardless of its run state). |
long | getDoneTime() - The timestamp (ms) when the query was done and ZERO (0) if the query is not yet done. |
long | getElapsed() - The elapsed time (ms) for the query. |
IBigdataFederation<?> | getFederation() - The IBigdataFederation IFF the operator is being evaluated on an IBigdataFederation. |
protected IHaltable<Void> | getFuture() - The Future of this query. |
IIndexManager | getLocalIndexManager() - The local IIndexManager. |
IMemoryManager | getMemoryManager() - Return the IMemoryManager which may be used to buffer data on high level data structures, such as the HTree, for this query. |
PipelineOp | getQuery() - The query. |
protected IBlockingBuffer<IBindingSet[]> | getQueryBuffer() - Return the buffer used for the overall output of the query pipeline and null if this is not the query controller. |
IQueryClient | getQueryController() - The client coordinating the evaluation of this query (aka the query controller). |
QueryEngine | getQueryEngine() - The query engine class executing the query on this node. |
UUID | getQueryId() - The unique identifier for this query. |
protected long | getRunningCount(int bopId) - Return the #of instances of the operator which are concurrently executing. |
protected com.bigdata.bop.engine.RunState.RunStateEnum | getRunState(int bopId) - Return the RunState.RunStateEnum for an operator. |
protected int | getStartedOnCount(int bopId) - Return the #of shards or nodes on which the operator has started evaluation. |
long | getStartTime() - The timestamp (ms) when the query began execution. |
StaticAnalysisStats | getStaticAnalysisStats() - Return statistics associated with the static analysis phase of this query. |
Map<Integer,BOpStats> | getStats() - Return an unmodifiable map exposing the statistics for the operators in the query and null unless this is the query controller. |
BOpStats | getStats(Integer bopId) |
<T extends Throwable> T | halt(T t) - Halt (exception thrown). |
void | halt(Void v) - Halt (normal termination). |
protected void | haltOp(IHaltOpMessage msg) - Message provides notice that the operator has ended execution. |
protected boolean | isAtOnceReady(int bopId) - Return true iff the preconditions have been satisfied for the "at-once" invocation of the specified operator (no predecessors are running or could be triggered and the operator has not been evaluated). |
boolean | isCancelled() |
boolean | isController() - Return true iff this is the query controller. |
boolean | isDone() |
static boolean | isRootCauseInterrupt(Throwable t) - Return true iff the root cause of the Throwable was an interrupt. |
ICloseableIterator<IBindingSet[]> | iterator() - Return an iterator which will drain the solutions from the query. |
protected IBlockingBuffer<IBindingSet[]> | newQueryBuffer(PipelineOp query, BOpStats queryStats) - Return the buffer that will be used to absorb solutions. |
protected abstract void | releaseAcceptedMessages() - Close the IAsynchronousIterator for any IChunkMessages which have been accepted for this queue on this node (internal API). |
protected void | releaseNativeMemoryForOperator(int bopId) - Release native memory associated with this operator, if any (NOP, but overridden in scale-out to release NIO buffers used to move solutions around in the cluster). |
protected void | releaseNativeMemoryForQuery() - Release native memory associated with this query, if any. |
protected String | runStateString() - Return the textual representation of the RunState of this query. |
void | setDeadline(long deadline) - Set the query deadline. |
void | setStaticAnalysisStats(StaticAnalysisStats saStats) - Set the static analysis stats associated with this query. |
protected void | startOp(IStartOpMessage msg) - Message provides notice that the operator has started execution and will consume some specific number of binding set chunks. |
protected void | startQuery(IChunkMessage<IBindingSet> msg) - Message provides notice that the query has started execution and will consume some specific number of binding set chunks. |
String | toString() |
protected com.bigdata.bop.engine.RunState.RunStateEnum | tryGetRunState(int bopId) - Attempt to return the RunState.RunStateEnum for an operator (non-blocking). |
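protected static final String ERR_NOT_CONTROLLER
Error message used when an operation which must be performed on the query
controller is attempted on some other IQueryPeer.
protected static final String ERR_QUERY_DONE
Error message used when a request is made after the query has stopped
executing.
protected static final String ERR_NOT_PIPELINE_START
Error message used when a request is addressed to an operator other than
the head of the pipeline in a context where the request must be addressed
to the operator at the head of the pipeline (e.g., when presenting the
initial binding sets to get the query moving).
protected static final String ERR_NO_SUCH_BOP
Error message used when no operator can be found for a given
BOp.Annotations#BOP_ID.
protected static final String ERR_DUPLICATE_IDENTIFIER
Error message used when two operators have the same
BOp.Annotations#BOP_ID.
protected final ReentrantLock lock
A lock guarding various state changes, including changes to the runState
object. It is also used to serialize requests to acceptChunk(IChunkMessage)
and cancel(boolean) and to make an atomic decision concerning whether to
attach a new IChunkMessage to an operator task which is already
running or to start a new task for that message.
See Also:
RunState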
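The role of lock can be sketched as follows. This is a simplified model under stated assumptions, not the class's implementation: ChunkDispatchSketch, its fields, and the use of String as a stand-in for IChunkMessage are all hypothetical. It shows one ReentrantLock serializing acceptChunk and cancel and making the attach-or-start decision atomic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class ChunkDispatchSketch {

    private final ReentrantLock lock = new ReentrantLock();
    // Input queue of the (single, simulated) operator task. Guarded by lock.
    private final Queue<String> taskInput = new ArrayDeque<>();
    private boolean taskRunning = false; // guarded by lock
    private boolean cancelled = false;   // guarded by lock
    private int tasksStarted = 0;        // guarded by lock

    /** Returns false if the query was already cancelled. */
    boolean acceptChunk(String chunk) {
        lock.lock();
        try {
            if (cancelled) {
                return false;
            }
            taskInput.add(chunk);
            // Atomic decision: attach to the running task, or start a new one.
            if (!taskRunning) {
                taskRunning = true;
                tasksStarted++;
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Returns true iff this call cancelled the query. */
    boolean cancel() {
        lock.lock();
        try {
            if (cancelled) {
                return false;
            }
            cancelled = true;
            taskInput.clear(); // release anything accepted but not yet consumed
            taskRunning = false;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int tasksStarted() {
        lock.lock();
        try {
            return tasksStarted;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ChunkDispatchSketch q = new ChunkDispatchSketch();
        q.acceptChunk("chunk-1");
        q.acceptChunk("chunk-2"); // attaches to the task started for chunk-1
        System.out.println("tasks started: " + q.tasksStarted());
        q.cancel();
        System.out.println("accepted after cancel: " + q.acceptChunk("chunk-3"));
    }
}
```

Because both methods take the same lock, a chunk can never be attached to a task after cancel has cleared the queue, which is the kind of race the shared lock is described as preventing.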
public AbstractRunningQuery(QueryEngine queryEngine, UUID queryId, boolean controller, IQueryClient clientProxy, PipelineOp query, IChunkMessage<IBindingSet> realSource)
Parameters:
queryEngine - The QueryEngine on which the query is running. In
scale-out, a query is typically instantiated on many QueryEngines.
queryId - The identifier for that query.
controller - true iff the QueryEngine is the query controller for this
query (the QueryEngine which will coordinate the query evaluation).
clientProxy - The query controller. In standalone, this is the same as the
queryEngine. In scale-out, this is an RMI proxy for the
query controller whenever the query is instantiated on a node
other than the query controller itself.
query - The query.
realSource - The original message which kicked off this query on the query
controller. This is NOT required when the query is
materialized on another node and MAY be null, but
the original message used to kick off the query on the query
controller MUST be provided so we can ensure that the source
iteration is always closed when the query is cancelled.
Throws:
IllegalArgumentException - if any argument is null.
IllegalArgumentException - if the readTimestamp is ITx.UNISOLATED
(queries may not read on the unisolated indices).
IllegalArgumentException - if the writeTimestamp is neither
ITx.UNISOLATED nor a read-write transaction identifier.
protected final IHaltable<Void> getFuture()
The Future of this query.
Note: This is exposed to the QueryEngine to let it cache the
Future for recently finished queries.
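public final void setDeadline(long deadline)
Set the query deadline.
Parameters:
deadline - The deadline.
Throws:
IllegalArgumentException - if the deadline is non-positive.
IllegalStateException - if the deadline was already set.
UnsupportedOperationException - unless this node is the query controller.
protected final void checkDeadline()
If the query deadline has expired, then halt the query.
Throws:
QueryTimeoutException - if the query deadline has expired.
public final long getDeadline()
Description copied from interface: IRunningQuery
Return the query deadline in milliseconds (the time at which it will
terminate regardless of its run state).
Specified by:
getDeadline in interface IRunningQuery
Returns:
The query deadline, or Long.MAX_VALUE if no explicit deadline was specified.
public final long getStartTime()
Description copied from interface: IRunningQuery
The timestamp (ms) when the query began execution.
Specified by:
getStartTime in interface IRunningQuery
public final long getDoneTime()
Description copied from interface: IRunningQuery
The timestamp (ms) when the query was done and ZERO (0) if the query is
not yet done.
Specified by:
getDoneTime in interface IRunningQuery
public final long getElapsed()
Description copied from interface: IRunningQuery
The elapsed time (ms) for the query.
Specified by:
getElapsed in interface IRunningQuery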
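The deadline and timing contracts documented above can be sketched as a small model. Several things here are assumptions made for illustration: DeadlineSketch is a hypothetical class, the injectable clock exists only to keep the example deterministic, java.util.concurrent.TimeoutException stands in for QueryTimeoutException, and computing the elapsed time against the done time once the query has halted is a guess rather than documented behavior.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class DeadlineSketch {

    private final LongSupplier clock; // assumed: supplies the current time in ms
    private final long startTime;
    private long deadline = Long.MAX_VALUE; // documented default: no explicit deadline
    private boolean deadlineSet = false;
    private long doneTime = 0L;             // documented: ZERO until the query is done

    DeadlineSketch(LongSupplier clock) {
        this.clock = clock;
        this.startTime = clock.getAsLong();
    }

    synchronized void setDeadline(long deadline) {
        if (deadline <= 0) {
            throw new IllegalArgumentException("deadline must be positive");
        }
        if (deadlineSet) {
            throw new IllegalStateException("deadline already set");
        }
        this.deadline = deadline;
        this.deadlineSet = true;
    }

    synchronized long getDeadline() {
        return deadline;
    }

    /** Halts the (simulated) query and throws if the deadline has passed. */
    synchronized void checkDeadline() throws TimeoutException {
        long now = clock.getAsLong();
        if (doneTime == 0L && now > deadline) {
            doneTime = now; // record when the query halted
            throw new TimeoutException("query deadline expired");
        }
    }

    synchronized long getStartTime() {
        return startTime;
    }

    synchronized long getDoneTime() {
        return doneTime;
    }

    /** Assumption: elapsed time stops advancing once the query is done. */
    synchronized long getElapsed() {
        long end = (doneTime == 0L) ? clock.getAsLong() : doneTime;
        return end - startTime;
    }

    public static void main(String[] args) throws Exception {
        DeadlineSketch q = new DeadlineSketch(System::currentTimeMillis);
        q.setDeadline(System.currentTimeMillis() + 60_000L);
        q.checkDeadline(); // does not throw: the deadline is a minute away
        System.out.println("elapsed ms: " + q.getElapsed());
    }
}
```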
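protected final IBlockingBuffer<IBindingSet[]> getQueryBuffer()
Return the buffer used for the overall output of the query pipeline and
null if this is not the query controller.
public QueryEngine getQueryEngine()
Description copied from interface: IRunningQuery
The query engine class executing the query on this node.
Specified by:
getQueryEngine in interface IRunningQuery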
public final IQueryClient getQueryController()
Description copied from interface: IRunningQuery
The client coordinating the evaluation of this query (aka the query
controller). In standalone, this is the QueryEngine.
For scale-out, this will be the RMI proxy for the QueryEngine
instance to which the query was submitted for evaluation by the
application. The proxy is primarily for lightweight RMI messages used to
coordinate the distributed query evaluation. Ideally, all large objects
will be transferred among the nodes of the cluster using NIO buffers.
Specified by:
getQueryController in interface IRunningQuery
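public final UUID getQueryId()
Description copied from interface: IRunningQuery
The unique identifier for this query.
Specified by:
getQueryId in interface IRunningQuery
getQueryId in interface IQueryContext
public final PipelineOp getQuery()
Description copied from interface: IRunningQuery
The query.
Specified by:
getQuery in interface IRunningQuery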
public final boolean isController()
Return true iff this is the query controller.
public final Map<Integer,BOpStats> getStats()
Description copied from interface: IRunningQuery
Return an unmodifiable map exposing the statistics for the operators in
the query and null unless this is the query controller.
There will be a single entry in the map for each distinct
PipelineOp. Entries might not appear until that operator has
either begun or completed at least one evaluation phase. This index only
contains operators which are actually part of the pipeline evaluation.
Specified by:
getStats in interface IRunningQuery
public final BOpStats getStats(Integer bopId)
Parameters:
bopId - The BOp identifier.
Returns:
The BOpStats object -or- null if there is no entry for that BOp identifier.
Throws:
IllegalArgumentException - if the argument is null.
public final Map<Integer,BOp> getBOpIndex()
Description copied from interface: IRunningQuery
Return an unmodifiable index from BOp.Annotations#BOP_ID to
BOp. This index may contain operators which are not part of the
pipeline evaluation, such as IPredicates.
Specified by:
getBOpIndex in interface IRunningQuery
public final BOp getBOp(int bopId)
Return the BOp having the specified id.
Parameters:
bopId - The BOp identifier.
Returns:
The BOp.
Throws:
IllegalArgumentException - if there is no BOp with that identifier declared in
this query.
protected final IBlockingBuffer<IBindingSet[]> newQueryBuffer(PipelineOp query, BOpStats queryStats)
Return the buffer that will be used to absorb solutions.
Parameters:
query - The root of the query plan.
queryStats - Used to track statistics on the solutions to the query (#of
chunks, #of units).
protected final void startQuery(IChunkMessage<IBindingSet> msg)
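Message provides notice that the query has started execution and will
consume some specific number of binding set chunks.
Parameters:
msg - The initial message presented to the query. The message is
used to update the query RunState. However, the
message will not be consumed until it is presented to
acceptChunk(IChunkMessage) by the QueryEngine.
Throws:
UnsupportedOperationException - If this node is not the query coordinator.
protected final void startOp(IStartOpMessage msg)
Message provides notice that the operator has started execution and will
consume some specific number of binding set chunks.
Parameters:
msg - The IStartOpMessage.
Throws:
UnsupportedOperationException - If this node is not the query coordinator.
protected void haltOp(IHaltOpMessage msg)
Message provides notice that the operator has ended execution.
Parameters:
msg - The IHaltOpMessage.
Throws:
UnsupportedOperationException - If this node is not the query coordinator.
protected void doLastPass(int bopId, Set doneOn)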
Queue empty IChunkMessages to trigger the last evaluation pass
for an operator which can not be re-triggered by any upstream operator or
by IChunkMessages which have already been buffered.
Note: If the queue for accepting new chunks could block then this could deadlock. We work around that by using the same lock for the AbstractRunningQuery and the queue of accepted messages. If the queue blocks, this thread will yield the lock and another thread may make progress.
Parameters:
msg -
doneOn - The collection of shards or services on which the operator
needs to receive a last evaluation pass message.
protected boolean isAtOnceReady(int bopId)
Return true iff the preconditions have been satisfied for
the "at-once" invocation of the specified operator (no predecessors are
running or could be triggered and the operator has not been evaluated).
Parameters:
bopId - Some operator identifier.
Returns:
true iff the "at-once" evaluation of the operator may proceed.
protected com.bigdata.bop.engine.RunState.RunStateEnum getRunState(int bopId)
Return the RunState.RunStateEnum for an operator.
Parameters:
bopId - The operator.
Returns:
The RunState.RunStateEnum.
protected com.bigdata.bop.engine.RunState.RunStateEnum tryGetRunState(int bopId)
Attempt to return the RunState.RunStateEnum for an operator
(non-blocking).
Note: This method is intended for use in contexts where it is desirable,
but not critical, to have the RunState.RunStateEnum for the operator, for
example in log messages. The implementation is non-blocking and will
barge in if the lock is available and return the RunState.RunStateEnum of
the operator. If the lock is not available, it will return null.
Parameters:
bopId - The operator.
Returns:
The RunState.RunStateEnum and null if the lock could not be acquired.
protected void releaseNativeMemoryForOperator(int bopId)
Release native memory associated with this operator, if any (NOP, but
overridden in scale-out to release NIO buffers used to move solutions
around in the cluster).
Note: Operators are responsible for releasing their child
IMemoryManager context, if any, when they terminate and should
specify the PipelineOp.Annotations#LAST_PASS annotation to
receive notice in the form of a final evaluation pass over an empty
IChunkMessage. If they do NOT release an IMemoryManager
context which is a child of the query's context, then their child
IMemoryManager context will be
retained until the termination of the query, at which point the query's
IMemoryManager context will be released, and all child contexts
will be released automatically along with it.
Parameters:
bopId -
See Also:
releaseNativeMemoryForQuery()
protected void releaseNativeMemoryForQuery()
Release native memory associated with this query, if any.
We will have to be very careful to wait until each operator's Future isDone() before calling clear() on the IMemoryManager to release the native buffers back to the pool. If we release a buffer while an operator is still running, then we will get data corruption arising from the recycling of the buffer to another native buffer user.
AbstractRunningQuery.cancel(...) is where we need to handle this, more specifically cancelRunningOperators(). Right now it is not waiting for those operators to terminate.
Making this work is tricky. AbstractRunningQuery is holding a lock. The operator tasks do not actually require that lock to terminate, but they are wrapped by a ChunkWrapperTask, which handles reporting back to the AbstractRunningQuery and *does* need the lock, and also by a ChunkFutureTask. Since we actually do invoke ChunkFutureTask.get(), we are going to deadlock if we invoke that while holding the AbstractRunningQuery's lock.
The alternative is to handle the tear down of the native buffers for a query asynchronously after the query has been cancelled, deferring the release of the native buffers back to the direct buffer pool until all tasks for the query are known to be done.
MemoryManager should know when it has been closed
FIXME We need to have distinct events for the query evaluation life cycle
and the query results life cycle. Really, this means that temporary
solution sets are scoped to the parent query. This is a matter of the
scope of the allocation context for the DirectBufferPoolAllocator
and releasing that scope when the parent query is done (in cancel()).
[Also consider scoping the temporary solution sets to a transaction or an
HTTP session, e.g., by an integration with the NSS using traditional
session concepts.]
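The asynchronous alternative described above (deferring the release of native buffers until every task is known to be done, without waiting while holding the query lock) might look roughly like the following. This is only a sketch under assumptions: DeferredReleaseSketch and releaseWhenDone are hypothetical names, the Runnable stands in for whatever clears the IMemoryManager, and a separate cleanup executor is assumed to exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class DeferredReleaseSketch {

    /**
     * Schedule release to run on cleanupService once every operator task is
     * done. The caller returns immediately, so it never blocks in
     * Future.get() while holding its own lock.
     */
    static Future<?> releaseWhenDone(List<? extends Future<?>> operatorTasks,
            Runnable release, ExecutorService cleanupService) {
        return cleanupService.submit(() -> {
            for (Future<?> f : operatorTasks) {
                try {
                    f.get(); // blocks on the cleanup thread only
                } catch (ExecutionException | CancellationException e) {
                    // The task is done; how it ended does not matter for cleanup.
                } catch (InterruptedException e) {
                    // Give up without releasing: some tasks may still be running.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            release.run();
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        ExecutorService cleanup = Executors.newSingleThreadExecutor();
        List<Future<?>> tasks = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            tasks.add(workers.submit(() -> System.out.println("operator ran")));
        }
        releaseWhenDone(tasks, () -> System.out.println("buffers released"), cleanup).get();
        workers.shutdown();
        cleanup.shutdown();
    }
}
```

One caveat applies to any design of this shape: for a plain java.util.concurrent.FutureTask, cancel() marks the future as done before the worker thread has necessarily stopped running, so get() returning is not by itself proof that the task no longer touches its buffers. A real implementation would need a stronger termination signal from the task wrappers.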
protected abstract boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Make a chunk of binding sets available for consumption by the query.
Note: this is invoked by QueryEngine.acceptChunk(IChunkMessage).
Parameters:
msg - The chunk.
Returns:
true if the message was accepted.
protected abstract void consumeChunk()
Instruct the IRunningQuery to consume an IChunkMessage
already on its input queue.
public final ICloseableIterator<IBindingSet[]> iterator()
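Description copied from interface: IRunningQuery
Return an iterator which will drain the solutions from the query. The
query will be cancelled if the iterator is closed.
Specified by:
iterator in interface IRunningQuery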
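public final void halt(Void v)
Description copied from interface: IHaltable
Halt (normal termination).
public final <T extends Throwable> T halt(T t)
Description copied from interface: IHaltable
Halt (exception thrown).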
public final boolean cancel(boolean mayInterruptIfRunning)
Cancelled queries :
protected abstract boolean cancelRunningOperators(boolean mayInterruptIfRunning)
Cancel any running operators for this query on this node (internal API).
Returns:
true if any operators were cancelled.
protected abstract void releaseAcceptedMessages()
Close the IAsynchronousIterator for any IChunkMessages
which have been accepted for this queue on this node (internal
API).
Note: This must be invoked while holding a lock which is exclusive with
the lock used to hand off IChunkMessages to operator tasks;
otherwise we could wind up invoking IAsynchronousIterator.close()
on an IAsynchronousIterator running in a different thread.
That would cause visibility problems in the close() semantics unless the
IAsynchronousIterator is thread-safe for close (e.g., volatile
write, synchronized, etc.). The appropriate lock for this is
lock. This method is only invoked out of
cancel(boolean), which owns that lock.
protected boolean cancelQueryOnPeers(Throwable cause, Set<UUID> startedOn)
Cancel the query on each node where it is known to be running.
Note: The default implementation verifies that the caller is holding the
lock but is otherwise a NOP. This is overridden for scale-out.
Parameters:
cause - When non-null, the cause.
Returns:
true iff something was cancelled.
Throws:
IllegalMonitorStateException - unless the lock is held by the current thread.
UnsupportedOperationException - unless this is the query controller.
public final Void get() throws InterruptedException, ExecutionException
Specified by:
get in interface Future<Void>
Throws:
InterruptedException
ExecutionException
public final Void get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException
Specified by:
get in interface Future<Void>
Throws:
InterruptedException
ExecutionException
TimeoutException
public final boolean isCancelled()
Specified by:
isCancelled in interface Future<Void>
public final Throwable getAsThrownCause()
Description copied from interface: IHaltable
Return the first cause regardless of whether it is
indicative of normal termination and null iff no cause has
been set.
Specified by:
getAsThrownCause in interface IHaltable<Void>
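The difference between getCause() and getAsThrownCause() can be sketched with a first-cause-wins holder. This is an illustrative model only: FirstCauseSketch is hypothetical, and treating InterruptedException and CancellationException as "normal termination" is an assumption made for the example, not the documented classification.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class FirstCauseSketch {

    private final AtomicReference<Throwable> firstCause = new AtomicReference<>();

    /** Record the cause if it is the first one; always hand the argument back. */
    <T extends Throwable> T halt(T t) {
        firstCause.compareAndSet(null, t); // later causes do not overwrite the first
        return t;
    }

    /** The first cause, whatever it was, or null if none has been set. */
    Throwable getAsThrownCause() {
        return firstCause.get();
    }

    /** The first cause, but only when it indicates abnormal termination. */
    Throwable getCause() {
        Throwable t = firstCause.get();
        return (t == null || isNormalTermination(t)) ? null : t;
    }

    // Assumption for this sketch: interrupts and cancellation count as normal.
    static boolean isNormalTermination(Throwable t) {
        return t instanceof InterruptedException
                || t instanceof CancellationException;
    }

    public static void main(String[] args) {
        FirstCauseSketch h = new FirstCauseSketch();
        h.halt(new CancellationException("cancelled by client"));
        h.halt(new RuntimeException("arrived later, ignored"));
        System.out.println("asThrown=" + h.getAsThrownCause());
        System.out.println("cause=" + h.getCause());
    }
}
```

Returning the argument from halt(T) allows the idiom throw halt(ex), which records the cause and rethrows it in one statement.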
public IBigdataFederation<?> getFederation()
Description copied from interface: IRunningQuery
The IBigdataFederation IFF the operator is being evaluated on an
IBigdataFederation. When evaluating operations against an
IBigdataFederation, this reference provides access to the
scale-out view of the indices and to other bigdata services.
Specified by:
getFederation in interface IRunningQuery
public IIndexManager getLocalIndexManager()
Description copied from interface: IRunningQuery
The local IIndexManager. Query evaluation occurs
against the local indices. In scale-out, query evaluation proceeds
shard-wise and this IIndexManager MUST be able to read on the
ILocalBTreeView.
Specified by:
getLocalIndexManager in interface IRunningQuery
protected long getRunningCount(int bopId)
Return the #of instances of the operator which are concurrently
executing.
protected int getStartedOnCount(int bopId)
Return the #of shards or nodes on which the operator has started
evaluation.
Parameters:
bopId - The operator identifier.
public IMemoryManager getMemoryManager()
Description copied from interface: IQueryContext
Return the IMemoryManager which may be used to buffer data on
high level data structures, such as the HTree, for this query.
Each operator in the query should in general create its own child
IMemoryManager. While the overall IMemoryManager context
associated with a query will be released when the query terminates,
operators which create child IMemoryManager contexts are
responsible for releasing their IMemoryManager in a timely
fashion when the operator has finished its evaluation.
Specified by:
getMemoryManager in interface IQueryContext
public final IQueryAttributes getAttributes()
Description copied from interface: IQueryContext
Return an interface which allows attribute values to be associated with
an IQueryContext.
Specified by:
getAttributes in interface IQueryContext
public final IRunningQuery[] getChildren()
Report a snapshot of the known (declared) child IRunningQuerys
for this IRunningQuery and (recursively) for any children of this
IRunningQuery.
Returns:
A snapshot of the known child IRunningQuerys and never null.
public final boolean addChild(IRunningQuery childQuery)
Attach a child query.
Queries as submitted do not know about parent/child relationships.
Parameters:
childQuery - The child query.
Returns:
true if the child query was not already declared.
protected String runStateString()
Return the textual representation of the RunState of this query.
Note: Exposed for log messages in derived classes since runState
is private.
public static boolean isRootCauseInterrupt(Throwable t)
Return true iff the root cause of the Throwable was
an interrupt. This checks for any of the different kinds of exceptions
which can be thrown when an interrupt is encountered.
Parameters:
t - The throwable.
Returns:
true iff the root cause was an interrupt.
TODO This could be optimized by checking once at each level for any of
the indicated exceptions.
public void setStaticAnalysisStats(StaticAnalysisStats saStats)
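Description copied from interface: IRunningQuery
Set the static analysis stats associated with this query.
Specified by:
setStaticAnalysisStats in interface IRunningQuery
public StaticAnalysisStats getStaticAnalysisStats()
Description copied from interface: IRunningQuery
Return statistics associated with the static analysis phase of this
query.
Specified by:
getStaticAnalysisStats in interface IRunningQuery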
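Returning to isRootCauseInterrupt(Throwable): its TODO suggests checking each level of the cause chain once for any interrupt-related exception. A sketch of that single-pass shape follows. The set of exception types tested here (InterruptedException, ClosedByInterruptException, InterruptedIOException) is an assumption, since the documentation does not list which exceptions the method checks, and InterruptCauseSketch is a hypothetical class.

```java
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ExecutionException;

/** Hypothetical illustration; not part of the bigdata/Blazegraph API. */
final class InterruptCauseSketch {

    private static final int MAX_DEPTH = 64; // guards against cyclic cause chains

    /** True iff some throwable in the cause chain signals an interrupt. */
    static boolean isRootCauseInterrupt(Throwable t) {
        // One pass over the chain, testing every candidate type at each level.
        for (int depth = 0; t != null && depth < MAX_DEPTH; depth++) {
            if (t instanceof InterruptedException
                    || t instanceof ClosedByInterruptException
                    || t instanceof InterruptedIOException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new ExecutionException(
                new RuntimeException(new InterruptedException("stop")));
        System.out.println(isRootCauseInterrupt(wrapped));
        System.out.println(isRootCauseInterrupt(new RuntimeException("other")));
    }
}
```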
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.