public class QueryEngine extends Object implements IQueryPeer, IQueryClient, ICounterSetAccess
IIndexManager
.
Much of the complexity of the current approach owes itself to having to run a separate task for each join for each shard in order to have the appropriate lock when running against the unisolated shard view. This also means that the join task is running inside of the concurrency manager and hence has the local view of the shard.
The main, and perhaps the only, reason why we run unisolated rules is during closure, when we query against the unisolated indices and then write the entailments back on the unisolated indices.
Supporting closure has always been complicated. This complexity is mostly handled by ProgramTask#executeMutation() and AbstractTripleStore#newJoinNexusFactory() which play games with the timestamps used to read and write on the database, with commit points designed to create visibility for tuples written by a mutation rule, and with the automated advance of the read timestamp for the query in each closure pass in order to make newly committed tuples visible to subsequent rounds of closure. For scale-out, we do shard-wise auto commits so we always have a commit point which makes each write visible and the read timestamp is actually a read-only transaction which prevents the historical data we need during a closure round from being released as we are driving updates onto the federation. For the RWStore, we are having a similar problem (in the HA branch since that is where we are working on the RWStore) where historically allocated records were being released as writes drove updates on the indices. Again, we "solved" the problem for the RWStore using a commit point followed by a read-only transaction reading on that commit point to hold onto the view on which the next closure round needs to read (this uncovered a problem with the RWStore and transaction service interaction which Martyn is currently working to resolve through a combination of shadow allocators and deferred deletes which are processed once the release time is advanced by the transaction service).
The WORM does not have some of these problems with closure because we never delete history, so we do not need to create a commit point and a read-behind transaction. However, the WORM would have problems with concurrent access to the unisolated indices except that we hack that problem through the transparent use of the UnisolatedReadWriteIndex, which allows multiple threads to access the same unisolated index view using a read/write lock pattern (concurrent readers are allowed, but there is only one writer and it has exclusive access when it is running). This works out because we never run closure operations against the WORM through the concurrency manager. If we did, we would have to create a commit point after each mutation and use a read-behind transaction to prevent concurrent access to the unisolated index.
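The read/write lock pattern described above can be illustrated with a minimal sketch. This is not the UnisolatedReadWriteIndex implementation; the class name `ReadWriteLockedIndex` and its `lookup`/`insert` methods are hypothetical, and a `TreeMap` stands in for the unisolated index.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for an index view guarded by a read/write lock. */
class ReadWriteLockedIndex {
    // Assumption: a sorted map plays the role of the unisolated index.
    private final Map<String, String> tuples = new TreeMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Readers share the read lock, so lookups may run concurrently. */
    String lookup(String key) {
        lock.readLock().lock();
        try {
            return tuples.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** A writer holds the write lock exclusively while it mutates the index. */
    void insert(String key, String value) {
        lock.writeLock().lock();
        try {
            tuples.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteLockedIndex ndx = new ReadWriteLockedIndex();
        ndx.insert("s1", "o1");
        System.out.println(ndx.lookup("s1"));
    }
}
```

The lock only serializes access within one JVM; it does nothing to preserve history, which is why the RWStore case still needs the commit point and read-behind transaction.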
The main advantage that I can see of the current complexity is that it allows us to do load+closure as a single operation on the WORM, resulting in a single commit point. This makes that operation ACID without having to use full read/write transactions. This is how we gain the ACID contract for the standalone Journal in the SAIL for the WORM. Of course, the SAIL does not have that contract for the RWStore because we have to do the commit and read-behind transaction in order to have visibility and avoid concurrent access to the unisolated index (by reading behind on the last commit point).
I think that the reality is even one step more complicated. When doing truth maintenance (incremental closure), we bring the temporary graph to a fixed point (the rules write on the temp store) and then apply the delta in a single write to the database. That suggests that incremental truth maintenance would continue to be ACID, but that database-at-once-closure would be round-wise ACID.
So, I would like to suggest that we break ACID for database-at-once-closure and always follow the pattern of (1) do a commit before each round of closure; and (2) create a read-behind transaction to prevent the release of that commit point as we drive writes onto the indices. If we follow this pattern then we can write on the unisolated indices without conflict and read on the historical views without conflict. Since there will be a commit point before each mutation rule runs (which corresponds to a closure round), database-at-once-closure will be atomic within a round, but will not be a single atomic operation. Per above, I think that we would retain the ACID property for incremental truth maintenance against a WORM or RW mode Journal.
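The proposed pattern can be sketched as a fixed-point loop in which each round reads from a frozen view and writes on the live data. This is only an illustration under stated assumptions: the class `RoundWiseClosure` is hypothetical, an immutable copy of a set stands in for "commit point plus read-behind transaction", and a single transitive-closure rule over two-element edges stands in for the mutation rules.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of round-wise closure over (from, to) edges. */
class RoundWiseClosure {

    /** Runs closure rounds until a round adds nothing; returns the number of rounds. */
    static int close(Set<List<String>> live) {
        int rounds = 0;
        boolean changed = true;
        while (changed) {
            rounds++;
            // (1) "commit" and (2) pin that view: the snapshot is what the rule
            // reads, so writes on the live set cannot disturb the round.
            final Set<List<String>> snapshot =
                    Collections.unmodifiableSet(new HashSet<>(live));
            changed = false;
            for (List<String> ab : snapshot) {
                for (List<String> bc : snapshot) {
                    if (ab.get(1).equals(bc.get(0))) {
                        // Entailment (a,c) is written on the live ("unisolated") set.
                        changed |= live.add(Arrays.asList(ab.get(0), bc.get(1)));
                    }
                }
            }
        }
        return rounds;
    }

    public static void main(String[] args) {
        Set<List<String>> live = new HashSet<>();
        live.add(Arrays.asList("a", "b"));
        live.add(Arrays.asList("b", "c"));
        live.add(Arrays.asList("c", "d"));
        int rounds = close(live);
        System.out.println(rounds + " rounds, " + live.size() + " edges");
    }
}
```

Each pass of the loop is atomic with respect to its own snapshot, but the overall computation spans several "commit points", which mirrors the round-wise ACID behavior discussed above.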
----
The advantage of this proposal (commit before each mutation rule and run query against a read-behind transaction) is that this could enormously simplify how we execute joins. Right now, we use a factory pattern to create a join task on each node for each shard for which that node receives binding sets for a query. The main reason for doing this is to gain the appropriate lock for the unisolated index. If we never run a query against the unisolated index then we can go around the concurrency manager and run a single "query manager" task for all joins for all shards for all queries. This has some great benefits which I will go into below.
That "query manager" task would be responsible for accepting buffers containing elements or binding sets from other nodes and scheduling consumption of those data based on various criteria (order of arrival, priority, buffer resource requirements, timeout, etc.). This manager task could use a fork/join pool to execute lightweight operations (NIO, formulation of access paths from binding sets, mapping of binding sets onto shards, joining a chunk already read from an access path against a binding set, etc.). Operations which touch the disk need to run in their own thread (until we get Java 7 async file IO, which is already available in a preview library). We could handle that by queuing those operations against a fixed-size thread pool for reads.
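A rough sketch of that division of labor follows, assuming a hypothetical `QueryManagerSketch` class: simulated disk reads are queued on a fixed-size pool, and the lightweight join of the chunk against a binding is handed to a fork/join pool. The string tuples, the prefix "join", and all names are illustrative assumptions, not the QueryEngine API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/** Hypothetical single manager owning both pools for all queries on a node. */
class QueryManagerSketch implements AutoCloseable {
    // Lightweight, non-blocking work (e.g. joining an already-read chunk).
    private final ForkJoinPool lightPool = new ForkJoinPool();
    // Bounded pool for operations which touch the disk.
    private final ExecutorService readPool;

    QueryManagerSketch(int nreadThreads) {
        this.readPool = Executors.newFixedThreadPool(nreadThreads);
    }

    /** Reads a chunk on the read pool, then filters it on the fork/join pool. */
    CompletableFuture<List<String>> joinChunk(Supplier<List<String>> diskRead, String prefix) {
        return CompletableFuture
                .supplyAsync(diskRead, readPool)
                .thenApplyAsync(chunk -> {
                    List<String> solutions = new ArrayList<>();
                    for (String tuple : chunk) {
                        // Assumption: a prefix match stands in for a real join condition.
                        if (tuple.startsWith(prefix)) solutions.add(tuple);
                    }
                    return solutions;
                }, lightPool);
    }

    @Override
    public void close() {
        readPool.shutdown();
        lightPool.shutdown();
    }

    public static void main(String[] args) {
        try (QueryManagerSketch qm = new QueryManagerSketch(2)) {
            List<String> out =
                    qm.joinChunk(() -> Arrays.asList("x:1", "y:2", "x:3"), "x:").join();
            System.out.println(out);
        }
    }
}
```

The number of threads blocked on IO is capped by the read pool size regardless of how many joins, shards, or queries are active, which is the thread-count reduction argued for in these notes.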
This is a radical change in how we handle distributed query execution, but I think that it could have a huge payoff by reducing the complexity of the join logic, making it significantly easier to execute different kinds of join operations, reducing the overhead for acquiring locks for the unisolated index views, reducing the #of threads consumed by joins (from one per shard per join per query to a fixed pool of N threads for reads), etc. It would centralize the management of resources on each node and make it possible for us to handle things like join termination by simply purging data from the query manager task for the terminated join.
Modifier and Type | Class and Description |
---|---|
static interface | QueryEngine.Annotations Annotations understood by the QueryEngine. |
static interface | QueryEngine.IRunningQueryListener Listener API for IRunningQuery life cycle events (start/halt). |
Modifier and Type | Field and Description |
---|---|
protected QueryEngineCounters | counters Counters at the global level. |
protected static String | ERR_QUERY_NOT_RUNNING Error message used if a query is not running. |
protected GeoSpatialCounters | geoSpatialCounters GeoSpatial counters |
Constructor and Description |
---|
QueryEngine(IIndexManager localIndexManager) |
Modifier and Type | Method and Description |
---|---|
protected boolean | acceptChunk(IChunkMessage<IBindingSet> msg) Add a chunk of intermediate results for consumption by some query. |
void | addListener(QueryEngine.IRunningQueryListener l) Add a query listener. |
void | addPendingCancel(UUID queryId) Add a query UUID to the LRU of query identifiers for which we have received a CANCEL request, but were unable to find a running QUERY, recently done query, or running UPDATE request. |
protected void | assertRunning() Return if the query engine is running. |
void | bufferReady(IChunkMessage<IBindingSet> msg) Notify a service that a buffer having data for some BOp in some running query is available. |
void | cancelQuery(UUID queryId, Throwable cause) Notify a service that the query has been terminated. |
void | declareQuery(IQueryDecl queryDecl) Deprecated. |
protected void | didShutdown() Hook is notified by shutdown() when all running queries have terminated. |
AbstractRunningQuery | eval(BOp op) Evaluate a query. |
AbstractRunningQuery | eval(BOp op, IBindingSet bset) Evaluate a query. |
AbstractRunningQuery | eval(BOp op, IBindingSet[] bsets) Evaluate a query. |
AbstractRunningQuery | eval(BOp op, IBindingSet[] bsets, Map<Object,Object> attribs) Evaluate a query. |
AbstractRunningQuery | eval(UUID queryId, BOp op, IBindingSet bset) Note: Used only by the test suite. |
AbstractRunningQuery | eval(UUID queryId, BOp op, Map<Object,Object> queryAttributes, IBindingSet[] bset) Note: Used only by the test suite. |
AbstractRunningQuery | eval(UUID queryId, BOp op, Map<Object,Object> queryAttributes, IBindingSet[][] bset) Note: Used only by the test suite. |
AbstractRunningQuery | eval(UUID queryId, PipelineOp query, Map<Object,Object> queryAttributes, IChunkMessage<IBindingSet> msg) Evaluate a query. |
protected void | execute(Runnable r) |
protected void | finalize() QueryEngines are used with a singleton pattern managed by the QueryEngineFactory. |
org.eclipse.jetty.client.HttpClient | getClientConnectionManager() Return the HttpClient used to make remote SERVICE call requests. |
ConcurrencyManager | getConcurrencyManager() Return the ConcurrencyManager for the local index manager. |
CounterSet | getCounters() Return a CounterSet which reports various statistics for the QueryEngine. |
IBigdataFederation<?> | getFederation() The IBigdataFederation iff running in scale-out. |
GeoSpatialCounters | getGeoSpatialCounters() The GeoSpatialCounters object for this QueryEngine. |
IIndexManager | getIndexManager() The local index manager, which provides direct access to local BTree and IndexSegment objects. |
IQueryClient | getProxy() The RMI proxy for this QueryEngine when used as a query controller. |
PipelineOp | getQuery(UUID queryId) Return the query. |
protected QueryEngineCounters | getQueryEngineCounters() The QueryEngineCounters object for this QueryEngine. |
UUID[] | getRunningQueries() Return the set of queries which are running as of the moment when the request was processed. |
AbstractRunningQuery | getRunningQuery(UUID queryId) Return the AbstractRunningQuery associated with that query identifier. |
UUID | getServiceUUID() The UUID of the service in which this QueryEngine is running. |
protected void | halt(AbstractRunningQuery q) The query is no longer running. |
void | haltOp(IHaltOpMessage msg) Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s). |
void | init() Initialize the QueryEngine. |
protected boolean | isRunning() |
boolean | isScaleOut() Return true iff running against an IBigdataFederation. |
protected QueryEngineCounters | newCounters() Extension hook for new QueryEngineCounters instances. |
protected GeoSpatialCounters | newGeoSpatialCounters() Extension hook for new GeoSpatialCounters instances. |
protected AbstractRunningQuery | newRunningQuery(UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource) Factory for IRunningQuery instances. |
boolean | pendingCancel(UUID queryId) |
protected AbstractRunningQuery | putIfAbsent(UUID queryId, AbstractRunningQuery runningQuery) Places the AbstractRunningQuery object into the internal map. |
void | removeListener(QueryEngine.IRunningQueryListener l) Remove a query listener. |
void | shutdown() Shutdown the QueryEngine (blocking). |
void | shutdownNow() Do not accept new queries and halt any running binding set chunk tasks. |
void | startOp(IStartOpMessage msg) Notify the client that execution has started for some query, operator, node, and index partition. |
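The addPendingCancel(UUID) and pendingCancel(UUID) entries in the table above describe an LRU of query identifiers whose entries are removed as a side-effect of the check. A minimal sketch of that contract is shown here, assuming a hypothetical `PendingCancelLRU` class built on an access-ordered `LinkedHashMap`; the capacity parameter and the eviction policy are assumptions, not taken from the QueryEngine source.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical bounded LRU of query identifiers with a pending CANCEL. */
class PendingCancelLRU {
    private final Map<UUID, Boolean> lru;

    PendingCancelLRU(final int capacity) {
        // Access-ordered map which evicts the eldest entry once over capacity.
        this.lru = Collections.synchronizedMap(
                new LinkedHashMap<UUID, Boolean>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<UUID, Boolean> eldest) {
                        return size() > capacity;
                    }
                });
    }

    /** Remember a CANCEL which arrived before the operation could be found. */
    void addPendingCancel(UUID queryId) {
        lru.put(queryId, Boolean.TRUE);
    }

    /** True iff a CANCEL is pending; the entry is removed as a side-effect. */
    boolean pendingCancel(UUID queryId) {
        return lru.remove(queryId) != null;
    }

    public static void main(String[] args) {
        PendingCancelLRU pending = new PendingCancelLRU(100);
        UUID queryId = UUID.randomUUID();
        pending.addPendingCancel(queryId);
        System.out.println(pending.pendingCancel(queryId));
    }
}
```

Bounding the collection keeps CANCEL requests for operations that never show up from accumulating without limit.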
protected static final transient String ERR_QUERY_NOT_RUNNING
protected final QueryEngineCounters counters
protected final GeoSpatialCounters geoSpatialCounters
public QueryEngine(IIndexManager localIndexManager)
Parameters:
localIndexManager - The local index manager.
public CounterSet getCounters()
Return a CounterSet which reports various statistics for the QueryEngine.
Specified by:
getCounters in interface ICounterSetAccess
protected QueryEngineCounters newCounters()
Extension hook for new QueryEngineCounters instances.
protected GeoSpatialCounters newGeoSpatialCounters()
Extension hook for new GeoSpatialCounters instances.
protected QueryEngineCounters getQueryEngineCounters()
The QueryEngineCounters object for this QueryEngine.
public GeoSpatialCounters getGeoSpatialCounters()
The GeoSpatialCounters object for this QueryEngine.
public UUID getServiceUUID()
The UUID of the service in which this QueryEngine is running.
Specified by:
getServiceUUID in interface IQueryPeer
Returns:
The UUID of the service in which this QueryEngine is running -or- a unique and distinct UUID if the QueryEngine is not running against an IBigdataFederation.
See Also:
IService.getServiceUUID()
public IBigdataFederation<?> getFederation()
The IBigdataFederation iff running in scale-out.
Note: The IBigdataFederation is required in scale-out in order to perform shard locator scans when mapping binding sets across the next join in a query plan.
public IIndexManager getIndexManager()
The local index manager, which provides direct access to local BTree and IndexSegment objects. In scale-out, this is the IndexManager inside the IDataService and provides direct access to FusedViews (aka shards).
Note: You MUST NOT use unisolated indices without obtaining the necessary locks. The QueryEngine is intended to run only against committed index views for which no locks are required.
public ConcurrencyManager getConcurrencyManager()
Return the ConcurrencyManager for the local index manager.
public IQueryClient getProxy()
The RMI proxy for this QueryEngine when used as a query controller.
The default implementation returns this.
public org.eclipse.jetty.client.HttpClient getClientConnectionManager()
Return the HttpClient used to make remote SERVICE call requests.
public boolean isScaleOut()
Return true iff running against an IBigdataFederation.
public void addPendingCancel(UUID queryId)
Add a query UUID to the LRU of query identifiers for which we have received a CANCEL request, but were unable to find a running QUERY, recently done query, or running UPDATE request.
Parameters:
queryId - The UUID of the operation to be cancelled.
public boolean pendingCancel(UUID queryId)
true iff the UUID is in the collection of UUIDs for which we have already received a CANCEL request.
Note: The UUID is removed from the pending cancel collection as a side-effect.
Parameters:
queryId - The UUID of the operation.
Returns:
true if that operation has already been marked for cancellation.
public void init()
Initialize the QueryEngine. It will accept binding set chunks and run them against running queries until it is shutdown.
protected void finalize() throws Throwable
QueryEngines are used with a singleton pattern managed by the QueryEngineFactory. They are torn down automatically once they are no longer reachable. This behavior depends on not having any hard references back to the QueryEngine.
protected void assertRunning()
Throws:
IllegalStateException - if the query engine is shutting down.
protected boolean isRunning()
protected boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Add a chunk of intermediate results for consumption by some query.
Parameters:
msg - A chunk of intermediate results.
Returns:
true if the chunk was accepted. This will return false if the query is done (including cancelled) or the query engine is shutdown. The IChunkMessage will have been released if it was not accepted.
Throws:
IllegalArgumentException - if the chunk is null.
IllegalStateException - if the chunk is not materialized.
public void shutdown()
Shutdown the QueryEngine (blocking). The QueryEngine will not accept new queries, but existing queries will run to completion.
protected void didShutdown()
Hook is notified by shutdown() when all running queries have terminated.
public void shutdownNow()
Do not accept new queries and halt any running binding set chunk tasks.
@Deprecated public void declareQuery(IQueryDecl queryDecl) throws RemoteException
Deprecated.
Specified by:
declareQuery in interface IQueryPeer
Parameters:
queryDecl - The query declaration.
Throws:
RemoteException
public void bufferReady(IChunkMessage<IBindingSet> msg)
Notify a service that a buffer having data for some BOp in some running query is available. The receiver may request the data when they are ready. If the query is cancelled, then the sender will drop the buffer.
Specified by:
bufferReady in interface IQueryPeer
Parameters:
msg - The message.
public void cancelQuery(UUID queryId, Throwable cause)
Notify a service that the query has been terminated.
The default implementation is a NOP.
Specified by:
cancelQuery in interface IQueryPeer
Parameters:
queryId - The query identifier.
cause - The cause. When null, this is presumed to be normal query termination.
public PipelineOp getQuery(UUID queryId)
Return the query.
Specified by:
getQuery in interface IQueryClient
Parameters:
queryId - The query identifier.
public void startOp(IStartOpMessage msg) throws RemoteException
Notify the client that execution has started for some query, operator, node, and index partition.
Specified by:
startOp in interface IQueryClient
Throws:
RemoteException
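public void haltOp(IHaltOpMessage msg) throws RemoteException
Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s).
Specified by:
haltOp in interface IQueryClient
Throws:
RemoteException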
public AbstractRunningQuery eval(BOp op) throws Exception
Evaluate a query.
Parameters:
query - The query to evaluate.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(BOp op, IBindingSet bset) throws Exception
Evaluate a query.
Parameters:
query - The query to evaluate.
bset - The initial binding set to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(UUID queryId, BOp op, IBindingSet bset) throws Exception
Note: Used only by the test suite.
Throws:
Exception
public AbstractRunningQuery eval(UUID queryId, BOp op, Map<Object,Object> queryAttributes, IBindingSet[] bset) throws Exception
Note: Used only by the test suite.
Throws:
Exception
public AbstractRunningQuery eval(UUID queryId, BOp op, Map<Object,Object> queryAttributes, IBindingSet[][] bset) throws Exception
Note: Used only by the test suite.
Throws:
Exception
public AbstractRunningQuery eval(BOp op, IBindingSet[] bsets) throws Exception
Evaluate a query.
Parameters:
query - The query to evaluate.
bsets - The initial binding sets to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(BOp op, IBindingSet[] bsets, Map<Object,Object> attribs) throws Exception
Evaluate a query.
Parameters:
query - The query to evaluate.
bsets - The initial binding sets to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(UUID queryId, PipelineOp query, Map<Object,Object> queryAttributes, IChunkMessage<IBindingSet> msg) throws Exception
Evaluate a query. The IBindingSets made available by the IChunkMessage will be pushed into the query.
Parameters:
queryId - The unique identifier for the query.
query - The query to evaluate.
attribs - Attributes to be attached to the query before it begins to execute (optional).
msg - A message providing access to the initial binding set(s) used to begin query evaluation.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
protected AbstractRunningQuery putIfAbsent(UUID queryId, AbstractRunningQuery runningQuery)
Places the AbstractRunningQuery object into the internal map.
Parameters:
queryId - The query identifier.
runningQuery - The AbstractRunningQuery.
Returns:
The AbstractRunningQuery -or- another AbstractRunningQuery iff one exists with the same UUID.
public AbstractRunningQuery getRunningQuery(UUID queryId)
Return the AbstractRunningQuery associated with that query identifier.
Parameters:
queryId - The query identifier.
Returns:
The AbstractRunningQuery -or- null if there is no query associated with that query identifier.
Throws:
RuntimeException - if the query halted with an error (if the query halted normally this will wrap an InterruptedException).
protected void halt(AbstractRunningQuery q)
The query is no longer running.
public void addListener(QueryEngine.IRunningQueryListener l)
Add a query listener.
public void removeListener(QueryEngine.IRunningQueryListener l)
Remove a query listener.
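The add/remove listener pair follows the usual observer pattern for life cycle events (start/halt). A minimal sketch, assuming a hypothetical `ListenerRegistry` class and a hypothetical `Listener` callback signature (the actual QueryEngine.IRunningQueryListener methods are not shown on this page):

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical registry of query life cycle listeners. */
class ListenerRegistry {

    /** Assumed callback shape: one call per start or halt event. */
    interface Listener {
        void lifeCycleEvent(UUID queryId, boolean started);
    }

    // Copy-on-write list: events may be fired while listeners are added or removed.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener l) {
        if (l == null) throw new IllegalArgumentException();
        listeners.add(l);
    }

    void removeListener(Listener l) {
        listeners.remove(l);
    }

    /** Notify every registered listener of a start (true) or halt (false) event. */
    void fire(UUID queryId, boolean started) {
        for (Listener l : listeners) {
            l.lifeCycleEvent(queryId, started);
        }
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.addListener((id, started) ->
                System.out.println(id + (started ? " started" : " halted")));
        UUID queryId = UUID.randomUUID();
        registry.fire(queryId, true);
        registry.fire(queryId, false);
    }
}
```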
protected AbstractRunningQuery newRunningQuery(UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource)
Factory for IRunningQuery instances.
public UUID[] getRunningQueries()
Return the set of queries which are running as of the moment when the request was processed.
Specified by:
getRunningQueries in interface IQueryClient
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.