public class WriteExecutorService extends ThreadPoolExecutor
ThreadPoolExecutor
used by the ConcurrencyManager
to
execute concurrent unisolated write tasks and perform group commits. Tasks
extend AbstractTask
. The caller receives a Future
when they
submit a task to the write service. That Future
is NOT available
until the next group commit following the successful execution of the write
task.
Note: adding the thread name to the log messages for this class can aid debugging. You can do this using the log4j configuration.
Note: the problem with running concurrent unisolated operations during a commit and relying on an "auto-commit" flag to indicate whether or not the index will participate is two fold. First, previous unisolated operations on the same index will not get committed if an operation is currently running, so we could wind up deferring check points of indices for quite a while. Second, if there is a problem with the commit and we have to abort, then any ongoing operations would still be using unisolated indices that could include write sets that were discarded - this would make abort non-atomic.
The ground state from which an unisolated operation begins needs to evolve
after each unisolated operation that reaches its commit point successfully.
This can be accomplished by holding onto the btree reference, or even just the
address at which the metadata record for the btree was last written. We use
AbstractJournal.getName2Addr()
for this purpose.
However, if an unisolated write fails for any reason on a given index then we MUST use the last successful check point for that index. This is handled by doing an abort.
Note: Due to the way in which the BTree
class is written, it "steals"
child references when cloning an immutable node or leaf prior to making
modifications. This means that we must reload the btree from a metadata
record if we have to roll back due to an abort of some unisolated operation
since the state of the BTree
has been changed as a side effect in a
non-reversible manner.
Note: Running Thread
s may be interrupted at arbitrary moments for a
number of reasons by this class. The foremost example is a Thread
that is executing an AbstractTask
when a concurrent decision is made
to discard the commit group, e.g., because another task in that commit group
failed. Regardless of the reason, if the Thread
is performing an NIO
operation at the moment that the interrupt is notice, then it will close the
channel on which that operation was being performed. If you are using a
disk-based BufferMode
for the journal, then the interrupt just caused
the backing FileChannel
to be closed. In order to permit continued
operations on the journal, the IRawStore
MUST transparently re-open
the channel. (The same problem can arise if you are using NIO for sockets or
anything else that uses the Channel
abstraction.)
The WriteExecutorService
invokes #overflow()
each time it
does a group commit. Normally the WriteExecutorService
does not
quiesce before doing a group commit, and when it is not quiescent the
ResourceManager
can NOT #overflow()
the journal since
concurrent tasks are still writing on the current journal. Therefore the
ResourceManager
monitors the IBufferStrategy.getExtent()
of
the live journal. When it decides that the live journal is large enough it
pause()
s WriteExecutorService
and waits
until #overflow()
is called with a quiescent
WriteExecutorService
. This effectively grants the
ResourceManager
exclusive access to the journal. It can then run
#overflow()
to setup a new journal and tell the
WriteExecutorService
to resume()
processing.
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
Modifier and Type | Field and Description |
---|---|
protected AtomicInteger |
activeTaskCountWithLocksHeld |
protected long |
groupCommitTimeout
The time in milliseconds that a group commit will await currently running
tasks to join the commit group.
|
protected long |
overflowLockRequestTimeout
The time in milliseconds that a group commit will await an exclusive lock
on the write service in order to perform synchronous overflow processing.
|
protected static org.apache.log4j.Logger |
overflowLog
Uses the
OverflowManager log for things relating to synchronous
overflow processing. |
Constructor and Description |
---|
WriteExecutorService(IResourceManager resourceManager,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit keepAliveUnit,
BlockingQueue<Runnable> queue,
ThreadFactory threadFactory,
long groupCommitTimeout,
long overflowLockRequestTimeout) |
Modifier and Type | Method and Description |
---|---|
protected void |
afterTask(AbstractTask<?> r,
Throwable t)
This is executed after
AbstractTask.doTask() . |
protected void |
beforeExecute(Thread t,
Runnable r)
|
protected void |
beforeTask(Thread t,
AbstractTask<?> r)
Executed before
AbstractTask.doTask() |
long |
getAbortCount()
The #of aborts (not failed tasks) since the
WriteExecutorService
was started. |
int |
getActiveTaskCountWithLocksHeld()
The instantaneous #of tasks that have acquired their
locks are executing concurrently on the write service.
|
long |
getByteCountPerCommit()
The #of bytes written by the last commit.
|
int |
getCommitGroupSize()
The #of tasks in the most recent commit group.
|
long |
getGroupCommitCount()
The #of group commits since the
WriteExecutorService was started
(all commits by this service are group commits). |
int |
getInternalLockQueueLength()
The #of threads queued on the internal
lock . |
NonBlockingLockManagerWithNewDesign<String> |
getLockManager()
The object that coordinates exclusive access to the resources.
|
int |
getMaxCommitGroupSize()
The maximum #of tasks in any commit group.
|
long |
getMaxCommitServiceTime()
The maximum service time in milliseconds of the atomic commit.
|
long |
getMaxCommitWaitingTime()
The maximum waiting time in millseconds from when a task completes
successfully until the next group commit.
|
int |
getMaxPoolSize()
The maximum #of threads in the pool.
|
long |
getMaxRunning()
The maximum #of tasks that are concurrently executing without regard to
whether or not the tasks have acquired their locks.
|
long |
getOverflowCount()
The #of times synchronous overflow processing has been performed.
|
int |
getReadyCount()
#of tasks that are waiting to run but are blocked on the #lock.
|
long |
getRejectedExecutionCount()
The #of rejected tasks.
|
long |
getTaskCommittedCount()
The #of tasks that (a) executed successfully and (b) have been committed.
|
long |
getTaskFailedCount()
The #of tasks that have failed.
|
long |
getTaskSuccessCount()
The #of tasks that have executed successfully (MIGHT NOT have been
committed safely).
|
void |
shutdown()
Overridden to shutdown the embedded lock manager service.
|
List<Runnable> |
shutdownNow()
Overridden to shutdown the embedded lock manager service.
|
String |
toString()
A snapshot of the executor state.
|
boolean |
tryLock(long timeout,
TimeUnit unit)
Acquires an exclusive lock on the write service.
|
void |
unlock()
Release the exclusive write lock.
|
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, execute, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, remove, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, terminated
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
protected static final org.apache.log4j.Logger overflowLog
OverflowManager
log for things relating to synchronous
overflow processing.protected final long groupCommitTimeout
protected final long overflowLockRequestTimeout
protected final AtomicInteger activeTaskCountWithLocksHeld
public WriteExecutorService(IResourceManager resourceManager, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit keepAliveUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, long groupCommitTimeout, long overflowLockRequestTimeout)
resourceManager
- corePoolSize
- maximumPoolSize
- keepAliveTime
- keepAliveUnit
- queue
- threadFactory
- groupCommitTimeout
- The time in milliseconds that a group commit will await
currently running tasks to join the commit group.overflowLockRequestTimeout
- public NonBlockingLockManagerWithNewDesign<String> getLockManager()
public long getRejectedExecutionCount()
public int getMaxPoolSize()
public long getMaxRunning()
Note: Since this does not reflect tasks executing concurrently with locks held it is not a measure of the true concurrency of tasks executing on the service.
public long getMaxCommitWaitingTime()
public long getMaxCommitServiceTime()
AbstractJournal.commit()
public int getInternalLockQueueLength()
lock
. These are (for the
most part) threads waiting to start or stop during a group commit.
However, you can not use this measure to infer whether there are threads
waiting to run which are being starved during a group commit or simply
threads waiting to do their post-processing.public int getCommitGroupSize()
public int getMaxCommitGroupSize()
public long getGroupCommitCount()
WriteExecutorService
was started
(all commits by this service are group commits).public long getByteCountPerCommit()
public long getAbortCount()
WriteExecutorService
was started. Aborts are serious events and occur IFF an
IAtomicStore.commit()
fails. Failed tasks do NOT result in an
abort.public long getTaskFailedCount()
public long getTaskSuccessCount()
getTaskCommittedCount()
public long getTaskCommittedCount()
public long getOverflowCount()
public int getActiveTaskCountWithLocksHeld()
The returned value is limited by ThreadPoolExecutor.getActiveCount()
. Note that
ThreadPoolExecutor.getActiveCount()
reports tasks which are waiting on
their locks as well as those engaged in various pre- or
post-processing.
public int getReadyCount()
protected void beforeExecute(Thread t, Runnable r)
beforeExecute
in class ThreadPoolExecutor
t
- The thread that will run the task.r
- The Runnable
wrapping the AbstractTask
- this
is actually a FutureTask
. See
AbstractExecutorService
.protected void beforeTask(Thread t, AbstractTask<?> r)
AbstractTask.doTask()
t
- The thread in which that task will execute.r
- The AbstractTask
.protected void afterTask(AbstractTask<?> r, Throwable t)
AbstractTask.doTask()
. If the task
completed successfully (no exception thrown and its thread is not
interrupted) then we invoke groupCommit()
. Otherwise the write
set of the task was already discarded by
AbstractTask.InnerWriteServiceCallable
and we do nothing.r
- The Callable
wrapping the AbstractTask
.t
- The exception thrown -or- null
if the task
completed successfully.public String toString()
toString
in class ThreadPoolExecutor
public void shutdown()
shutdown
in interface ExecutorService
shutdown
in class ThreadPoolExecutor
public List<Runnable> shutdownNow()
shutdownNow
in interface ExecutorService
shutdownNow
in class ThreadPoolExecutor
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
The write service is paused for up to timeout units. During that time no new tasks will start. The lock will be granted if all running tasks complete before the timeout expires.
Note: The exclusive write lock is granted using the same lock
that is used to coordinate all other activity of the write service. If
the exclusive write lock is granted then the caller's thread will hold
the lock
and MUST release the lock using unlock()
.
Note: When the exclusive lock is granted there will be NO running tasks and the write service will be paused. This ensures that no task can run on the write service and that groupCommit will not attempt to grab the lock itself.
Note: If there is heavy write activity on the service then the timeout may well expire before the exclusive write lock becomes available. Further, the acquisition of the exclusive write lock will throttle concurrent write activity and negatively impact write performance if the system is heavily loaded by write tasks. Therefore, the write lock should be requested only when it is necessary and a significant value should be specified for the timeout (60s or more) to ensure that it is acquired.
timeout
- The timeout.unit
- The unit in which the timeout is expressed.true
iff the exclusive lock was acquired.InterruptedException
public void unlock()
IllegalMonitorStateException
- if the current thread does not own the lock.Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.