public class ProgramTask extends DataServiceCallable<Object> implements IProgramTask
DataService
.Modifier and Type | Field and Description |
---|---|
protected static org.apache.log4j.Logger |
log |
Constructor and Description |
---|
ProgramTask(ActionEnum action,
IStep step,
IJoinNexusFactory joinNexusFactory)
Variant when the task will be submitted using
IDataService.submit(Callable) (efficient since all indices will
be local, but the indices must not be partitioned and must all exist on
the target DataService ). |
ProgramTask(ActionEnum action,
IStep step,
IJoinNexusFactory joinNexusFactory,
IIndexManager indexManager)
Variant when the task will be executed directly by the caller.
|
Modifier and Type | Method and Description |
---|---|
Object |
call()
Execute the program.
|
protected RuleStats |
executeClosure(IProgram program)
Computes the closure of a set of
IRule s until the relation(s) on
which they are writing reach a "fixed point". |
protected RuleStats |
executeMutation(IStep step)
Run a mutation
IStep . |
protected RuleStats |
executeProgramWithEmbeddedClosure(IProgram program)
|
protected IAsynchronousIterator<ISolution[]> |
executeQuery(IStep step)
Execute the
IStep as a query. |
void |
setDataService(DataService dataService)
Sets the
DataService reference and the IBigdataFederation
reference (if not already set). |
getDataService, isDataService
getFederation, setFederation
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getFederation, setFederation
public ProgramTask(ActionEnum action, IStep step, IJoinNexusFactory joinNexusFactory)
IDataService.submit(Callable)
(efficient since all indices will
be local, but the indices must not be partitioned and must all exist on
the target DataService
).
Note: the caller MUST submit the ProgramTask
using
DataService.submit(Callable)
in which case DataServiceCallable.dataService
field will be set (after the ctor) by the DataService
itself. The
DataService
will be used to identify an ExecutorService
and the IJoinNexusFactory
will be used to establish access to
indices, relations, etc. in the context of the AbstractTask
- see
AbstractStepTask.submit()
.
action
- step
- joinNexus
- public ProgramTask(ActionEnum action, IStep step, IJoinNexusFactory joinNexusFactory, IIndexManager indexManager)
action
- step
- joinNexusFactory
- indexManager
- IllegalArgumentException
- if any parameter is null
.public void setDataService(DataService dataService)
DataServiceCallable
DataService
reference and the IBigdataFederation
reference (if not already set).setDataService
in interface IDataServiceCallable
setDataService
in class DataServiceCallable<Object>
dataService
- The data service.public Object call() throws Exception
Note: There is no natural order for high-level query. Also, unless stable
evaluation is requested, the results can be produced by parallel threads
and the order of the materialized solution is therefore not even stable.
The only way to have a natural order is for a sort to be imposed on the
ISolution
s.
protected IAsynchronousIterator<ISolution[]> executeQuery(IStep step)
IStep
as a query.step
- The IStep
.IChunkedOrderedIterator
that will drain the
ISolution
s generated by the IStep
. Execution
will be cancelled if the iterator is
closed
. If execution results
in an error, then the iterator will throw a
RuntimeException
whose cause is the error.RuntimeException
protected RuleStats executeMutation(IStep step) throws InterruptedException, ExecutionException
IStep
. The IStep
may consist of many sub-IStep
s.
Note: If you specify ITx.READ_COMMITTED
for mutation operations
when using a federation then concurrent split/join/move can cause the
operation to fail. It is safer to use read-consistent semantics by
specifying IIndexStore.getLastCommitTime()
instead.
step
- The IStep
.RuleStats.mutationCount
.InterruptedException
ExecutionException
protected RuleStats executeClosure(IProgram program) throws InterruptedException, ExecutionException
IRule
s until the relation(s) on
which they are writing reach a "fixed point".
The general approach is a series of rounds in which each rule is applied in turn (either sequentially or in parallel, depending on the program). Solutions computed for each rule in each round written onto the relation for the head of that rule. The process halts when no new solutions are computed in a given round.
Note: When we are running the program on a ConcurrencyManager
,
each round of the closure is submitted as a single AbstractTask
.
This allows us to do historical reads during the round and to update the
read-behind timestamp before each round. During the round, the program
will write on buffers that are flushed at the end of the round. Those
buffers will use unisolated writes onto the appropriate relations.
Note: This assumes that you are following the IMutableRelation
contract -- you MUST NOT overwrite tuples with the same key and value, or
at least you must not report such "do nothing" overwrites in the mutation
count!!!
action
- The action (must be a mutation operation).program
- The program to be executed.ExecutionException
InterruptedException
protected RuleStats executeProgramWithEmbeddedClosure(IProgram program) throws InterruptedException, ExecutionException
IProgram
containing one or more sub-IProgram
that are closure operations. The top-level program must not be a closure
operation. All steps above the closure operations will be run in a
sequence. The closure operations themselves will be executed using
executeClosure(IProgram)
.
Note: Any program that embeds a closure operation must be sequential
(this is enforced by the Program class).
Note: Programs that use closure operations are constrained to either (a)
a fix point of a (normally parallel) program consisting solely of
IRule
s; or (b) a sequential program containing some steps that
are the fix point of a (normally parallel) program consisting solely of
IRule
s.
ExecutionException
InterruptedException
IllegalArgumentException
- if program is null
IllegalArgumentException
- if program is itself a closure operation.IllegalStateException
- unless the ActionEnum
is a mutation operation.Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.