Type Parameters:
S - The generic type of the statement objects.
R - The generic type of the resource identifier (File, URL, etc.).
FIXME Modify to support SIDs. We basically need to loop in the workflowLatch_bufferTids workflow state until all SIDs have been assigned. However, the termination conditions will be a little more complex. During termination, if we have the TIDs but not yet the SIDs then we need to flush the SID requests rather than allowing them to time out. Since SID processing is cyclic, we may have to do this one or more times.
AsynchronousStatementBufferWithSids: When SIDs are enabled, we must identify the minimum set of statements whose SIDs are referenced by blank nodes in the S, P, O positions of other statements. Since we cannot make that determination until we reach the end of the document, all statements which use blank nodes are placed into the deferredStatements container.

Further, and unlike the synchronous StatementBuffer, we must defer writes of grounded statements until we know whether or not their SID appears in a blank node reference by another statement. We MUST use synchronous RPC to obtain the SIDs for those statements. This means that the entire document MUST be parsed into memory. Since we must buffer the entire document in memory when SIDs are enabled (when using asynchronous writes), distinct implementations of the asynchronous statement buffer are used depending on whether or not SIDs are enabled. [Actually, we fully buffer anyway so we can use the same implementation class.]

Once the end of the document has been reached, we iteratively divide the parsed statements into three collections. This process ends once all three collections are empty.

1. groundedStatements: These are statements which are not referenced by other statements using their SID and which do not contain references to the SIDs of other statements. The groundedStatements are written asynchronously since there is no dependency on their SIDs.
2. referencedStatements: These are statements whose SID has not been assigned yet and which do not reference other statements but which are themselves referenced by other statements using a blank node. These statements are written using synchronous RPC so that we may obtain their SIDs and thereby convert one or more deferredStatements to either groundedStatements or referencedStatements.
3. deferredStatements: These are statements using a blank node to reference another statement whose SID has not been assigned yet. These statements MAY also be referenced by other deferred statements. However, those references MAY NOT form a cycle. Deferred statements are moved to either the groundedStatements or the referencedStatements collection once their blank node references have been assigned SIDs.

Given:
    value[] - RDF Values observed in the S, P, O, and C positions.
    unresolvedRefs[] - RDF blank nodes observed in the C position are entered into this collection. They are removed from the collection as they are resolved.
    statement[] - RDF Statements reported by the parser.

Do:
    // remove blank nodes serving as SIDs from the value[].
    value[] := value[] - unresolvedRefs[];
    value[] => TERM2ID (Sync RPC, assigning TIDs)
    value[] => ID2TERM (Async)
    value[] => Text (Async, iff enabled)
    // initially, all statements are deferred.
    deferredStatements := statements;
    // loop until ALL three collections are empty.
    while(!groundedStatements.isEmpty() || !referencedStatements.isEmpty() || !deferredStatements.isEmpty()) {
        groundedStatements[] => TERM2ID (async)
        groundedStatements[] := []; // empty.
        referencedStatements[] => TERM2ID (Sync RPC, assigning SIDs)
        foreach spo : referencedStatements {
            unresolvedRefs.remove( spo.c );
        }
        referencedStatements[] := []; // empty.
        foreach spo : deferredStatements {
            if(spo.isGrounded) {
                // true iff S.tid, P.tid, and O.tid are bound, implying that
                // this statement does not have any unresolved references to
                // other statements.
                deferredStatements.remove( spo );
                if(unresolvedRefs.contains(spo.c)) {
                    // will be written synchronously.
                    referencedStatements.add( spo );
                } else {
                    // will be written asynchronously.
                    groundedStatements.add( spo );
                }
            }
        }
    }
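The loop above can be exercised with a small, self-contained Java model. It is a sketch under stated assumptions, not the buffer implementation: statements are plain strings, a blank node in the C position names its statement, the synchronous SID-assigning RPC is replaced by a local counter, and "writes" are only appended to a log. The names SidScheduler, Stmt and schedule are hypothetical. Unlike the pseudocode, the sketch also stops with an error when an iteration makes no progress, which is how a forbidden reference cycle would show up.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model of the SID scheduling loop; not Blazegraph code. */
class SidScheduler {
    /** A parsed statement. Terms starting with "_:" are blank nodes; c, if non-null, names this statement. */
    static final class Stmt {
        final String label, s, p, o, c;
        Stmt(String label, String s, String p, String o, String c) {
            this.label = label; this.s = s; this.p = p; this.o = o; this.c = c;
        }
    }

    /** Returns the write log: "async:X" for grounded writes, "sync:X=sid" for SID-assigning RPCs. */
    static List<String> schedule(List<Stmt> statements) {
        List<String> log = new ArrayList<>();
        Set<String> unresolvedRefs = new HashSet<>();       // blank nodes seen in the C position
        for (Stmt st : statements) if (st.c != null) unresolvedRefs.add(st.c);
        List<Stmt> deferred = new ArrayList<>(statements);  // initially, all statements are deferred
        List<Stmt> grounded = new ArrayList<>();
        List<Stmt> referenced = new ArrayList<>();
        long nextSid = 1;                                   // stands in for the SID-assigning RPC
        // Loop until ALL three collections are empty.
        while (!grounded.isEmpty() || !referenced.isEmpty() || !deferred.isEmpty()) {
            for (Stmt st : grounded) log.add("async:" + st.label);
            grounded.clear();
            for (Stmt st : referenced) {
                log.add("sync:" + st.label + "=" + nextSid++);
                unresolvedRefs.remove(st.c);                // its SID is now known
            }
            referenced.clear();
            for (Iterator<Stmt> it = deferred.iterator(); it.hasNext(); ) {
                Stmt st = it.next();
                boolean isGrounded = !unresolvedRefs.contains(st.s)
                        && !unresolvedRefs.contains(st.p)
                        && !unresolvedRefs.contains(st.o);
                if (!isGrounded) continue;                  // still waits on another statement's SID
                it.remove();
                if (st.c != null && unresolvedRefs.contains(st.c)) referenced.add(st);
                else grounded.add(st);
            }
            // No progress while statements remain deferred means the references form a cycle.
            if (grounded.isEmpty() && referenced.isEmpty() && !deferred.isEmpty()) {
                throw new IllegalStateException("cyclic SID references: " + deferred.size() + " statement(s)");
            }
        }
        return log;
    }
}
```

For example, with A = (<a>, <p>, <b>) named by _:x, B = (_:x, <source>, <doc>) and an unrelated statement C, the log is [async:C, sync:A=1, async:B]: C is written at once, A's SID is assigned synchronously, and only then does B become grounded.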
public class AsynchronousStatementBufferFactory<S extends BigdataStatement,R> extends Object implements IAsynchronousWriteStatementBufferFactory<S>
The asynchronous statement buffer without SIDs is much simpler than the one with SIDs. If we require that the document is fully buffered in memory, then we can simplify this to just:

Given:
    value[] - RDF Values observed in the S, P, O, or C positions.
    statement[] - RDF Statements reported by the parser.

Do:
    value[] => TERM2ID (Sync RPC, assigning TIDs)
    value[] => BLOBS (Sync RPC, assigning TIDs)
    value[] => ID2TERM (Async)
    value[] => Text (Async, iff enabled)
    statement[] => (SPO,POS,OSP) (Async)

Note: This DOES NOT support truth maintenance. Truth maintenance requires that the term identifiers are resolved against the database's lexicon while the statements are written onto a local (and temporary) triple store. There is no (or at least less) reason to use asynchronous writes against a local store. However, TM could use this to copy the data from the temporary triple store to the database. This should be plugged in transparently in the copyStatements() API for the tripleStore.

Note: This DOES NOT support SIDs.
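A minimal sketch of the SID-free pipeline, assuming an in-memory HashMap in place of the TERM2ID index and in-memory sorted lists in place of the SPO, POS and OSP indices; the BLOBS, ID2TERM and Text steps are left out. What it illustrates is the ordering: term identifiers are resolved synchronously first, after which the three statement indices can be written in parallel because they no longer depend on each other. NoSidLoader and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical in-memory model of the SID-free load path; not Blazegraph code. */
class NoSidLoader {
    static final String[] NAMES = {"SPO", "POS", "OSP"};
    static final int[][] PERMS = {{0, 1, 2}, {1, 2, 0}, {2, 0, 1}}; // key order per index

    final Map<String, Long> term2id = new HashMap<>();                  // stands in for TERM2ID
    final Map<String, List<long[]>> indices = new ConcurrentHashMap<>(); // stands in for SPO/POS/OSP

    void load(List<String[]> statements, ExecutorService pool) throws Exception {
        // value[] => TERM2ID (sync): every value gets a term identifier before any statement is written.
        List<long[]> tids = new ArrayList<>();
        for (String[] st : statements) {
            long[] t = new long[3];
            for (int i = 0; i < 3; i++) {
                Long id = term2id.get(st[i]);
                if (id == null) {
                    id = (long) term2id.size() + 1;            // next unused identifier
                    term2id.put(st[i], id);
                }
                t[i] = id;
            }
            tids.add(t);
        }
        // statement[] => (SPO, POS, OSP) (async): the three index writes are independent.
        List<Future<?>> futures = new ArrayList<>();
        for (int k = 0; k < NAMES.length; k++) {
            final int idx = k;
            futures.add(pool.submit(() -> {
                int[] perm = PERMS[idx];
                List<long[]> keys = new ArrayList<>();
                for (long[] t : tids) keys.add(new long[] {t[perm[0]], t[perm[1]], t[perm[2]]});
                keys.sort((a, b) -> Arrays.compare(a, b));     // indices are key-ordered
                indices.put(NAMES[idx], keys);
            }));
        }
        for (Future<?> f : futures) f.get(); // rethrows any write failure as ExecutionException
    }
}
```

The blocking f.get() loop at the end plays the role that awaitAll() plays for the real factory: it is the point where asynchronous write errors surface.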
Modifier and Type | Class and Description |
---|---|
protected class | AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl - Inner class provides the statement buffer. |
protected class | AsynchronousStatementBufferFactory.DeleteTask - Task deletes a resource from the local file system. |
protected class | AsynchronousStatementBufferFactory.ParserTask - Task either loads an RDF resource or verifies that the told triples found in that resource are present in the database. |
Constructor and Description |
---|
AsynchronousStatementBufferFactory(ScaleOutTripleStore tripleStore, int producerChunkSize, int valuesInitialCapacity, int bnodesInitialCapacity, org.openrdf.rio.RDFFormat defaultFormat, String defaultGraph, RDFParserOptions parserOptions, boolean deleteAfter, int parserPoolSize, int parserQueueCapacity, int term2IdWriterPoolSize, int otherWriterPoolSize, int notifyPoolSize, long pauseParsedPoolStatementThreshold) |
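The producerChunkSize constructor parameter relies on the master splitting each chunk by the separator keys of the index partitions before writing the splits onto the per-partition sinks. The sketch below shows only that splitting step on a sorted chunk. It assumes long keys (real index keys are unsigned byte[] keys) and assumes separators[i] is the first key of partition i + 1; ChunkSplitter is a hypothetical name, and the returned arrays stand in for the per-partition sinks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: split a sorted chunk into one run of keys per index partition. */
class ChunkSplitter {
    /** separators[i] is assumed to be the first key of partition i + 1; partition 0 is unbounded below. */
    static List<long[]> split(long[] sortedChunk, long[] separators) {
        List<long[]> splits = new ArrayList<>();
        int from = 0;
        for (long sep : separators) {
            int to = lowerBound(sortedChunk, from, sep);       // first key >= sep
            splits.add(Arrays.copyOfRange(sortedChunk, from, to));
            from = to;
        }
        splits.add(Arrays.copyOfRange(sortedChunk, from, sortedChunk.length)); // last partition
        return splits;
    }

    /** Binary search for the first index in [from, a.length) whose key is >= key. */
    private static int lowerBound(long[] a, int from, long key) {
        int lo = from, hi = a.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (a[mid] < key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }
}
```

Because the chunk is already sorted, each split is a contiguous run, so one binary search per separator suffices; a larger producerChunkSize simply means fewer, longer runs per partition.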
Modifier and Type | Method and Description |
---|---|
void | awaitAll() - Close buffers and then await their Futures. |
void | cancelAll(boolean mayInterruptIfRunning) - Cancel all Futures. |
void | close() - Awaits a signal that all documents which have queued writes are finished and then closes the remaining buffers. |
protected void | deleteResource(R resource) - Delete a file whose data have been made restart safe on the database from the local file system (this must be overridden to handle resources which are not Files). |
protected void | documentDone(R resource) - Invoked after a document has become restart safe. |
protected void | documentError(R resource, Throwable t) - Invoked after a document has failed. |
CounterSet | getCounters() - Return performance counters defined by this factory. |
protected org.openrdf.rio.RDFFormat | getDefaultRDFFormat() - The default RDF interchange format that will be used when the format cannot be determined. |
long | getDocumentDoneCount() - The #of documents submitted to the factory which have been processed successfully. |
long | getDocumentErrorCount() - The #of documents submitted to the factory which could not be processed due to some error. |
long | getElapsedMillis() - The elapsed milliseconds, counting only the time between notifyStart() and notifyEnd(). |
protected InputStream | getInputStream(R resource) - Open a buffered input stream reading from the resource. |
long | getStatementCount() - Return an estimate of the #of statements written on the indices. |
boolean | isAnyDone() - Return true if the Future for any of the asynchronous write buffers is done. |
protected boolean | isDeleteAfter() - Delete files after they have been successfully loaded when true. |
protected Runnable | newFailureTask(R resource, Throwable cause) - Return the optional task to be executed for a resource for which processing has failed. |
protected Callable<?> | newParserTask(R resource) - Return a task to parse the document. |
IStatementBuffer<S> | newStatementBuffer() - Note: do not invoke this directly. |
protected AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl | newStatementBuffer(R resource) |
protected Runnable | newSuccessTask(R resource) - Return the optional task to be executed for a resource which has been successfully processed and whose assertions are now restart safe on the database. |
protected void | notifyEnd() - Notify that the factory is done running tasks (for now). |
protected void | notifyStart() - Notify that the factory will begin running tasks. |
int | submitAll(File fileOrDir, FilenameFilter filter, long retryMillis) - Submit all files in a directory for processing via submitOne. |
void | submitOne(R resource) - Submit a resource for processing. |
void | submitOne(R resource, long retryMillis) - Submit a resource for processing. |
String | toString() |
public AsynchronousStatementBufferFactory(ScaleOutTripleStore tripleStore, int producerChunkSize, int valuesInitialCapacity, int bnodesInitialCapacity, org.openrdf.rio.RDFFormat defaultFormat, String defaultGraph, RDFParserOptions parserOptions, boolean deleteAfter, int parserPoolSize, int parserQueueCapacity, int term2IdWriterPoolSize, int otherWriterPoolSize, int notifyPoolSize, long pauseParsedPoolStatementThreshold)
Parameters:
tripleStore - The target ScaleOutTripleStore.
producerChunkSize - The chunk size used when writing chunks onto the master for the asynchronous index write API. If this value is on the order of the #of terms or statements in the parsed documents, then all terms / statements will be written onto the master in one chunk. The master will split the chunk based on the separator keys for the index partitions and write splits onto the sink for each index partition. The master and sink configuration is specified via the IndexMetadata when the triple store indices are created.
valuesInitialCapacity - The initial capacity of the map of the distinct RDF Values parsed from a single document.
bnodesInitialCapacity - The initial capacity of the map of the distinct RDF BNodes parsed from a single document.
defaultFormat - The default RDFFormat which will be assumed.
defaultGraph - The value that will be used for the graph/context co-ordinate when loading data represented in a triple format into a quad store. If not given, then the context will be the resource identifier for the resource being parsed.
parserOptions - Options for the RDFParser.
deleteAfter - true if the resource should be deleted once the statements from that resource are restart safe on the target database.
parserPoolSize - The #of worker threads in the thread pool for parsing RDF documents.
parserQueueCapacity - The capacity of the bounded work queue for the service running the parser tasks.
term2IdWriterPoolSize - The #of worker threads in the thread pool for buffering asynchronous writes on the TERM2ID/BLOBS indices.
otherWriterPoolSize - The #of worker threads in the thread pool for buffering asynchronous index writes on the other indices.
notifyPoolSize - The #of worker threads in the thread pool for handling document success and document error notices.
pauseParsedPoolStatementThreshold - The maximum #of statements which can be parsed but not yet buffered before requests for new parser tasks are paused [0:Long.MAX_VALUE]. This allows you to place a constraint on the RAM of the parsers. The RAM demand of the asynchronous index write buffers is controlled by their master and sink queue capacity and chunk size.

protected boolean isDeleteAfter()
Delete files after they have been successfully loaded when true.

protected org.openrdf.rio.RDFFormat getDefaultRDFFormat()
The default RDF interchange format that will be used when the format cannot be determined.
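The pauseParsedPoolStatementThreshold constructor parameter bounds how many parsed statements may wait to be buffered before requests for new parser tasks are paused. One way to express such a bound with plain Java monitors is sketched below. ParseThrottle is hypothetical, and both the counting scheme and the choice to pause only when the count exceeds (rather than reaches) the threshold are assumptions, not a description of the factory's internals.

```java
/** Hypothetical throttle for parser tasks; not Blazegraph's implementation. */
class ParseThrottle {
    private final long threshold;
    private long unbuffered; // statements parsed but not yet handed to the write buffers

    ParseThrottle(long threshold) {
        if (threshold < 0) throw new IllegalArgumentException("threshold must be in [0:Long.MAX_VALUE]");
        this.threshold = threshold;
    }

    synchronized boolean isPaused() { return unbuffered > threshold; }

    /** Called before a new parser task starts; blocks while the threshold is exceeded. */
    synchronized void awaitResume() throws InterruptedException {
        while (unbuffered > threshold) wait();
    }

    /** A parser task reports n newly parsed statements. */
    synchronized void parsed(long n) { unbuffered += n; }

    /** n parsed statements were moved into the write buffers; wake paused callers once under the bound. */
    synchronized void buffered(long n) {
        unbuffered -= n;
        if (unbuffered <= threshold) notifyAll();
    }
}
```

Only the parser side is throttled here; as the parameter description notes, the RAM used by the index write buffers is governed separately by their queue capacities and chunk sizes.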
protected void notifyStart()
Notify that the factory will begin running tasks. This sets the startTime used by getElapsedMillis() to report the run time of the tasks.

protected void notifyEnd()
Notify that the factory is done running tasks (for now).
See Also: elapsed()

public long getElapsedMillis()
The elapsed milliseconds, counting only the time between notifyStart() and notifyEnd().

public long getStatementCount()
Return an estimate of the #of statements written on the indices.
This value is aggregated across any IStatementBuffer obtained from newStatementBuffer() for this instance.
This value actually reports the #of statements written on the SPO index for the database. Statements are written asynchronously in chunks and the writes MAY proceed at different rates for each of the statement indices. The counter value will be stable once awaitAll() returns normally.
See Also: SPOIndexWriteProc
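public long getDocumentErrorCount()
The #of documents submitted to the factory which could not be processed due to some error.

public long getDocumentDoneCount()
The #of documents submitted to the factory which have been processed successfully.

public IStatementBuffer<S> newStatementBuffer()
Note: do not invoke this directly.
Specified by: newStatementBuffer in interface IStatementBufferFactory<S extends BigdataStatement>
Specified by: newStatementBuffer in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>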
protected AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl newStatementBuffer(R resource)
public void submitOne(R resource) throws Exception
Submit a resource for processing.
Parameters:
resource - The resource (file or URL, but not a directory).
Throws:
Exception - if there is a problem creating the parser task.
RejectedExecutionException - if the work queue for the parser service is full.

public void submitOne(R resource, long retryMillis) throws InterruptedException, Exception
Submit a resource for processing.
Parameters:
resource - The resource (file or URL, but not a directory).
retryMillis - The number of milliseconds to wait between retries when the parser service work queue is full. When ZERO (0L), a RejectedExecutionException will be thrown instead.
Throws:
Exception - if there is a problem creating the parser task.
RejectedExecutionException - if the service is shut down -or- retryMillis is ZERO (0L).
InterruptedException
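The retryMillis contract above (retry while the parser queue is full, rethrow immediately when retryMillis is 0L or the service is shut down) can be sketched with a bounded ThreadPoolExecutor whose AbortPolicy throws RejectedExecutionException when its queue is full. RetryingSubmitter is a hypothetical name, and mapping the parserPoolSize and parserQueueCapacity constructor arguments onto poolSize and queueCapacity is an assumption made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of submitOne(resource, retryMillis) semantics; not Blazegraph code. */
class RetryingSubmitter {
    /** A parser service with a bounded work queue that rejects work when the queue is full. */
    static ThreadPoolExecutor newParserService(int poolSize, int queueCapacity) {
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    static void submitOne(ExecutorService service, Runnable task, long retryMillis)
            throws InterruptedException {
        while (true) {
            try {
                service.execute(task);
                return;
            } catch (RejectedExecutionException ex) {
                // Give up at once if retries are disabled or the service can never accept work again.
                if (retryMillis == 0L || service.isShutdown()) throw ex;
                Thread.sleep(retryMillis); // wait for the queue to drain, then retry
            }
        }
    }
}
```

A bounded queue plus retry gives back-pressure on the caller that enumerates documents, instead of letting an unbounded backlog of parser tasks accumulate in memory.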
public int submitAll(File fileOrDir, FilenameFilter filter, long retryMillis) throws Exception
Submit all files in a directory for processing via submitOne.
Parameters:
fileOrDir - The file or directory.
filter - An optional filter. Only the files selected by the filter will be processed.
retryMillis - The number of milliseconds to wait between retries when the parser service work queue is full. When ZERO (0L), a RejectedExecutionException will be thrown instead.
Throws:
Exception
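A sketch of how a file or directory might be expanded into individual submissions. It assumes that subdirectories are descended recursively and that the filter applies to plain files only; neither behavior is stated above. DirectoryWalker is a hypothetical helper, and each returned File would then be handed to submitOne.

```java
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: expands a file or directory into the files to submit. */
class DirectoryWalker {
    static List<File> collect(File fileOrDir, FilenameFilter filter) {
        List<File> out = new ArrayList<>();
        walk(fileOrDir, filter, out);
        return out;
    }

    private static void walk(File f, FilenameFilter filter, List<File> out) {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) return;           // unreadable directory or I/O error
            Arrays.sort(children);                  // deterministic submission order
            for (File child : children) walk(child, filter, out);
        } else if (filter == null || filter.accept(f.getParentFile(), f.getName())) {
            out.add(f);                             // a null filter selects every file
        }
    }
}
```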
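protected InputStream getInputStream(R resource) throws IOException
Open a buffered input stream reading from the resource. If the resource name ends with .gz or .zip then the appropriate decompression will be applied.
Parameters:
resource - The resource identifier.
Throws:
IOException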
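A stdlib sketch of suffix-based decompression, assuming the resource is a local file path. ResourceStreams.open is a hypothetical helper, and reading only the first entry of a .zip archive is an assumption; the description above does not say how multi-entry archives are handled.

```java
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;

/** Hypothetical helper: opens a buffered stream, decompressing by file-name suffix. */
class ResourceStreams {
    static InputStream open(String path) throws IOException {
        InputStream raw = new BufferedInputStream(new FileInputStream(path));
        try {
            if (path.endsWith(".gz")) {
                return new BufferedInputStream(new GZIPInputStream(raw));
            }
            if (path.endsWith(".zip")) {
                ZipInputStream zip = new ZipInputStream(raw);
                if (zip.getNextEntry() == null) throw new IOException("empty archive: " + path);
                return new BufferedInputStream(zip); // reads the first entry only (assumption)
            }
            return raw; // uncompressed
        } catch (IOException ex) {
            raw.close(); // do not leak the file handle on a bad header
            throw ex;
        }
    }
}
```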
protected Callable<?> newParserTask(R resource) throws Exception
Return a task to parse the document. The parsed statements are written onto an AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl for the document. When that buffer is flushed, the document will be queued for further processing.
Parameters:
resource - The resource to be parsed.
Throws:
Exception
public boolean isAnyDone()
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Return true if the Future for any of the asynchronous write buffers is done.
Note: This method should be invoked periodically to verify that no errors have been encountered by the asynchronous write buffers. If this method returns true, invoke IAsynchronousWriteStatementBufferFactory.awaitAll(), which will detect any error(s), cancel the other Futures, and throw an error back to you.
Specified by: isAnyDone in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>
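The periodic check described in the note can be sketched over plain Futures. WriteBufferFutures is a hypothetical stand-in, not the factory: it does not close any buffers first, and its awaitAll() polls so that a failure in one writer is seen even while another writer is still running (a strictly sequential get() would block on the running one).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical holder for the Futures of asynchronous write buffers; not Blazegraph code. */
class WriteBufferFutures {
    private final List<Future<?>> futures = new ArrayList<>();

    void add(Future<?> f) { futures.add(f); }

    /** true if any Future is done; for a long-running writer that usually means it failed. */
    boolean isAnyDone() {
        for (Future<?> f : futures) if (f.isDone()) return true;
        return false;
    }

    void cancelAll(boolean mayInterruptIfRunning) {
        for (Future<?> f : futures) f.cancel(mayInterruptIfRunning);
    }

    /** Waits for all Futures; the first failure found cancels the rest and is rethrown. */
    void awaitAll() throws InterruptedException, ExecutionException {
        try {
            while (true) {
                boolean allDone = true;
                for (Future<?> f : futures) {
                    if (f.isDone()) f.get(); // throws ExecutionException if that writer failed
                    else allDone = false;
                }
                if (allDone) return;
                Thread.sleep(10);            // simple polling keeps the sketch short
            }
        } catch (ExecutionException | InterruptedException ex) {
            cancelAll(true);
            throw ex;
        }
    }
}
```

A caller would loop over its work, check isAnyDone() between batches, and call awaitAll() as soon as it returns true so that the underlying error is rethrown promptly.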
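public void cancelAll(boolean mayInterruptIfRunning)
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Cancel all Futures.
Specified by: cancelAll in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>

public void close()
Awaits a signal that all documents which have queued writes are finished and then closes the remaining buffers.
Specified by: close in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>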
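public void awaitAll() throws InterruptedException, ExecutionException
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Close buffers and then await their Futures. Once closed, the buffers will not accept further input and the consumers will eventually drain the buffers and report that they are exhausted. The Futures will become available once the iterators are exhausted.
Specified by: awaitAll in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>
Throws:
InterruptedException - if interrupted while awaiting any of the Futures.
ExecutionException - if any Future fails.

protected final void documentDone(R resource)
Invoked after a document has become restart safe. If newSuccessTask(Object) returns a Runnable then that will be executed on the notifyService.
Parameters:
resource - The document identifier.

protected final void documentError(R resource, Throwable t)
Invoked after a document has failed. If newFailureTask(Object, Throwable) returns a Runnable then that will be executed on the notifyService.
Parameters:
resource - The document identifier.
t - The exception.

protected Runnable newSuccessTask(R resource)
Return the optional task to be executed for a resource which has been successfully processed and whose assertions are now restart safe on the database. The task, if any, will be executed on the notifyService.
The default implementation runs an AsynchronousStatementBufferFactory.DeleteTask IFF deleteAfter was specified as true to the ctor and otherwise returns null. The event is logged @ INFO.
Parameters:
resource - The resource.
Returns:
The task to run -or- null if no task should be run.

protected Runnable newFailureTask(R resource, Throwable cause)
Return the optional task to be executed for a resource for which processing has failed. The task, if any, will be executed on the notifyService.
The default implementation logs a message @ ERROR.
Parameters:
resource - The resource.
cause - The cause.
Returns:
The task to run -or- null if no task should be run.

protected void deleteResource(R resource)
Delete a file whose data have been made restart safe on the database from the local file system (this must be overridden to handle resources which are not Files).
Parameters:
resource - The resource.

public CounterSet getCounters()
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Return performance counters defined by this factory.
Specified by: getCounters in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>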
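The hook pattern used by documentDone and documentError (count the event, ask an overridable factory method for an optional Runnable, and run it on a notify service) is sketched below for File resources only. DocumentNotifier and its counters are hypothetical names, and the delete-after-success default mirrors the description of newSuccessTask rather than the actual DeleteTask. java.util.logging has no ERROR level, so SEVERE stands in for it.

```java
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/** Hypothetical sketch of the documentDone/documentError hooks for File resources. */
class DocumentNotifier {
    private static final Logger log = Logger.getLogger(DocumentNotifier.class.getName());
    private final ExecutorService notifyService;
    private final boolean deleteAfter;
    final AtomicLong documentDoneCount = new AtomicLong();
    final AtomicLong documentErrorCount = new AtomicLong();

    DocumentNotifier(ExecutorService notifyService, boolean deleteAfter) {
        this.notifyService = notifyService;
        this.deleteAfter = deleteAfter;
    }

    /** Default success hook: log at INFO, then delete the file iff deleteAfter, else no task. */
    protected Runnable newSuccessTask(File resource) {
        log.info("done: " + resource);
        if (!deleteAfter) return null;
        return () -> {
            if (!resource.delete()) log.warning("could not delete " + resource);
        };
    }

    /** Default failure hook: log the cause at SEVERE (the closest java.util.logging level to ERROR). */
    protected Runnable newFailureTask(File resource, Throwable cause) {
        return () -> log.severe("failed: " + resource + ": " + cause);
    }

    final void documentDone(File resource) {
        documentDoneCount.incrementAndGet();
        Runnable task = newSuccessTask(resource);
        if (task != null) notifyService.execute(task); // hand the hook to the notify service
    }

    final void documentError(File resource, Throwable t) {
        documentErrorCount.incrementAndGet();
        Runnable task = newFailureTask(resource, t);
        if (task != null) notifyService.execute(task);
    }
}
```

Keeping documentDone and documentError final while leaving the task factories overridable lets subclasses change what happens per document without being able to skip the bookkeeping.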
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.