public class StatementBuffer<S extends org.openrdf.model.Statement> extends Object implements IStatementBuffer<S>, ICounterSetAccess
A write buffer for absorbing the output of a RIO parser or other Statement source and writing that output onto an AbstractTripleStore using the batch API.
Note: there is a LOT of Value duplication in parsed RDF and we get a significant reward for reducing Values to only the distinct Values during processing. On the other hand, there is little Statement duplication. Hence we pay an unnecessary overhead if we try to make the statements distinct in the buffer.

Note: This also provides an explanation for why neither this class nor writes of SPOs do better when "distinct" statements is turned on - the "Value" objects in that case are only represented by long integers and duplication in their values does not impose a burden on either the heap or the index writers. In contrast, the duplication of Values in the StatementBuffer imposes a burden on both the heap and the index writers.
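The typical pattern is to construct the buffer over the target store, add statements, and call flush() at the end of the source. The sketch below assumes an existing AbstractTripleStore; the capacity and the example URIs are illustrative only, not values prescribed by this class.

```java
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;

import com.bigdata.rdf.rio.StatementBuffer;
import com.bigdata.rdf.store.AbstractTripleStore;

// Minimal usage sketch: buffer a few statements and flush them onto the store.
public class StatementBufferExample {

    public static long loadOne(final AbstractTripleStore store) {

        // Buffer up to 100,000 statements before an incremental write is forced
        // (the capacity is an arbitrary example value).
        final StatementBuffer<Statement> buffer =
                new StatementBuffer<Statement>(store, 100000);

        final ValueFactory vf = ValueFactoryImpl.getInstance();
        final URI alice = vf.createURI("http://example.org/alice");
        final URI knows = vf.createURI("http://example.org/knows");
        final URI bob = vf.createURI("http://example.org/bob");

        // Statements are buffered; the buffer writes incrementally on overflow.
        buffer.add(alice, knows, bob);

        // Signal the end of the source and write any remaining buffered statements.
        return buffer.flush();
    }
}
```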
Modifier and Type | Class and Description |
---|---|
static interface | StatementBuffer.IWrittenSPOArray. Note: The use of this interface is NOT encouraged. |
Modifier and Type | Field and Description |
---|---|
protected AbstractTripleStore | database: The database that will be used to resolve terms. |
protected StatementBuffer.IWrittenSPOArray | didWriteCallback |
protected int | numBNodes |
protected int | numLiterals |
protected int | numSIDs: The #of blank nodes which appear in the context position and zero (0) if statement identifiers are not enabled. |
protected int | numStmts: #of valid entries in stmts. |
protected int | numURIs |
protected int | numValues: #of valid entries in values. |
protected BigdataStatement[] | stmts: Buffer for parsed RDF Statements. |
protected BigdataValueFactory | valueFactory |
protected BigdataValue[] | values: Buffer for parsed RDF Values. |
Constructor and Description |
---|
StatementBuffer(AbstractTripleStore database, int capacity) |
StatementBuffer(AbstractTripleStore database, int capacity, int queueCapacity) |
StatementBuffer(TempTripleStore statementStore, AbstractTripleStore database, int capacity, int queueCapacity): Create a buffer that writes on a TempTripleStore when it is flush()ed. |
Modifier and Type | Method and Description |
---|---|
protected void | _clear(): Invoked by incrementalWrite() to clear terms and statements which have been written in preparation for buffering more writes. |
void | add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o): Add an "explicit" statement to the buffer (flushes on overflow, no context). |
void | add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o, org.openrdf.model.Resource c): Add an "explicit" statement to the buffer (flushes on overflow). |
void | add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o, org.openrdf.model.Resource c, StatementEnum type): Add a statement to the buffer (core impl, flushes on overflow). |
void | add(org.openrdf.model.Statement e): Add a statement to the buffer. |
protected void | finalize(): Added to ensure that the FutureTask is cancelled in case the caller does not shutdown the StatementBuffer normally. |
long | flush(): Signals the end of a source and causes all buffered statements to be written. |
int | getCapacity(): The maximum #of Statements, URIs, Literals, or BNodes that the buffer can hold. |
CounterSet | getCounters(): Return performance counters. |
AbstractTripleStore | getDatabase(): The database that will be used to resolve terms. |
int | getQueueCapacity(): The capacity of the optional queue used to overlap the parser with the index writer -or- ZERO (0) iff the queue is disabled and index writes will be synchronous and alternate with the parser (the historical behavior). |
AbstractTripleStore | getStatementStore(): The optional store into which statements will be inserted when non-null. |
protected void | handleStatement(org.openrdf.model.Resource _s, org.openrdf.model.URI _p, org.openrdf.model.Value _o, org.openrdf.model.Resource _c, StatementEnum type): Adds the values and the statement into the buffer. |
protected void | incrementalWrite(): Batch insert buffered data (terms and statements) into the store. |
boolean | isEmpty(): True iff there are no elements in the buffer. |
boolean | nearCapacity(): Returns true if the bufferQueue has less than three slots remaining for any of the value arrays (URIs, Literals, or BNodes) or if there are no slots remaining in the statements array. |
void | reset(): Clears all buffered data, including the canonicalizing mapping for blank nodes and deferred provenance statements. |
void | setBNodeMap(Map<String,BigdataBNode> bnodes): Set the canonicalizing map for blank nodes based on their ID. |
void | setChangeLog(IChangeLog changeLog): Set an IChangeLog listener that will be notified about each statement actually written onto the backing store. |
void | setReadOnly(): When invoked, the StatementBuffer will resolve terms against the lexicon, but not enter new terms into the lexicon. |
int | size(): The #of elements currently in the buffer. |
String | toString() |
protected final BigdataValue[] values
Buffer for parsed RDF Values.

protected final BigdataStatement[] stmts
Buffer for parsed RDF Statements.

protected int numValues
#of valid entries in values.

protected int numStmts
#of valid entries in stmts.

protected int numURIs

protected int numLiterals

protected int numBNodes

protected int numSIDs
The #of blank nodes which appear in the context position and zero (0) if statement identifiers are not enabled.

protected final AbstractTripleStore database
The database that will be used to resolve terms. When the statementStore is null, statements will be written into this store as well.

protected final BigdataValueFactory valueFactory

protected StatementBuffer.IWrittenSPOArray didWriteCallback
public StatementBuffer(AbstractTripleStore database, int capacity)
Create a buffer that converts Sesame Value objects to SPOs and writes on the database when it is flush()ed. This may be used to perform an efficient batch write of Sesame Values or Statements onto the database. If you already have SPOs then use IRawTripleStore.addStatements(IChunkedOrderedIterator, IElementFilter) and friends.
Parameters:
database - The database into which the terms and statements will be inserted.
capacity - The #of statements that the buffer can hold.

public StatementBuffer(AbstractTripleStore database, int capacity, int queueCapacity)
public StatementBuffer(TempTripleStore statementStore, AbstractTripleStore database, int capacity, int queueCapacity)
Create a buffer that writes on a TempTripleStore when it is flush()ed. This variant is used during truth maintenance since the terms are written on the database lexicon but the statements are asserted against the TempTripleStore.
Parameters:
statementStore - The store into which the statements will be inserted (optional). When null, both statements and terms will be inserted into the database. This optional argument provides the ability to load statements into a temporary store while the terms are resolved against the main database. This facility is used during incremental load+close operations.
database - The database. When statementStore is null, both terms and statements will be inserted into the database.
capacity - The #of statements that the buffer can hold.
queueCapacity - The capacity of the blocking queue used by the StatementBuffer -or- ZERO (0) to disable the blocking queue and perform synchronous writes (default is statements). The blocking queue holds parsed data pending writes onto the backing store and makes it possible for the parser to race ahead while the writer is blocked writing onto the database indices. (added blocking queue)
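The sketch below illustrates this variant under stated assumptions: the TempTripleStore and AbstractTripleStore instances already exist, and the capacity and queueCapacity values are arbitrary examples rather than recommended settings.

```java
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;

import com.bigdata.rdf.rio.StatementBuffer;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.rdf.store.TempTripleStore;

// Sketch: buffer statements into a TempTripleStore while terms are resolved
// against the main database. The stores and the capacity/queueCapacity values
// are illustrative assumptions, not defaults of this class.
public class TempStoreLoadExample {

    public static long loadIntoTempStore(final TempTripleStore tempStore,
            final AbstractTripleStore database,
            final Resource s, final URI p, final Value o) {

        final StatementBuffer<Statement> buffer = new StatementBuffer<Statement>(
                tempStore, // statementStore: statements are asserted here
                database,  // database: terms are written on this lexicon
                100000,    // capacity: statements buffered before an incremental write
                10);       // queueCapacity: non-zero lets the parser run ahead of the index writer

        buffer.add(s, p, o); // buffered; the statement will be written on the TempTripleStore

        return buffer.flush(); // end of source: write anything still buffered
    }
}
```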
public final AbstractTripleStore getStatementStore()
The optional store into which statements will be inserted when non-null.
Specified by:
getStatementStore in interface IStatementBuffer<S extends org.openrdf.model.Statement>
public final AbstractTripleStore getDatabase()
The database that will be used to resolve terms. When getStatementStore() is null, statements will be written into this store as well.
Specified by:
getDatabase in interface IStatementBuffer<S extends org.openrdf.model.Statement>
public int getCapacity()
The maximum #of Statements, URIs, Literals, or BNodes that the buffer can hold.

public int getQueueCapacity()
The capacity of the optional queue used to overlap the parser with the index writer -or- ZERO (0) iff the queue is disabled and index writes will be synchronous and alternate with the parser (the historical behavior).
See Also:
BLZG-1552
public boolean isEmpty()
True iff there are no elements in the buffer.
Specified by:
isEmpty in interface IBuffer<S extends org.openrdf.model.Statement>

public int size()
The #of elements currently in the buffer.
Specified by:
size in interface IBuffer<S extends org.openrdf.model.Statement>

public CounterSet getCounters()
Return performance counters.
Specified by:
getCounters in interface ICounterSetAccess
public void setReadOnly()
When invoked, the StatementBuffer will resolve terms against the lexicon, but not enter new terms into the lexicon. This mode can be used to efficiently resolve terms to SPOs.
public void setChangeLog(IChangeLog changeLog)
Set an IChangeLog listener that will be notified about each statement actually written onto the backing store.
Parameters:
changeLog - The change log listener.

protected void finalize() throws Throwable
Added to ensure that the FutureTask is cancelled in case the caller does not shutdown the StatementBuffer normally.

public long flush()
Signals the end of a source and causes all buffered statements to be written.
Note: The source limits the scope within which blank nodes are co-referenced by their IDs. Calling this method will flush the buffer, cause any deferred statements to be written, and cause the canonicalizing mapping for blank nodes to be discarded.
Specified by:
flush in interface IBuffer<S extends org.openrdf.model.Statement>
Returns:
The #of statements written on the backing IRelation.
See Also:
IMutableRelation
public void reset()
Clears all buffered data, including the canonicalizing mapping for blank nodes and deferred provenance statements.

public void setBNodeMap(Map<String,BigdataBNode> bnodes)
Set the canonicalizing map for blank nodes based on their ID. This map may be shared across IStatementBuffer instances. For example, the BigdataSail does this so that the same bnode map is used throughout the life of a SailConnection.
While RIO provides blank node correlation within a given source, it does NOT provide blank node correlation across sources. You need to use this method to do that.

Note: It is reasonable to expect that the bnodes map is used by concurrent threads. For this reason, the map SHOULD be thread-safe. This can be accomplished either using Collections.synchronizedMap(Map) or a ConcurrentHashMap. However, implementations MUST still be synchronized on the map reference across operations which conditionally insert into the map in order to make that update atomic and thread-safe. Otherwise a race condition exists for the conditional insert and different threads could get incoherent answers.
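The sketch below shows one way to share a canonicalizing map across two buffers and to perform the conditional insert atomically; the buffers, the value factory, and the helper method are hypothetical, and only setBNodeMap(Map) is part of this class's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.openrdf.model.Statement;

import com.bigdata.rdf.model.BigdataBNode;
import com.bigdata.rdf.model.BigdataValueFactory;
import com.bigdata.rdf.rio.StatementBuffer;

// Sketch: share one canonicalizing blank node map across several buffers so that
// blank node IDs are correlated across sources. The buffers, value factory, and
// helper method are hypothetical; only setBNodeMap(Map) is part of this API.
public class SharedBNodeMapExample {

    public static Map<String, BigdataBNode> shareMap(
            final StatementBuffer<Statement> buffer1,
            final StatementBuffer<Statement> buffer2) {

        // A thread-safe map, as recommended above.
        final Map<String, BigdataBNode> bnodes =
                new ConcurrentHashMap<String, BigdataBNode>();

        buffer1.setBNodeMap(bnodes);
        buffer2.setBNodeMap(bnodes);

        return bnodes;
    }

    // The conditional insert is synchronized on the map reference so that the
    // get/put pair is atomic, even though the map itself is thread-safe.
    public static BigdataBNode lookupOrCreate(final Map<String, BigdataBNode> bnodes,
            final String id, final BigdataValueFactory valueFactory) {
        synchronized (bnodes) {
            BigdataBNode b = bnodes.get(id);
            if (b == null) {
                b = valueFactory.createBNode(id);
                bnodes.put(id, b);
            }
            return b;
        }
    }
}
```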
Specified by:
setBNodeMap in interface IStatementBuffer<S extends org.openrdf.model.Statement>
Parameters:
bnodes - The blank nodes map.

protected void _clear()
Invoked by incrementalWrite() to clear terms and statements which have been written in preparation for buffering more writes. This does NOT discard either the canonicalizing mapping for blank nodes or any deferred statements.

protected void incrementalWrite()
Batch insert buffered data (terms and statements) into the store.
public void add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o)
Add an "explicit" statement to the buffer (flushes on overflow, no context).
Specified by:
add in interface IStatementBuffer<S extends org.openrdf.model.Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.

public void add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o, org.openrdf.model.Resource c)
Add an "explicit" statement to the buffer (flushes on overflow).
Specified by:
add in interface IStatementBuffer<S extends org.openrdf.model.Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.
c - The context (optional).

public void add(org.openrdf.model.Resource s, org.openrdf.model.URI p, org.openrdf.model.Value o, org.openrdf.model.Resource c, StatementEnum type)
Add a statement to the buffer (core impl, flushes on overflow).
Specified by:
add in interface IStatementBuffer<S extends org.openrdf.model.Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.
type - The statement type.
c - The context (optional).

public void add(org.openrdf.model.Statement e)
Description copied from interface: IStatementBuffer
Add a statement to the buffer.
Specified by:
add in interface IStatementBuffer<S extends org.openrdf.model.Statement>
Specified by:
add in interface IBuffer<S extends org.openrdf.model.Statement>
Parameters:
e - The statement. If the statement implements BigdataStatement then the StatementEnum will be used (this makes it possible to load axioms into the database as axioms) but the term identifiers on the statement's values will be ignored.
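As a sketch of how this method is typically fed, the example below wires a Sesame/RIO parser to the buffer through an RDFHandlerBase. The store, input stream, base URI, capacity, and RDF format are assumptions for illustration.

```java
import java.io.InputStream;

import org.openrdf.model.Statement;
import org.openrdf.rio.RDFFormat;
import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.RDFParser;
import org.openrdf.rio.Rio;
import org.openrdf.rio.helpers.RDFHandlerBase;

import com.bigdata.rdf.rio.StatementBuffer;
import com.bigdata.rdf.store.AbstractTripleStore;

// Sketch: feed a Sesame/RIO parser into a StatementBuffer. The store, stream,
// base URI, capacity, and RDF format are assumptions for illustration.
public class RioLoadExample {

    public static long load(final AbstractTripleStore store, final InputStream in,
            final String baseURI) throws Exception {

        final StatementBuffer<Statement> buffer =
                new StatementBuffer<Statement>(store, 100000/* capacity */);

        final RDFParser parser = Rio.createParser(RDFFormat.RDFXML);
        parser.setRDFHandler(new RDFHandlerBase() {
            @Override
            public void handleStatement(final Statement st) throws RDFHandlerException {
                buffer.add(st); // buffered; incremental writes happen on overflow
            }
        });

        parser.parse(in, baseURI);

        return buffer.flush(); // end of source: write anything still buffered
    }
}
```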
public boolean nearCapacity()
Returns true if the bufferQueue has less than three slots remaining for any of the value arrays (URIs, Literals, or BNodes) or if there are no slots remaining in the statements array.
protected void handleStatement(org.openrdf.model.Resource _s, org.openrdf.model.URI _p, org.openrdf.model.Value _o, org.openrdf.model.Resource _c, StatementEnum type)
Adds the values and the statement into the buffer.
Parameters:
_s - The subject.
_p - The predicate.
_o - The object.
_c - The context (may be null).
type - The statement type.
Throws:
IndexOutOfBoundsException - if the buffer capacity is exceeded.
See Also:
nearCapacity()
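Because handleStatement(...) is the protected hook through which every buffered statement passes, a subclass can observe statements before they are buffered. The sketch below is a hypothetical subclass, not part of the API, that simply counts statements and then delegates to the superclass.

```java
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;

import com.bigdata.rdf.model.StatementEnum;
import com.bigdata.rdf.rio.StatementBuffer;
import com.bigdata.rdf.store.AbstractTripleStore;

// Hypothetical subclass illustrating the protected extension point; it counts
// each statement entering the buffer and then delegates to the superclass.
public class CountingStatementBuffer extends StatementBuffer<Statement> {

    private long nHandled = 0L;

    public CountingStatementBuffer(final AbstractTripleStore database, final int capacity) {
        super(database, capacity);
    }

    @Override
    protected void handleStatement(final Resource _s, final URI _p, final Value _o,
            final Resource _c, final StatementEnum type) {
        nHandled++; // observe the statement before it is buffered
        super.handleStatement(_s, _p, _o, _c, type);
    }

    public long getHandledCount() {
        return nHandled;
    }
}
```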
Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.