public class FederatedRunningQuery extends ChunkedRunningQuery
Extends ChunkedRunningQuery to provide additional state and logic required to support distributed query evaluation against an IBigdataFederation.
ERR_DUPLICATE_IDENTIFIER, ERR_NO_SUCH_BOP, ERR_NOT_CONTROLLER, ERR_NOT_PIPELINE_START, ERR_QUERY_DONE, lock
| Constructor and Description |
|---|
| FederatedRunningQuery(FederatedQueryEngine queryEngine, UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource) |
| Modifier and Type | Method and Description |
|---|---|
| protected boolean | acceptChunk(IChunkMessage<IBindingSet> msg) Overridden to make this visible to the FederatedQueryEngine. |
| protected boolean | cancelQueryOnPeers(Throwable cause, Set<UUID> startedOn) Overridden to notify each peer on which the query was started. |
| protected void | doLastPass(int bopId, Set doneOn) Overridden to broadcast to all nodes and/or shards on which the operator has run in scale-out. |
| DirectBufferPoolAllocator.IAllocationContext | getAllocationContext(AllocationContextKey key) Return the DirectBufferPoolAllocator.IAllocationContext for the given key. |
| protected IChunkHandler | getChunkHandler(QueryEngine queryEngine, PipelineOp query) Factory returns the effective IChunkHandler for this query. |
| FederatedQueryEngine | getQueryEngine() The query engine class executing the query on this node. |
| IQueryPeer | getQueryPeer(UUID serviceUUID) Resolve the proxy for an IQueryPeer. |
| protected void | releaseNativeMemoryForOperator(int bopId) Extended to release all allocations associated with the specified operator. |
| protected void | releaseNativeMemoryForQuery() Extended to release all DirectBufferPoolAllocator.IAllocationContexts associated with the query when it terminates. |
cancelRunningOperators, consumeChunk, getChunkHandler, getQueueStats, haltOp, releaseAcceptedMessages
addChild, cancel, checkDeadline, get, get, getAsThrownCause, getAttributes, getBOp, getBOpIndex, getCause, getChildren, getDeadline, getDoneTime, getElapsed, getFederation, getFuture, getLocalIndexManager, getMemoryManager, getQuery, getQueryBuffer, getQueryController, getQueryId, getRunningCount, getRunState, getStartedOnCount, getStartTime, getStaticAnalysisStats, getStats, getStats, halt, halt, isAtOnceReady, isCancelled, isController, isDone, isRootCauseInterrupt, iterator, newQueryBuffer, runStateString, setDeadline, setStaticAnalysisStats, startOp, startQuery, toString, tryGetRunState
public FederatedRunningQuery(FederatedQueryEngine queryEngine, UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource)
protected void releaseNativeMemoryForOperator(int bopId)
Release native memory associated with this operator, if any (NOP, but overridden in scale-out to release NIO buffers used to move solutions around in the cluster).
Note: Operators are responsible for releasing their child IMemoryManager context, if any, when they terminate and should specify the PipelineOp.Annotations#LAST_PASS annotation to receive notice in the form of a final evaluation pass over an empty IChunkMessage. If they do NOT release an IMemoryManager context which is a child of the query's context, then their child IMemoryManager context will be retained until the termination of the query, at which point the query's IMemoryManager context will be released, and all child contexts will be released automatically along with it.
Overrides: releaseNativeMemoryForOperator in class AbstractRunningQuery
See Also: AbstractRunningQuery.releaseNativeMemoryForQuery()
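The parent/child release behavior described in the note above can be pictured with a small, self-contained sketch. `MemoryContextSketch` and its nested `Context` class are hypothetical stand-ins invented for illustration; they are not the IMemoryManager API, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

public class MemoryContextSketch {

    /** Hypothetical stand-in for a hierarchical memory context. */
    public static final class Context {
        private final Context parent;
        private final List<Context> children = new ArrayList<>();
        private boolean released;

        public Context(Context parent) {
            this.parent = parent;
            if (parent != null) {
                parent.children.add(this);
            }
        }

        /** Creates a child context, e.g. one per operator. */
        public Context createChild() {
            return new Context(this);
        }

        /** Releases this context and, transitively, every child still attached. */
        public void release() {
            if (released) {
                return;
            }
            released = true;
            // Iterate over a copy because each child detaches itself from this list.
            for (Context child : new ArrayList<>(children)) {
                child.release();
            }
            children.clear();
            if (parent != null) {
                parent.children.remove(this);
            }
        }

        public boolean isReleased() {
            return released;
        }

        public int childCount() {
            return children.size();
        }
    }
}
```

In this sketch an operator that calls `release()` on its own child context frees it early, while any child that is never released explicitly is freed when the query-level context is released.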
protected void releaseNativeMemoryForQuery()
Extended to release all DirectBufferPoolAllocator.IAllocationContexts associated with the query when it terminates.
Release native memory associated with this query, if any.
FIXME This could cause direct buffers to be released back to the pool before the operator tasks have terminated. That is NOT safe as the buffers could then be reissued to other threads while existing threads still have references to the buffers. Really, the same problem exists with the allocation contexts used for NIO transfers of IBindingSet[]s.
We will have to be very careful to wait until each operator's Future isDone() before calling clear() on the IMemoryManager to release the native buffers back to the pool. If we release a buffer while an operator is still running, then we will get data corruption arising from the recycling of the buffer to another native buffer user.
AbstractRunningQuery.cancel(...) is where we need to handle this, more specifically cancelRunningOperators(). Right now it is not waiting for those operators to terminate.
Making this work is tricky. AbstractRunningQuery is holding a lock. The operator tasks do not actually require that lock to terminate, but they are wrapped by a ChunkWrapperTask, which handles reporting back to the AbstractRunningQuery and *does* need the lock, and also by a ChunkFutureTask. Since we actually do do ChunkFutureTask.get(), we are going to deadlock if we invoke that while holding the AbstractRunningQuery's lock.
The alternative is to handle the tear down of the native buffers for a query asynchronously after the query has been cancelled, deferring the release of the native buffers back to the direct buffer pool until all tasks for the query are known to be done.
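A minimal sketch of that asynchronous alternative follows, assuming each operator task can expose a completion signal. The class `DeferredBufferRelease` and its methods are hypothetical names invented for illustration, not part of the query engine; a `CompletableFuture` per task stands in for the operator's Future, and a boolean flag stands in for returning buffers to the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class DeferredBufferRelease {

    // Hypothetical stand-in for the running query's lock.
    private final ReentrantLock lock = new ReentrantLock();
    private final List<CompletableFuture<Void>> taskDone = new ArrayList<>();
    private final AtomicBoolean buffersReleased = new AtomicBoolean(false);
    private volatile boolean cancelled;

    /** Wraps an operator so that its termination can be observed later. */
    public Runnable wrap(Runnable operator) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        lock.lock();
        try {
            taskDone.add(done);
        } finally {
            lock.unlock();
        }
        return () -> {
            try {
                operator.run();
            } finally {
                // Reporting back needs the lock, as with the wrapper task in the text.
                lock.lock();
                try {
                    // (bookkeeping would go here)
                } finally {
                    lock.unlock();
                }
                done.complete(null);
            }
        };
    }

    /**
     * Marks the query cancelled while holding the lock, but never waits on a
     * task while holding it. The release runs only once every task is done.
     */
    public CompletableFuture<Void> cancel() {
        CompletableFuture<?>[] pending;
        lock.lock();
        try {
            cancelled = true;
            pending = taskDone.toArray(new CompletableFuture<?>[0]);
        } finally {
            lock.unlock();
        }
        return CompletableFuture.allOf(pending)
                .thenRun(() -> buffersReleased.set(true));
    }

    /** Operators may poll this flag to stop early. */
    public boolean isCancelled() {
        return cancelled;
    }

    public boolean buffersReleased() {
        return buffersReleased.get();
    }
}
```

Because `cancel()` only snapshots the completion futures under the lock and chains the release onto them, a wrapper that still needs the lock to report back can finish without deadlocking, and no buffer is marked released while a task is running.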
Overrides: releaseNativeMemoryForQuery in class AbstractRunningQuery
MemoryManager should know when it has been closed
FIXME We need to have distinct events for the query evaluation life cycle and the query results life cycle. Really, this means that temporary solution sets are scoped to the parent query. This is a matter of the scope of the allocation context for the DirectBufferPoolAllocator and releasing that scope when the parent query is done (in cancel()). [Also consider scoping the temporary solution sets to a transaction or an HTTP session, e.g., by an integration with the NSS using traditional session concepts.]
public FederatedQueryEngine getQueryEngine()
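Description copied from interface: IRunningQuery
The query engine class executing the query on this node.
Specified by: getQueryEngine in interface IRunningQuery
Overrides: getQueryEngine in class AbstractRunningQuery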
protected boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Overridden to make this visible to the FederatedQueryEngine.
Make a chunk of binding sets available for consumption by the query.
Note: this is invoked by QueryEngine.acceptChunk(IChunkMessage).
Overrides: acceptChunk in class ChunkedRunningQuery
Parameters: msg - The chunk.
Returns: true if the message was accepted.
public IQueryPeer getQueryPeer(UUID serviceUUID)
Resolve the proxy for an IQueryPeer. This is special cased for both this service (the actual reference is returned) and the query controller (we use an alternative path to discover the query controller since it might not be registered against a lookup service if it is not a data service).
Parameters: serviceUUID - The service identifier for the peer.
Returns: The proxy for the peer, the actual QueryEngine reference if the identified service is this service, or null if the service could not be discovered.
public DirectBufferPoolAllocator.IAllocationContext getAllocationContext(AllocationContextKey key)
Return the DirectBufferPoolAllocator.IAllocationContext for the given key.
Parameters: key - The key.
protected boolean cancelQueryOnPeers(Throwable cause, Set<UUID> startedOn)
Cancel the query on each node where it is known to be running.
Note: The default implementation verifies that the caller is holding the AbstractRunningQuery.lock but is otherwise a NOP. This is overridden for scale-out.
Overrides: cancelQueryOnPeers in class AbstractRunningQuery
Parameters: cause - When non-null, the cause.
Returns: true iff something was cancelled.
protected IChunkHandler getChunkHandler(QueryEngine queryEngine, PipelineOp query)
Description copied from class: ChunkedRunningQuery
Factory returns the effective IChunkHandler for this query.
Overrides: getChunkHandler in class ChunkedRunningQuery
Returns: The IChunkHandler policy.
See Also: QueryHints.QUERY_ENGINE_CHUNK_HANDLER, Vector query engine on native heap.
protected void doLastPass(int bopId, Set doneOn)
Queue empty IChunkMessages to trigger the last evaluation pass for an operator which cannot be re-triggered by any upstream operator or by IChunkMessages which have already been buffered.
Note: If the queue for accepting new chunks could block then this could deadlock. We work around that by using the same lock for the AbstractRunningQuery and the queue of accepted messages. If the queue blocks, this thread will yield the lock and another thread may make progress.
Overrides: doLastPass in class AbstractRunningQuery
Parameters: doneOn - The collection of shards or services on which the operator needs to receive a last evaluation pass message.
See Also: BOpEvaluationContext
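The shared-lock workaround noted for doLastPass can be illustrated with a small bounded queue whose blocking condition belongs to the same lock the caller already holds. `SharedLockQueue` is a hypothetical class written for this sketch, not the engine's actual queue; it relies on the documented behavior that `Condition.await()` on a `ReentrantLock` releases the lock (including nested holds) while waiting and restores the hold count before returning.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SharedLockQueue<T> {

    // Hypothetical stand-in for the lock shared by the query and its queue.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Deque<T> queue = new ArrayDeque<>();
    private final int capacity;

    public SharedLockQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Exposes the shared lock so a caller can hold it across several steps. */
    public ReentrantLock lock() {
        return lock;
    }

    /**
     * Adds an item, waiting while the queue is full. The caller may already
     * hold the lock; await() gives it up fully so consumers can proceed.
     */
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(item);
        } finally {
            lock.unlock();
        }
    }

    /** Removes the head item, or returns null when the queue is empty. */
    public T poll() {
        lock.lock();
        try {
            T item = queue.pollFirst();
            if (item != null) {
                notFull.signal();
            }
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

If the queue used a separate lock, a producer blocked on a full queue would keep holding the query's lock and the consumer that needs it could never drain the queue; sharing one lock avoids that in this sketch.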
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.