public class BlockingBuffer<E> extends Object implements IBlockingBuffer<E>
A buffer that will block when it is full. You write elements on the buffer and they can be read using iterator(). This class is safe for concurrent writes (multiple threads can use add(Object)), but the iterator() is not thread-safe (it assumes a single reader).
You MUST make sure that the thread that sets up the BlockingBuffer and submits a task that writes on the buffer also sets the Future on the BlockingBuffer, so that the iterator can monitor the Future, detect if it has been canceled, and throw the exception from the Future back to the client. Failure to do this can lead to the iterator not terminating!
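The reason the Future matters can be illustrated with a minimal sketch built only on java.util.concurrent. It is not the BlockingBuffer implementation: the class FutureMonitoringSketch and its methods drain and run are hypothetical names, and a plain BlockingQueue stands in for the buffer. The single reader polls the queue and, whenever the queue is empty, checks the producer's Future; without that check, a producer that fails would leave the reader polling forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in built on java.util.concurrent; NOT the BlockingBuffer code. */
final class FutureMonitoringSketch {

    /**
     * Single reader: drains the queue until the producer's Future is done and
     * the queue is empty. A producer failure is rethrown to the caller.
     */
    static List<Integer> drain(BlockingQueue<Integer> queue, Future<?> producer) {
        List<Integer> out = new ArrayList<>();
        try {
            while (true) {
                Integer e = queue.poll(10, TimeUnit.MILLISECONDS);
                if (e != null) {
                    out.add(e);
                } else if (producer.isDone()) {
                    queue.drainTo(out); // elements written just before completion
                    producer.get();     // throws if the producer failed or was cancelled
                    return out;
                }
            }
        } catch (ExecutionException ex) {
            // Hand the producer's exception back to the client.
            throw new RuntimeException(ex.getCause());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /** Starts a producer that writes n elements, optionally fails, and is read by drain(). */
    static List<Integer> run(int n, boolean fail) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> producer = service.submit(() -> {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                if (fail) {
                    throw new IllegalStateException("producer failed");
                }
                return null;
            });
            return drain(queue, producer);
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5, false)); // [0, 1, 2, 3, 4]
        try {
            run(3, true);
        } catch (RuntimeException ex) {
            System.out.println("reader saw: " + ex.getCause().getMessage());
        }
    }
}
```

With the real class, the corresponding step is calling setFuture(Future) on the BlockingBuffer from the thread that submits the writer task.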
Note: BlockingBuffer is used (a) for IAccessPath iterators that exceed the fully-buffered read threshold; (b) for high-level query with at least one join; (c) by the BigdataStatementIteratorImpl, which is used by the RDF DB for high-level query with no joins; and (d) by the BigdataSolutionResolverator, which is used by the RDF DB for high-level query that produces binding sets.
Modifier and Type | Class and Description |
---|---|
protected class | BlockingBuffer.BlockingIterator: An inner class that reads from the buffer. |
Modifier and Type | Field and Description |
---|---|
static long | DEFAULT_CONSUMER_CHUNK_TIMEOUT: The default timeout in milliseconds during which chunks of elements may be combined in order to satisfy the desired minimum chunk size. |
static TimeUnit | DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT: The unit in which DEFAULT_CONSUMER_CHUNK_TIMEOUT is expressed (milliseconds). |
static int | DEFAULT_MINIMUM_CHUNK_SIZE: The default minimum chunk size for the chunk combiner. |
static int | DEFAULT_PRODUCER_QUEUE_CAPACITY: The default capacity for the internal Queue on which elements (or chunks of elements) are buffered. |
protected static org.apache.log4j.Logger | log: Warning messages are emitted if either the producer or the consumer is stalled. |
Constructor and Description |
---|
BlockingBuffer() |
BlockingBuffer(BlockingQueue<E> queue, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit, boolean ordered): Core constructor. |
BlockingBuffer(int capacity): Constructor that automatically provisions an appropriate BlockingQueue. |
BlockingBuffer(int capacity, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit) |
Modifier and Type | Method and Description |
---|---|
void | abort(Throwable cause): Signal abnormal termination of the process writing on the buffer. |
void | add(E e): Add an element to the buffer. |
boolean | add(E e, long timeout, TimeUnit unit): Add an element to the buffer. |
void | clear(): Clear the backing queue. |
void | close(): Closes the BlockingBuffer such that it will not accept new elements (this is a NOP unless the buffer is open). |
static <E> E | combineChunks(E chunk1, E chunk2): Combines two chunks in the order given (DOES NOT apply a merge sort). |
long | flush(): This is a NOP since the IBlockingBuffer.iterator() is the only way to consume data written on the buffer. |
long | getChunksAddedCount(): The #of chunks added to the buffer using add(Object). |
long | getChunkTimeout(): The maximum time to wait in nanoseconds for another chunk to come along so that we can combine it with the current chunk for next(). |
long | getElementsAddedCount(): The #of elements added to the buffer using add(Object). |
long | getElementsOnQueueCount(): The #of elements on the queue. |
Future | getFuture(): The Future of the producer feeding the BlockingBuffer. |
int | getMinimumChunkSize(): The desired minimum chunk size for the chunk combiner. |
boolean | isEmpty(): True iff there are no elements in the buffer. |
boolean | isOpen(): Return true if the buffer is open. |
boolean | isOrdered() |
IAsynchronousIterator<E> | iterator(): The iterator is NOT thread-safe and does NOT support remove(). |
void | reset(): Reset the state of the buffer, including the counter whose value is reported by IBuffer.flush(). |
void | setFuture(Future future): Set the future of the producer feeding the BlockingBuffer. |
int | size(): This reports the #of items in the queue. |
String | toString() |
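protected static final org.apache.log4j.Logger log
Warning messages are emitted if either the producer or the consumer is stalled. When Category.isInfoEnabled() is true, those log messages will include stack traces, which can help to identify the consumer or producer.

public static final transient int DEFAULT_PRODUCER_QUEUE_CAPACITY
The default capacity for the internal Queue on which elements (or chunks of elements) are buffered.

public static final transient int DEFAULT_MINIMUM_CHUNK_SIZE
The default minimum chunk size for the chunk combiner.

public static final transient long DEFAULT_CONSUMER_CHUNK_TIMEOUT
The default timeout in milliseconds during which chunks of elements may be combined in order to satisfy the desired minimum chunk size.

public static final transient TimeUnit DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT
The unit in which DEFAULT_CONSUMER_CHUNK_TIMEOUT is expressed (milliseconds).

public BlockingBuffer()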
public BlockingBuffer(int capacity)
Constructor that automatically provisions an appropriate BlockingQueue.
Parameters:
capacity - The capacity of the buffer. When the generic type <E> is an array type, then this is the chunkOfChunksCapacity and small chunks will be automatically combined based on availability and latency. When zero (0) a SynchronousQueue will be used. Otherwise an ArrayBlockingQueue of the given capacity is used.

public BlockingBuffer(int capacity, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit)
Parameters:
capacity - The capacity of the buffer. When the generic type <E> is an array type, then this is the chunkOfChunksCapacity and small chunks will be automatically combined based on availability and latency. When zero (0) a SynchronousQueue will be used. Otherwise a LinkedBlockingDeque of the given capacity is used.
minimumChunkSize - The desired minimum chunk size. When the elements stored in the buffer are chunks (i.e., arrays of some component type), elements will be combined together to form larger chunks until this minimumChunkSize is satisfied, the iterator() is exhausted, or the chunkTimeout is reached.
chunkTimeout - The maximum time to wait (expressed in chunkTimeoutUnit) for another chunk to come along so that we can combine it with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.
chunkTimeoutUnit - The units in which the chunkTimeout is expressed.

public BlockingBuffer(BlockingQueue<E> queue, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit, boolean ordered)
Core constructor.
Parameters:
queue - The queue on which elements will be buffered. Elements will be added to the queue and drained by the iterator(). When this is a BlockingDeque, array element types will be combined by add(Object, long, TimeUnit) until the minimumChunkSize is satisfied.
minimumChunkSize - The desired minimum chunk size. When the elements stored in the buffer are chunks (i.e., arrays of some component type), elements will be combined together to form larger chunks until this minimumChunkSize is satisfied, the iterator() is exhausted, or the chunkTimeout is reached.
chunkTimeout - The maximum time to wait (expressed in chunkTimeoutUnit) for another chunk to come along so that we can combine it with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.
chunkTimeoutUnit - The units in which the chunkTimeout is expressed.
ordered - When true, the data are asserted to be ordered and a merge sort will be applied if chunks are combined, such that the combined chunks will also be ordered (this has no effect unless the generic type of the buffer is an array type).

public final int getMinimumChunkSize()
The desired minimum chunk size for the chunk combiner.
See Also:
DEFAULT_MINIMUM_CHUNK_SIZE
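The interplay of minimumChunkSize, chunkTimeout, and ordered can be sketched with plain arrays and a java.util.concurrent queue. This is an illustration under stated assumptions, not the BlockingBuffer code: the class ChunkCombinerSketch and the helpers concat, mergeSorted, and combine are hypothetical names, and int[] stands in for an arbitrary array element type. The combiner keeps appending chunks until the minimum size is reached or the timeout expires; a timeout of zero therefore returns the first chunk unchanged.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of chunk combining; NOT the BlockingBuffer code. */
final class ChunkCombinerSketch {

    /** Concatenates two chunks in the order given (no sorting). */
    static int[] concat(int[] a, int[] b) {
        int[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    /** Merges two chunks that are each already sorted into one sorted chunk. */
    static int[] mergeSorted(int[] a, int[] b) {
        int[] out = new int[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length && j < b.length) {
            out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];
        }
        while (i < a.length) out[k++] = a[i++];
        while (j < b.length) out[k++] = b[j++];
        return out;
    }

    /**
     * Grows the first chunk with chunks polled from the queue until it holds
     * at least minimumChunkSize elements or the timeout expires.
     */
    static int[] combine(int[] first, BlockingQueue<int[]> queue, int minimumChunkSize,
                         long timeout, TimeUnit unit, boolean ordered) {
        int[] chunk = first;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (chunk.length < minimumChunkSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) break;           // timeout reached (or combiner disabled)
                int[] more = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (more == null) break;             // nothing arrived in time
                chunk = ordered ? mergeSorted(chunk, more) : concat(chunk, more);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();      // return what we have so far
        }
        return chunk;
    }

    public static void main(String[] args) {
        BlockingQueue<int[]> queue = new LinkedBlockingQueue<>();
        queue.add(new int[] {2, 3});
        queue.add(new int[] {7, 8});
        int[] out = combine(new int[] {1, 9}, queue, 4, 100, TimeUnit.MILLISECONDS, true);
        System.out.println(Arrays.toString(out)); // [1, 2, 3, 9]
    }
}
```

In the usage above, the second queued chunk stays on the queue because the minimum size of 4 is already satisfied after one merge.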
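
public final long getChunkTimeout()
The maximum time to wait in nanoseconds for another chunk to come along so that we can combine it with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.
See Also:
DEFAULT_CONSUMER_CHUNK_TIMEOUT
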
public void setFuture(Future future)
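Set the future of the producer feeding the BlockingBuffer.
Specified by:
setFuture in interface IBlockingBuffer<E>
Parameters:
future - The Future.

public Future getFuture()
The Future of the producer feeding the BlockingBuffer.
Specified by:
getFuture in interface IRunnableBuffer<E>
Returns:
The Future -or- null if no Future has been set.

public boolean isEmpty()
Description copied from interface: IBuffer
True iff there are no elements in the buffer.
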
public int size()
This reports the #of items in the queue.
Specified by:
size in interface IBuffer<E>
See Also:
getElementsOnQueueCount()
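The difference between size() and getElementsOnQueueCount() only shows up when the element type is an array: the former counts queue entries (chunks), the latter counts the elements inside them. A minimal sketch, assuming an int[] element type and a hypothetical helper elementsOnQueue that recomputes the sum by iterating; it makes no claim about how BlockingBuffer maintains its own counters.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration; NOT the BlockingBuffer code. */
final class QueueCountSketch {

    /** Sums the lengths of all chunks currently on the queue. */
    static long elementsOnQueue(BlockingQueue<int[]> queue) {
        long n = 0;
        for (int[] chunk : queue) {
            n += chunk.length;
        }
        return n;
    }

    public static void main(String[] args) {
        BlockingQueue<int[]> queue = new LinkedBlockingQueue<>();
        queue.add(new int[] {1, 2, 3});
        queue.add(new int[] {4});
        System.out.println(queue.size());           // 2 chunks
        System.out.println(elementsOnQueue(queue)); // 4 elements
    }
}
```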

public boolean isOpen()
Description copied from interface: IRunnableBuffer
Return true if the buffer is open.
Specified by:
isOpen in interface IRunnableBuffer<E>

public boolean isOrdered()

public void close()
Closes the BlockingBuffer such that it will not accept new elements (this is a NOP unless the buffer is open). Once the buffer is closed, the BlockingBuffer.BlockingIterator will drain any elements remaining in the BlockingBuffer and then report false for BlockingIterator.hasNext() (this does NOT close the BlockingIterator).
Specified by:
close in interface IRunnableBuffer<E>
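The close-then-drain contract described for close() can be sketched as follows. This is a simplified stand-in, not the BlockingBuffer code: the class ClosableBufferSketch is hypothetical, it throws IllegalStateException where the real class documents a BufferClosedException, and it supports a single reader only.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-reader buffer showing close-then-drain; NOT the BlockingBuffer code. */
final class ClosableBufferSketch<E> {

    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private volatile boolean open = true;
    private E pending; // element fetched by hasNext() and not yet returned by next()

    /** Rejects new elements once the buffer is closed. */
    synchronized void add(E e) {
        if (!open) {
            throw new IllegalStateException("buffer is closed");
        }
        queue.add(e);
    }

    /** Stops accepting new elements; a no-op if the buffer is already closed. */
    synchronized void close() {
        open = false;
    }

    /** True while elements remain; false once the buffer is closed AND drained. */
    boolean hasNext() {
        while (pending == null) {
            boolean wasOpen = open; // read the flag before polling
            try {
                pending = queue.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (pending == null && !wasOpen) {
                return false; // closed before the poll and nothing was left
            }
        }
        return true;
    }

    E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        E e = pending;
        pending = null;
        return e;
    }

    public static void main(String[] args) {
        ClosableBufferSketch<String> buffer = new ClosableBufferSketch<>();
        buffer.add("a");
        buffer.add("b");
        buffer.close();
        while (buffer.hasNext()) {
            System.out.println(buffer.next()); // the remaining elements are still delivered
        }
    }
}
```

Reading the open flag before polling is a deliberate choice in this sketch: once the flag is seen as false, no further add can succeed, so an empty poll means the buffer is fully drained.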
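
public void clear() throws IllegalStateException
Clear the backing queue.
Throws:
IllegalStateException - If the buffer is open.

public void abort(Throwable cause)
Description copied from interface: IRunnableBuffer
Signal abnormal termination of the process writing on the buffer.
Specified by:
abort in interface IRunnableBuffer<E>
Parameters:
cause - The exception thrown by the process writing on the buffer.

public long getChunksAddedCount()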
The #of chunks added to the buffer using add(Object). This will be ZERO unless the generic type of the buffer is an array type.

public long getElementsAddedCount()
The #of elements added to the buffer using add(Object). When the generic type of the buffer is an array type, this will be the sum of the lengths of the arrays added to the buffer.

public long getElementsOnQueueCount()
The #of elements on the queue. This tracks size() very closely. However, when the queue uses an array element type, this will be the sum of the #of elements across all arrays on the queue.

public void add(E e)
Description copied from interface: IRunnableBuffer
Add an element to the buffer.
Note: This method is constrained to throw the specified exception if the buffer has been closed (see IRunnableBuffer.close()).
Specified by:
add in interface IBuffer<E>
add in interface IRunnableBuffer<E>
Parameters:
e - The element.
Throws:
BufferClosedException - if the buffer has been closed (see close()).
RuntimeException - if the caller's Thread is interrupted. The RuntimeException will wrap the InterruptedException as its cause.

public boolean add(E e, long timeout, TimeUnit unit) throws InterruptedException
Add an element to the buffer.
Parameters:
e - The element.
timeout - The timeout.
unit - The unit in which the timeout is expressed.
Returns:
true iff the element was added to the buffer (false indicates that the timeout expired before the element could be added to the buffer).
Throws:
InterruptedException - if interrupted.

public long flush()
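Description copied from interface: IBlockingBuffer
This is a NOP since the IBlockingBuffer.iterator() is the only way to consume data written on the buffer.

public void reset()
Description copied from interface: IBuffer
Reset the state of the buffer, including the counter whose value is reported by IBuffer.flush(). Any data in the buffer will be discarded.

public IAsynchronousIterator<E> iterator()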
The iterator is NOT thread-safe and does NOT support remove().
Note: If the IAsynchronousIterator is closed (see ICloseableIterator.close()) before the Future of the process writing on the BlockingBuffer is done, then the Future will be canceled using Thread.interrupt(). Owing to a feature of FileChannel, this will cause the backing store to be asynchronously closed if the interrupt is detected during an IO. The backing store will be re-opened transparently, but there is overhead associated with that (locks to be re-acquired, etc).
The most common reason to close an iterator early is that you want to visit only a limited #of elements. However, if you use either IAccessPath.iterator(long, long, int) or IRule with an IQueryOptions to impose that limit, then most processes that produce IAsynchronousIterators will automatically terminate when they reach the desired limit, thereby avoiding issuing interrupts. Those processes include IAccessPath scans where the #of elements to be visited exceeds the fully materialized chunk threshold and IRule evaluation.
Specified by:
iterator in interface IBlockingBuffer<E>
See Also:
BlockingBuffer.BlockingIterator.close()
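The trade-off described in the note on iterator() (interrupting the producer on an early close versus letting the producer stop at a known limit) can be sketched with java.util.concurrent alone. This is an illustration, not the BlockingBuffer code: the class EarlyCloseSketch and the method producerWasInterrupted are hypothetical, Future.cancel(true) stands in for closing the iterator early, and limit is assumed to be at least 1.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of early close versus a producer-side limit. */
final class EarlyCloseSketch {

    /**
     * Reads limit elements and then "closes" the reader. Returns true if the
     * producer was stopped by an interrupt rather than finishing on its own.
     */
    static boolean producerWasInterrupted(int limit, boolean producerKnowsLimit) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        ExecutorService service = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);
        final int max = producerKnowsLimit ? limit : Integer.MAX_VALUE;

        Future<?> producer = service.submit(() -> {
            try {
                for (int i = 0; i < max; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException ex) {
                interrupted.set(true); // stopped by cancel(true)
            } finally {
                finished.countDown();
            }
        });

        try {
            for (int i = 0; i < limit; i++) {
                queue.take();
            }
            // Early close: cancel the producer if it is still running.
            if (!producer.isDone()) {
                producer.cancel(true);
            }
            finished.await();
            return interrupted.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(producerWasInterrupted(3, false)); // true: producer had to be interrupted
        System.out.println(producerWasInterrupted(3, true));  // false: producer stopped at the limit
    }
}
```

When the producer knows the limit, every put has completed by the time the reader finishes, so no interrupt ever reaches a blocking call; that is the situation the note recommends aiming for.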

public static <E> E combineChunks(E chunk1, E chunk2)
Combines two chunks in the order given (DOES NOT apply a merge sort).
Parameters:
chunk1 - The first chunk.
chunk2 - The second chunk.

Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.