public class BigdataFileSystem extends AbstractResource<IDatabase<BigdataFileSystem>> implements IContentRepository, IRowStoreConstants
File data is stored in blockSize byte blocks. Partial and even empty blocks are allowed and only the data written will be stored. Up to 2^63-1 distinct blocks may be written per file version, making the maximum possible file size 536,870,912 exabytes. Files may be used as queues, in which case blocks containing new records are atomically appended while a map/reduce style master consumes the head block of the file.
Efficient methods are offered for streaming and block-oriented IO. All block read and write operations are atomic, including block append. Files may be easily written such that records never cross a block boundary by the expediency of flushing the output stream if a record would overflow the current block. A flush forces the atomic write of a partial block. Partial blocks are stored efficiently - only the bytes actually written are stored. Blocks are large enough that most applications can safely store a large number of logical records in each block. Files comprised of application-defined logical records organized into a sequence of blocks are well-suited to map/reduce processing. They may be efficiently split at block boundaries and references to the blocks distributed to clients. Likewise, reduce clients can aggregate data into large files suitable for further map/reduce processing.
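For example, a minimal sketch of writing logical records so that none crosses a block boundary, assuming an already-created BigdataFileSystem instance and hypothetical record payloads (the import path for BigdataFileSystem is assumed):

```java
import java.io.OutputStream;
import java.util.List;

import com.bigdata.bfs.BigdataFileSystem; // assumed package

public class RecordWriterExample {

    /**
     * Append records to a file version, flushing before a record would
     * overflow the current block so that no record crosses a block boundary.
     */
    public static void writeRecords(BigdataFileSystem repo, String id, int version,
            List<byte[]> records) throws Exception {
        final int blockSize = repo.getBlockSize();
        int bytesInBlock = 0;
        try (OutputStream os = repo.outputStream(id, version)) {
            for (byte[] record : records) {
                if (bytesInBlock > 0 && bytesInBlock + record.length > blockSize) {
                    os.flush(); // forces the atomic append of the partial block
                    bytesInBlock = 0;
                }
                os.write(record);
                bytesInBlock += record.length;
            }
        } // closing the stream flushes the final partial block
    }
}
```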
The distributed file system uses two scale-out indices to support ACID operations on file metadata and atomic file append. These ACID guarantees arise from the use of unisolated operations on the respective indices and therefore apply only to the individual file metadata or file block operations. In particular, file metadata read and write are atomic and all individual file block IO (read, write, and append) operations are atomic. Atomicity is NOT guaranteed when performing more than a single file block IO operation, e.g., multiple appends MIGHT NOT write sequential blocks since other block operations could have intervened.
The content length of the file is not stored as file metadata. Instead it MAY be estimated by a range count of the index entries spanned by the file's data. The exact file size may be readily determined when reading small files by the expediency of sucking the entire file into a buffer - all reads are at least one block. Streaming processing is advised in all cases when handling large files, including when the file is to be delivered via HTTP.
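As a hedged illustration of that estimate, getBlockCount() (itself an upper-bound range count over the file version's index entries) times the block size gives an approximate content length:

```java
// Sketch: estimate the content length of a file version. getBlockCount()
// counts index entries, including blocks pending garbage collection, so
// the result is an estimate rather than an exact size.
public class FileSizeEstimateExample {

    public static long estimateLength(BigdataFileSystem repo, String id, int version) {
        final long blocks = repo.getBlockCount(id, version);
        return blocks * (long) repo.getBlockSize();
    }
}
```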
The metadata index uses a SparseRowStore design, similar to Google's bigtable or Hadoop's HBase. All updates to file version metadata are atomic. The primary key in the metadata index for every file is its FileMetadataSchema.ID. In addition, each version of a file has a distinct FileMetadataSchema.VERSION property. File creation time, version creation time, and file version metadata update timestamps may be recovered from the timestamps associated with the properties in the metadata index. The use of the FileMetadataSchema.CONTENT_TYPE and FileMetadataSchema.CONTENT_ENCODING properties is enforced by the high-level Document interface. Applications are free to define additional properties.
Each time a file is created a new version number is assigned. The data index uses the FileMetadataSchema.ID as the first field in a compound key. The second field is the FileMetadataSchema.VERSION - a 32-bit integer. The remainder of the key is a 64-bit signed block identifier (2^63-1 distinct block identifiers). The block identifiers are strictly monotonic (e.g., one up) and their sequence orders the blocks into the logical byte order of the file.
Operations that create a new file actually create a new file version. The old file version will eventually be garbage collected depending on the policy in effect for compacting merges. Likewise, operations that delete a file simply mark the metadata for the file version as deleted and the file version will eventually be reclaimed. The high-level update(Document) operation in fact simply creates a new file version.
Use case: A REST-ful repository. Documents may be stored, updated, read, deleted, and searched using a full text index.
Use case: A map/reduce master reads document metadata using an index scan. It examines the data index's MetadataIndex (that is, the index that knows where each partition of the scale-out data index is stored) and determines which map clients are going to be "close" to each document and then hands off the document to one of those map clients.
Use case: The same as the use case above, but large files are being processed and there is a requirement to "break" the files into splits and hand off the splits. This can be achieved by estimating the file size using a range count and multiplying through by the block size. Blocks may be handed off to the clients in parallel (of course, clients need to deal with the hassle of processing files where records will cross split boundaries unless they always pad out with unused bytes to the next blockSize boundary).
Use case: A reduce client wants to write a very large file, so it creates a metadata record for the file and then does a series of atomic appends to the file. The file may grow arbitrarily large. Clients may begin to read from the file as soon as the first block has been flushed.
Use case: Queues MAY be built from the operations to atomically read or delete the first block for the file version. The "design pattern" is to have clients append blocks to the file version, taking care that logical rows never cross a block boundary (e.g., by flushing partial blocks). A master then reads the head block from the file version, distributing the logical records therein to consumers and providing fail-safe processing in case consumers die or take too long. Once all records for the head block have been processed the master simply deletes the head block. This "pattern" is quite similar to map/reduce and, like map/reduce, requires that the consumer operations may be safely re-run.
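A hedged sketch of the consumer side of that pattern; the record format (one UTF-8 line per record) and the process() callback are hypothetical, while readHead and deleteHead are the atomic operations documented on this class:

```java
import java.nio.charset.StandardCharsets;

// Sketch of a queue master that drains the head block of a file version.
// As noted above, processing a record must be safe to re-run.
public class QueueMasterExample {

    public static void drain(BigdataFileSystem repo, String id, int version) {
        byte[] head;
        while ((head = repo.readHead(id, version)) != null) { // atomic read of the head block
            final String block = new String(head, StandardCharsets.UTF_8);
            for (String record : block.split("\n")) {
                process(record); // must be idempotent / safely re-runnable
            }
            repo.deleteHead(id, version); // atomically remove the consumed head block
        }
    }

    private static void process(String record) {
        System.out.println("consumed: " + record);
    }
}
```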
Use case: File replication, retention of deleted versions, and media indexing are administered by creating "zones" comprising one or more index partitions with a shared file identifier prefix, e.g., /tmp or /highly-available, or /deployment-text-index. All files in a given zone share the same policy for file replication, compacting merges (determining when a deleted or even a non-deleted file version will be discarded), and media indexing.
Use case: File rename is NOT a cheap operation. It essentially creates a new file version with the desired name and copies the data from the old file version to the new file version. Finally the old file version is "deleted". This approach is necessary since files may be moved from one "zone" to another and since the file data must reside on the index partition(s) identified by its file version. FIXME write a JSON API that interoperates to the extent possible with GAE and HBASE.
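A hedged sketch of such a rename built only from operations documented here; the metadata property name is hypothetical, and a real application would also carry over content type, encoding, and any custom properties:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: "rename" a file by creating a new file version under the target
// identifier, copying the blocks, and deleting the source.
public class RenameExample {

    public static void rename(BigdataFileSystem repo, String fromId, int fromVersion,
            String toId) {
        final Map<String, Object> metadata = new HashMap<>();
        metadata.put("Id", toId); // hypothetical property name; see FileMetadataSchema.ID
        final int toVersion = repo.create(metadata);            // new file version under the new name
        repo.copyBlocks(fromId, fromVersion, toId, toVersion);  // copy the data blocks
        repo.delete(fromId);                                    // mark the old file version deleted
    }
}
```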
Modifier and Type | Class and Description |
---|---|
static interface | BigdataFileSystem.Options - Configuration options. |
Modifier and Type | Field and Description |
---|---|
protected static boolean | DEBUG - True iff the log level is DEBUG or less. |
static String | FILE_DATA_INDEX_BASENAME - The basename of the index in which the file data blocks are stored. |
static String | FILE_METADATA_INDEX_BASENAME - The basename of the index in which the file metadata are stored. |
protected static boolean | INFO - True iff the log level is INFO or less. |
protected static org.apache.log4j.Logger | log |
protected static long | MAX_BLOCK - The maximum block identifier that can be assigned to a file version. |
static FileMetadataSchema | metadataSchema |

Fields inherited from class AbstractResource: indexManager

Fields inherited from interface IRowStoreConstants: AUTO_TIMESTAMP, AUTO_TIMESTAMP_UNIQUE, CURRENT_ROW, MAX_TIMESTAMP, MIN_TIMESTAMP
Constructor and Description |
---|
BigdataFileSystem(IIndexManager indexManager, String namespace, Long timestamp, Properties properties) - Ctor specified by DefaultResourceLocator. |
Modifier and Type | Method and Description |
---|---|
long | appendBlock(String id, int version, byte[] b, int off, int len) - Atomic append of a block to a file version. |
protected static void | assertLong(Map<String,Object> properties, String name) |
protected static void | assertString(Map<String,Object> properties, String name) |
Iterator<Long> | blocks(String id, int version) - Returns an iterator that visits all block identifiers for the file version in sequence. |
long | copyBlocks(String fromId, int fromVersion, String toId, int toVersion) - Copies blocks from one file version to another. |
long | copyStream(String id, int version, InputStream is) - Copies data from the input stream to the file version. |
void | create() - Note: A commit is required in order for a read-committed view to have access to the registered indices. |
int | create(Document doc) - Create a new persistent document in this repository based on the metadata and content in the supplied document object. |
int | create(Map<String,Object> metadata) - Creates a new file version from the specified metadata. |
long | delete(String id) - Note: A new file version is marked as deleted and then the file blocks for the old version are deleted from the data index. |
long | deleteAll(String fromId, String toId) - Efficient delete of file metadata and file data for all files and file versions spanned by the specified file identifiers. |
boolean | deleteBlock(String id, int version, long block) - Atomic delete of a block for a file version. |
long | deleteHead(String id, int version) - Atomic delete of the first block of the file version. |
void | destroy() - Destroy any logically contained resources (relations, indices). |
ITPV[] | getAllVersionInfo(String id) - Return an array describing all non-eradicated versions of a file. |
long | getBlockCount(String id, int version) - Return the maximum #of blocks in the file version. |
int | getBlockSize() - The size of a file block. |
Iterator<? extends DocumentHeader> | getDocumentHeaders(String fromId, String toId) - Return a listing of the documents and metadata about them in this repository. |
IIndex | getFileDataIndex() - The index in which the file blocks are stored (the index must exist). |
SparseRowStore | getFileMetadataIndex() - The index in which the file metadata is stored (the index must exist). |
int | getOffsetBits() - The #of bits in a 64-bit long integer identifier that are used to encode the byte offset of a record in the store as an unsigned integer. |
FileVersionInputStream | inputStream(String id, int version) - Read data from a file version. |
FileVersionInputStream | inputStream(String id, int version, long tx) - Read data from a file version. |
boolean | isReadOnly() |
OutputStream | outputStream(String id, int version) - Return an output stream that will append on the file version. |
Document | read(String id) - Reads the document metadata for the current version of the specified file. |
byte[] | readBlock(String id, int version, long block) - Atomic read of a block for a file version. |
Reader | reader(String id, int version, String encoding) - Read character data from a file version. |
byte[] | readHead(String id, int version) - Atomic read of the first block of the file version. |
ITPS | readMetadata(String id, long timestamp) - Return the file metadata for the version of the file associated with the specified timestamp. |
Iterator<String> | search(String query) - FIXME Integrate with FullTextIndex to provide indexing and search of file versions. |
int | update(Document doc) - Create a new file version using the supplied file metadata. |
Map<String,Object> | updateMetadata(String id, Map<String,Object> metadata) - Update the metadata for the current file version. |
boolean | writeBlock(String id, int version, long block, byte[] b, int off, int len) - Atomic write of a block for a file version. |
Writer | writer(String id, int version, String encoding) - Return a Writer that will append character data on the file version. |
Methods inherited from class AbstractResource: acquireExclusiveLock, assertWritable, getBareProperties, getChunkCapacity, getChunkOfChunksCapacity, getChunkTimeout, getCommitTime, getContainer, getContainerNamespace, getExecutorService, getFullyBufferedReadThreshold, getIndexManager, getMaxParallelSubqueries, getNamespace, getProperties, getProperty, getProperty, getTimestamp, init, isForceSerialExecution, toString, unlock
protected static final org.apache.log4j.Logger log

protected static final boolean INFO

True iff the log level is INFO or less.

protected static final boolean DEBUG

True iff the log level is DEBUG or less.

protected static final long MAX_BLOCK

The maximum block identifier that can be assigned to a file version.

Note: This is limited to Long.MAX_VALUE - 1 so that we can always form the key greater than any valid key for a file version. This is required by the atomic append logic when it seeks the next block identifier. See AtomicBlockAppendProc.
public static final String FILE_METADATA_INDEX_BASENAME

The basename of the index in which the file metadata are stored. The fully qualified index name is formed using AbstractResource.getNamespace() as a prefix.

Note: This is a SparseRowStore governed by the FileMetadataSchema.
public static final String FILE_DATA_INDEX_BASENAME

The basename of the index in which the file data blocks are stored. The fully qualified index name is formed using AbstractResource.getNamespace() as a prefix.

Note: The entries in this index are a series of blocks for a file. Blocks are blockSize bytes each and are assigned monotonically increasing block numbers by the atomic append operation. The final block may be smaller (there is no need to pad out the data with nulls). The keys are formed from two fields - a field containing the content identifier followed by an integer field containing the sequential block number. A range scan with a fromKey of the file identifier and a toKey computed using the successor of the file identifier will naturally visit all blocks in a file in sequence.
public static final FileMetadataSchema metadataSchema
public BigdataFileSystem(IIndexManager indexManager, String namespace, Long timestamp, Properties properties)

Ctor specified by DefaultResourceLocator.

See Also: BigdataFileSystem.Options
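A minimal sketch of obtaining and initializing an instance. The package names, the use of a local Journal as the IIndexManager, the journal file option, and the ITx.UNISOLATED timestamp are assumptions; in a federation the instance is normally resolved via the DefaultResourceLocator rather than constructed directly:

```java
import java.util.Properties;

import com.bigdata.bfs.BigdataFileSystem; // assumed package
import com.bigdata.journal.ITx;           // assumed package
import com.bigdata.journal.Journal;       // assumed package; one IIndexManager implementation
import com.bigdata.journal.Options;       // assumed package

public class CreateRepositoryExample {

    public static BigdataFileSystem openRepository() {
        final Properties properties = new Properties();
        properties.setProperty(Options.FILE, "example.jnl"); // assumed journal option
        final Journal indexManager = new Journal(properties);
        final BigdataFileSystem repo = new BigdataFileSystem(
                indexManager, "example" /* namespace */, ITx.UNISOLATED, properties);
        repo.create(); // registers the metadata and data indices; see the create() note on commits
        return repo;
    }
}
```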
public final int getOffsetBits()

The #of bits in a 64-bit long integer identifier that are used to encode the byte offset of a record in the store as an unsigned integer.

See Also: Options.OFFSET_BITS, getBlockSize()

public final int getBlockSize()

The size of a file block. A file version may have up to 2^63 - 1 blocks, making the maximum possible file size 536,870,912 Exabytes.

Note: The BigdataFileSystem makes the assumption that the Options.OFFSET_BITS is the #of offset bits configured for the IDataServices in the connected IBigdataFederation and computes the getBlockSize() based on that assumption. It is NOT possible to write blocks on the BigdataFileSystem whose size is greater than the maximum block size actually configured for the IDataServices in the connected IBigdataFederation.

See Also: Options.OFFSET_BITS, getOffsetBits()
public SparseRowStore getFileMetadataIndex()
public IIndex getFileDataIndex()
public boolean isReadOnly()

Overrides: isReadOnly in class AbstractResource<IDatabase<BigdataFileSystem>>
public void create()

Note: A commit is required in order for a read-committed view to have access to the registered indices. When running against an IBigdataFederation, ITx.UNISOLATED operations will take care of this for you. Otherwise you must do this yourself.

Specified by: create in interface IMutableResource<IDatabase<BigdataFileSystem>>

Overrides: create in class AbstractResource<IDatabase<BigdataFileSystem>>
public void destroy()

Description copied from interface: IMutableResource

Destroy any logically contained resources (relations, indices).

Specified by: destroy in interface IMutableResource<IDatabase<BigdataFileSystem>>

Overrides: destroy in class AbstractResource<IDatabase<BigdataFileSystem>>
public int create(Map<String,Object> metadata)

Creates a new file version from the specified metadata.

Parameters:
metadata - The file metadata.

public int create(Document doc)

Description copied from interface: IContentRepository

Create a new persistent document in this repository based on the metadata and content in the supplied document object.

Specified by: create in interface IContentRepository

Parameters:
doc - an object containing the content and metadata to persist

public Document read(String id)

Reads the document metadata for the current version of the specified file.

Specified by: read in interface IContentRepository

Parameters:
id - The file identifier.

Returns: The document -or- null iff there is no current version for that file identifier.

public ITPS readMetadata(String id, long timestamp)

Return the file metadata for the version of the file associated with the specified timestamp.

Parameters:
id - The file identifier.
timestamp - The timestamp.

See Also: ITPS, SparseRowStore#read(Schema, Object, long, com.bigdata.sparse.INameFilter)
public Map<String,Object> updateMetadata(String id, Map<String,Object> metadata)

Update the metadata for the current file version.

Parameters:
id - The file identifier.
metadata - The properties to be written. A null value for a property will cause the corresponding property to be deleted. Properties not present in this map will NOT be modified.

public int update(Document doc)

Create a new file version using the supplied file metadata.

Note: This is essentially a delete + create operation. Since the combined operation is NOT atomic it is possible that conflicts can arise when more than one client attempts to update a file concurrently.
Specified by: update in interface IContentRepository

Parameters:
doc - The file metadata.

public long delete(String id)

Note: A new file version is marked as deleted and then the file blocks for the old version are deleted from the data index.

Specified by: delete in interface IContentRepository

Parameters:
id - the identifier of the document to delete

public ITPV[] getAllVersionInfo(String id)

Return an array describing all non-eradicated versions of a file.
This method returns all known version identifiers together with their timestamps, thereby making it possible to read either the metadata or the data for historical file versions - as long as the metadata and/or data has not yet been eradicated.

The file metadata and data blocks for historical version(s) of a file remain available until they are eradicated from their respective indices by a compacting merge in which the history policies no longer preserve those data.

In order to read the historical file metadata you need to know the timestamp associated with the version identifier which you wish to read. This should be the timestamp when that version was deleted MINUS ONE in order to read the last valid metadata for the file version before it was deleted.

Likewise, in order to read the historical version data you need to know the version identifier which you wish to read as well as the timestamp. In this case, use the timestamp when that version was deleted in order to read the last committed state for the file version.

Historical file version metadata is eradicated atomically since the entire logical row will be hosted on the same index partition. Either the file version metadata is available or it is not.

Historical file version data is eradicated one index partition at a time. If the file version spans more than one index partition then it may be possible to read some blocks from the file but not others.

Historical file version metadata and data will remain available until their governing history policy is no longer satisfied. Therefore, when in doubt, you can consult the history policy in force for the file to determine whether or not its data may have been eradicated.
Parameters:
id - The file identifier.

Returns: Tuples where ITPV.getValue() returns null give the timestamp at which a file version was deleted. Tuples where the ITPV.getValue() returns non-null give the timestamp at which a file version was created.

See Also:
readMetadata(String, long), to read the file version metadata based on a timestamp.
inputStream(String, int, long), to read the file data as of a specific timestamp.
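A hedged sketch of using that information to read the last metadata state of each deleted version; the interpretation of getValue() follows the description above, while getTimestamp() on ITPV and the package names are assumptions:

```java
import com.bigdata.bfs.BigdataFileSystem; // assumed package
import com.bigdata.sparse.ITPS;           // assumed package
import com.bigdata.sparse.ITPV;           // assumed package

// Sketch: recover the last valid metadata for each deleted version of a file.
public class VersionHistoryExample {

    public static void printDeletedVersionMetadata(BigdataFileSystem repo, String id) {
        for (ITPV tpv : repo.getAllVersionInfo(id)) {
            if (tpv.getValue() == null) {
                // a delete marker: read the metadata as of just before the delete
                final long deleteTime = tpv.getTimestamp(); // assumed accessor
                final ITPS metadata = repo.readMetadata(id, deleteTime - 1);
                System.out.println(metadata);
            }
        }
    }
}
```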
public Iterator<? extends DocumentHeader> getDocumentHeaders(String fromId, String toId)

Description copied from interface: IContentRepository

Return a listing of the documents and metadata about them in this repository.

Note: If you assign identifiers using a namespace then you can use this method to efficiently visit all documents within that namespace.

Specified by: getDocumentHeaders in interface IContentRepository

Parameters:
fromId - The identifier of the first document to be visited or null if there is no lower bound.
toId - The identifier of the first document that will NOT be visited or null if there is no upper bound.

Returns: An iterator visiting the DocumentHeaders.

public long deleteAll(String fromId, String toId)
Efficient delete of file metadata and file data for all files and file versions spanned by the specified file identifiers. See delete(String).

Specified by: deleteAll in interface IContentRepository

Parameters:
fromId - The identifier of the first document to be deleted or null if there is no lower bound.
toId - The identifier of the first document that will NOT be deleted or null if there is no upper bound.

public Iterator<String> search(String query)
FIXME Integrate with FullTextIndex to provide indexing and search of file versions. Deleted file versions should be removed from the text index. There should be explicit metadata on the file version in order for it to be indexed. The text indexer will require content type and encoding information in order to handle indexing. Low-level output stream, writer, block write and block append operations will not trigger the indexer since it depends on the metadata index to know whether or not a file version should be indexed. However you could explicitly submit a file version for indexing.

Perhaps the best way to handle this is to queue document metadata up for a distributed full text indexing service. The service accepts metadata for documents from the queue and decides whether or not the document should be indexed based on its metadata and how the document should be processed if it is to be indexed. Those business rules would be registered with the full text indexing service. (Alternatively, they can be configured with the BigdataFileSystem and applied locally as the blocks of the file are written into the repository. That's certainly easier right off the bat.)

Specified by: search in interface IContentRepository

Parameters:
query - A query.

public Iterator<Long> blocks(String id, int version)

Returns an iterator that visits all block identifiers for the file version in sequence.
Note: This may be used to efficiently distribute blocks among a population of clients, e.g., in a map/reduce paradigm.
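For instance, a minimal sketch of handing block identifiers off to a pool of workers; the pool size and the per-block processing are hypothetical, while blocks() and readBlock() come from this API:

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: distribute the blocks of a file version to a pool of workers.
// Each worker reads its block atomically with readBlock().
public class BlockFanOutExample {

    public static void fanOut(BigdataFileSystem repo, String id, int version) {
        final ExecutorService workers = Executors.newFixedThreadPool(8); // hypothetical pool size
        final Iterator<Long> itr = repo.blocks(id, version);
        while (itr.hasNext()) {
            final long block = itr.next();
            workers.submit(() -> {
                final byte[] data = repo.readBlock(id, version, block);
                System.out.println("block " + block + ": " + (data == null ? 0 : data.length) + " bytes");
            });
        }
        workers.shutdown();
    }
}
```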
public long copyBlocks(String fromId, int fromVersion, String toId, int toVersion)

Copies blocks from one file version to another.

Parameters:
fromId -
fromVersion -
toId -
toVersion -
public boolean writeBlock(String id, int version, long block, byte[] b, int off, int len)

Atomic write of a block for a file version.

Note: You can write any valid block identifier at any time. If the block exists then its data will be replaced.

Note: Writing blocks out of sequence can create "holes". Those holes may be filled by later writing the "missing" blocks. copyBlocks(String, int, String, int) will renumber the blocks and produce a dense sequence of blocks.

Note: Atomic append will always write the successor of the largest block identifier already written on the file version. If you write block MAX_BLOCK then it will no longer be possible to append blocks to that file version, but you can still write blocks using writeBlock(String, int, long, byte[], int, int).

Parameters:
id - The file identifier.
version - The file version.
block - The block identifier in [0:MAX_BLOCK].
b - The buffer containing the bytes to be written. When the buffer contains more than blockSize bytes it will be broken up into multiple blocks.
off - The offset of the 1st byte to be written.
len - The #of bytes to be written.

Returns: true iff the block was overwritten (i.e., if the block already exists, in which case its contents were replaced).

Throws:
IllegalArgumentException - if id is null or an empty string.
IllegalArgumentException - if version is negative.
IllegalArgumentException - if block is negative.
IllegalArgumentException - if b is null.
IllegalArgumentException - if off is negative or greater than the length of the byte[].
IllegalArgumentException - if len is negative or off+len is greater than the length of the byte[].
IllegalArgumentException - if len is greater than blockSize.
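A small hedged sketch of overwriting a specific block and reading it back; the identifiers and payload are hypothetical, and only writeBlock and readBlock are taken from this page:

```java
import java.nio.charset.StandardCharsets;

// Sketch: write block 0 of a file version and read it back. Writing an
// existing block identifier replaces that block's data atomically.
public class WriteBlockExample {

    public static void rewriteFirstBlock(BigdataFileSystem repo, String id, int version) {
        final byte[] data = "hello, block zero".getBytes(StandardCharsets.UTF_8);
        final boolean overwritten = repo.writeBlock(id, version, 0L, data, 0, data.length);
        System.out.println(overwritten ? "replaced an existing block" : "wrote a new block");
        final byte[] readBack = repo.readBlock(id, version, 0L);
        System.out.println(new String(readBack, StandardCharsets.UTF_8));
    }
}
```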
public long deleteHead(String id, int version)

Atomic delete of the first block of the file version.

Parameters:
id - The file identifier.
version - The version identifier.

Returns: The identifier of the deleted block -or- -1L if nothing was deleted.

public boolean deleteBlock(String id, int version, long block)

Atomic delete of a block for a file version.

Parameters:
id - The file identifier.
version - The version identifier.
block - The block identifier -or- -1L to read the first block in the file version regardless of its block identifier.

Returns: true iff the block was deleted.

public byte[] readHead(String id, int version)

Atomic read of the first block of the file version.

Parameters:
id - The file identifier.
version - The version identifier.

Returns: The contents of the block -or- null iff there are no blocks for that file version. Note that an empty block will return an empty byte[] rather than null.

public byte[] readBlock(String id, int version, long block)

Atomic read of a block for a file version.

Parameters:
id - The file identifier.
version - The version identifier.
block - The block identifier.

Returns: The contents of the block -or- null iff the block does not exist. Note that an empty block will return an empty byte[] rather than null.

public long appendBlock(String id, int version, byte[] b, int off, int len)
Atomic append of a block to a file version.

Parameters:
id - The file identifier.
version - The file version.
b - The buffer containing the data to be written.
off - The offset of the 1st byte to be written.
len - The #of bytes to be written in [0:blockSize].

Throws:
IllegalArgumentException - if id is null or an empty string.
IllegalArgumentException - if version is negative.
IllegalArgumentException - if b is null.
IllegalArgumentException - if off is negative or greater than the length of the byte[].
IllegalArgumentException - if len is negative or off+len is greater than the length of the byte[].
IllegalArgumentException - if len is greater than blockSize.
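A hedged sketch of atomic append; the identifiers and payloads are hypothetical, and as the class description notes, appends from concurrent clients may interleave:

```java
import java.nio.charset.StandardCharsets;

// Sketch: atomically append two blocks to a file version. Each call
// writes the successor of the largest block identifier already written.
public class AppendBlockExample {

    public static void appendTwoBlocks(BigdataFileSystem repo, String id, int version) {
        final byte[] first = "first record batch".getBytes(StandardCharsets.UTF_8);
        final byte[] second = "second record batch".getBytes(StandardCharsets.UTF_8);
        repo.appendBlock(id, version, first, 0, first.length);
        repo.appendBlock(id, version, second, 0, second.length);
    }
}
```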
public long getBlockCount(String id, int version)

Return the maximum #of blocks in the file version.

Note: the block count only decreases when a compacting merge eradicates deleted blocks from an index partition. It will increase any time there is a write on a block for the file version for which neither a delete nor an undeleted entry exists. The only way to count the #of non-deleted blocks for a file version is to traverse the blocks(String, int) iterator.

Parameters:
id - The file identifier.
version - The file version identifier.

public Writer writer(String id, int version, String encoding) throws UnsupportedEncodingException
Return a Writer that will append character data on the file version. Characters written on the Writer will be converted to bytes using the specified encoding. Bytes will be buffered until the block is full and then written on the file version using an atomic append. A Writer.flush() will force a non-empty partial block to be written immediately.

Note: Map/Reduce processing of a file version MAY be facilitated greatly by ensuring that "records" never cross a block boundary - this means that file versions can be split into blocks and blocks distributed to clients without any regard for the record structure within those blocks. The caller can prevent records from crossing block boundaries by the simple expediency of invoking Writer.flush() to force the atomic append of a (partial but non-empty) block to the file.

Since the characters are being converted to bytes, the caller MUST make Writer.flush() decisions with an awareness of the expansion rate of the specified encoding. For simplicity, it is easy to specify UTF-16 in which case you can simply count two bytes written for each character written.

Parameters:
id - The file identifier.
version - The version identifier.
encoding - The character set encoding.

Throws: UnsupportedEncodingException
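A hedged sketch of that bookkeeping with a UTF-16 writer; the record strings are hypothetical, and the two-bytes-per-character accounting follows the note above while ignoring the initial byte order mark:

```java
import java.io.Writer;
import java.util.List;

// Sketch: append text records so that no record crosses a block boundary.
public class TextRecordWriterExample {

    public static void writeRecords(BigdataFileSystem repo, String id, int version,
            List<String> records) throws Exception {
        final int blockSize = repo.getBlockSize();
        int bytesInBlock = 0;
        try (Writer w = repo.writer(id, version, "UTF-16")) {
            for (String record : records) {
                final int recordBytes = record.length() * 2; // two bytes per char in UTF-16
                if (bytesInBlock > 0 && bytesInBlock + recordBytes > blockSize) {
                    w.flush(); // atomic append of the partial block
                    bytesInBlock = 0;
                }
                w.write(record);
                bytesInBlock += recordBytes;
            }
        }
    }
}
```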
public Reader reader(String id, int version, String encoding) throws UnsupportedEncodingException

Read character data from a file version.

Parameters:
id - The file identifier.
version - The version identifier.
encoding - The character set encoding.

Throws: UnsupportedEncodingException
public FileVersionInputStream inputStream(String id, int version)

Read data from a file version.

Note: The input stream will remain coherent for the file version as of the time that the view on the file version is formed. Additional atomic appends MAY be read, but that is NOT guaranteed. If the file is deleted and its data is expunged by a compacting merge during the read then the read MAY be truncated.

Parameters:
id - The file identifier.
version - The version identifier.

Returns: An input stream from which the data may be read -or- null if there is no data for that file version, including no deleted blocks pending garbage collection. An empty input stream MAY be returned since empty blocks are allowed. An empty stream will also be returned after a file version is deleted until the deleted blocks are eradicated from the file data index.
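A hedged sketch of reading a small file version entirely into memory, as suggested in the class description; the buffer size is arbitrary:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Sketch: read an entire (small) file version into a byte[]. Returns null
// when there is no data for the file version, mirroring inputStream().
public class ReadWholeFileExample {

    public static byte[] readAll(BigdataFileSystem repo, String id, int version) throws Exception {
        try (InputStream is = repo.inputStream(id, version)) {
            if (is == null) {
                return null;
            }
            final ByteArrayOutputStream buf = new ByteArrayOutputStream();
            final byte[] chunk = new byte[8192];
            int n;
            while ((n = is.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return buf.toByteArray();
        }
    }
}
```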
public FileVersionInputStream inputStream(String id, int version, long tx)

Read data from a file version.

Some points about consistency and transaction identifiers:

An ITx.UNISOLATED read: additional atomic writes and atomic appends issued after the input stream view was formed MAY be read, but that is NOT guaranteed - it depends on the buffering of the range iterator used to read blocks for the file version. Likewise, if the file is deleted and its data is expunged by a compacting merge during the read then the read MAY be truncated.

A historical read (a timestamp): reads against a committed state of the file data index provided that the relevant data has not been eradicated by a compacting merge. It is not possible to recover all states - merely committed states - since unisolated writes may be grouped together by group commit and therefore have the same commit point.

A full transaction identifier may be obtained from the ITransactionManagerService. In general the use of full transactions is discouraged as the BigdataFileSystem is designed for high throughput and high concurrency with weaker isolation levels suitable for scale-out processing techniques including map/reduce.

Parameters:
id - The file identifier.
version - The version identifier.
tx - The transaction identifier. This is generally either ITx.UNISOLATED to use an unisolated read -or- -timestamp to use a historical read for the most recent consistent state of the file data not later than timestamp.

Returns: An input stream from which the data may be read -or- null if there is no data for that file version, including no deleted blocks pending garbage collection. An empty input stream MAY be returned since empty blocks are allowed. An empty stream will also be returned after a file version is deleted until the deleted blocks are eradicated from the file data index.

public OutputStream outputStream(String id, int version)
Return an output stream that will append on the file version. OutputStream.flush() will force a non-empty partial block to be written immediately.

Note: Map/Reduce processing of a file version MAY be facilitated greatly by ensuring that "records" never cross a block boundary - this means that files can be split into blocks and blocks distributed to clients without any regard for the record structure within those blocks. The caller can prevent records from crossing block boundaries by the simple expediency of invoking OutputStream.flush() to force the atomic append of a (partial but non-empty) block to the file.

Parameters:
id - The file identifier.
version - The version identifier.

public long copyStream(String id, int version, InputStream is)
Copies data from the input stream to the file version.

Parameters:
id - The file identifier.
version - The version identifier.
is - The input stream (closed iff it is fully consumed).

Copyright © 2006–2019 SYSTAP, LLC DBA Blazegraph. All rights reserved.