public class ConcurrencyManager extends Object implements IConcurrencyManager
ConcurrencyManager
uses a NonBlockingLockManager
to identify
a schedule of operations such that access to an unisolated named index is
always single threaded while access to distinct unisolated named indices MAY
be concurrent.
There are several thread pools that facilitate concurrency. They are:
readService
writeService
BTree
is NOT thread-safe for writers. Therefore writers
MUST predeclare their locks, which allows us to avoid deadlocks altogether.
This is also used to schedule the commit phrase of transactions (transaction
commits are in fact unisolated tasks).txWriteService
This is used for the "active" phrase of transaction. Transactions read from
historical states of named indices during their active phase and buffer the
results on isolated indices backed by a per-transaction
TemporaryStore
. Since transactions never write on the unisolated
indices during their "active" phase, distinct transactions may be run with
arbitrary concurrency. However, concurrent tasks for the same transaction
must obtain an exclusive lock on the isolated index(s) that are used to
buffer their writes.
A transaction that requests a commit using the
ITransactionManagerService
results in a unisolated task being
submitted to the writeService
. Transactions are selected to commit
once they have acquired a lock on the corresponding unisolated indices,
thereby enforcing serialization of their write sets both among other
transactions and among unisolated writers. The commit itself consists of the
standard validation and merge phrases.
Modifier and Type | Class and Description |
---|---|
static interface |
ConcurrencyManager.IConcurrencyManagerCounters
Interface defines and documents the counters and counter namespaces for
the
ConcurrencyManager . |
static interface |
ConcurrencyManager.Options
Options for the
ConcurrentManager . |
Modifier and Type | Field and Description |
---|---|
protected TaskCounters |
countersHR
Counters for the
readService . |
protected TaskCounters |
countersTX
Counters for the
txWriteService . |
protected WriteTaskCounters |
countersUN
Counters for
writeService . |
Constructor and Description |
---|
ConcurrencyManager(Properties properties,
ILocalTransactionManager transactionManager,
IResourceManager resourceManager)
(Re-)open a journal supporting concurrent operations.
|
Modifier and Type | Method and Description |
---|---|
CounterSet |
getCounters()
Return the
CounterSet . |
double |
getJournalOverextended()
Return the overextension multiplier for the journal.
|
IResourceManager |
getResourceManager()
The object used to manage local resources.
|
ILocalTransactionManager |
getTransactionManager()
The client side of the transaction manager.
|
WriteExecutorService |
getWriteService()
The service on which read-write tasks are executed.
|
<T> List<Future<T>> |
invokeAll(Collection<? extends AbstractTask<T>> tasks)
Executes the given tasks, returning a list of Futures holding their
status and results when all complete.
|
<T> List<Future<T>> |
invokeAll(Collection<? extends AbstractTask<T>> tasks,
long timeout,
TimeUnit unit)
Executes the given tasks, returning a list of Futures holding their
status and results when all complete or the timeout expires, whichever
happens first.
|
boolean |
isOpen()
Return
true iff the service is running. |
void |
shutdown()
Shutdown the thread pools (running tasks will run to completion, but no
new tasks will start).
|
void |
shutdownNow()
Immediate shutdown (running tasks are canceled rather than being
permitted to complete).
|
<T> FutureTask<T> |
submit(AbstractTask<T> task)
Submit a task (asynchronous).
|
protected final WriteTaskCounters countersUN
writeService
.protected final TaskCounters countersTX
txWriteService
.protected final TaskCounters countersHR
readService
.public ConcurrencyManager(Properties properties, ILocalTransactionManager transactionManager, IResourceManager resourceManager)
properties
- See ConcurrencyManager.Options
.transactionManager
- The object managing the local transactions.resourceManager
- The object managing the resources on which the indices are
stored.public WriteExecutorService getWriteService()
IConcurrencyManager
getWriteService
in interface IConcurrencyManager
public ILocalTransactionManager getTransactionManager()
IConcurrencyManager
getTransactionManager
in interface IConcurrencyManager
public IResourceManager getResourceManager()
IConcurrencyManager
getResourceManager
in interface IConcurrencyManager
public boolean isOpen()
IServiceShutdown
true
iff the service is running.isOpen
in interface IServiceShutdown
public void shutdown()
shutdown
in interface IConcurrencyManager
shutdown
in interface IServiceShutdown
IConcurrencyManager.shutdownNow()
public void shutdownNow()
shutdownNow
in interface IConcurrencyManager
shutdownNow
in interface IServiceShutdown
shutdown()
public CounterSet getCounters()
CounterSet
.getCounters
in interface ICounterSetAccess
public <T> FutureTask<T> submit(AbstractTask<T> task)
Note: Unisolated write tasks will NOT return before the next group commit
(exceptions may be thrown if the task fails or the commit fails). The
purpose of group commits is to provide higher throughput for writes on
the store by only syncing the data to disk periodically rather than after
every write. Group commits are scheduled by the #commitService
.
The trigger conditions for group commits may be configured using
ConcurrencyManager.Options
. If you are using the store in a
single threaded context then you may set
ConcurrencyManager.Options.WRITE_SERVICE_CORE_POOL_SIZE
to ONE (1) which has the
effect of triggering commit immediately after each unisolated write.
However, note that you can not sync a disk more than ~ 30-40 times per
second so your throughput in write operations per second will never
exceed that for a single-threaded application writing on a hard disk.
(Your mileage can vary if you are writing on a transient store or using a
durable medium other than disk).
Note: The isolated indices used by a read-write transaction are NOT thread-safe. Therefore a partial order is imposed over concurrent tasks for the same transaction that seek to read or write on the same index(s). Full concurrency is allowed when different transactions access the same index(s), but write-write conflicts MAY be detected during commit processing.
Note: The following exceptions MAY be wrapped by Future.get()
for
tasks submitted via this method:
ValidationError
InterruptedException
shutdown()
after the timeout has expired or
shutdownNow()
. In either of these cases the task will not be
accepted by the journal.submit
in interface IConcurrencyManager
task
- The task.Future
that may be used to resolve the outcome of the
task.RejectedExecutionException
- if task cannot be scheduled for execution (typically the
queue has a limited capacity and is full)NullPointerException
- if task nullpublic double getJournalOverextended()
public <T> List<Future<T>> invokeAll(Collection<? extends AbstractTask<T>> tasks) throws InterruptedException
Note: Contract is per ExecutorService.invokeAll(Collection)
invokeAll
in interface IConcurrencyManager
tasks
- The tasks.Future
s.InterruptedException
- if interrupted while waiting, in which case unfinished
tasks are canceled.NullPointerException
- if tasks or any of its elements are nullRejectedExecutionException
- if any task cannot be scheduled for executionpublic <T> List<Future<T>> invokeAll(Collection<? extends AbstractTask<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
Note: Contract is based on
ExecutorService.invokeAll(Collection, long, TimeUnit)
but only
the Future
s of the submitted tasks are returned.
invokeAll
in interface IConcurrencyManager
tasks
- The tasks.Future
s of all tasks that were
submitted
prior to the expiration
of the timeout.InterruptedException
- if interrupted while waiting, in which case unfinished
tasks are canceled.NullPointerException
- if tasks or any of its elements are nullRejectedExecutionException
- if any task cannot be scheduled for executionCopyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.