Tephra: Transactions for Apache HBase

Open-source software name:

Tephra

Open-source software address:

https://gitee.com/apache/tephra

Open-source software description:

Note: Tephra has moved to Apache Incubator

For latest updates on Apache Tephra go to its new site at http://tephra.incubator.apache.org.


Transactions for Apache HBase™: Cask Tephra provides globally consistent transactions on top of Apache HBase. While HBase provides strong consistency with row- or region-level ACID operations, it sacrifices cross-region and cross-table consistency in favor of scalability. This trade-off requires application developers to handle the complexity of ensuring consistency when their modifications span region boundaries. By providing support for global transactions that span regions, tables, or multiple RPCs, Tephra simplifies application development on top of HBase, without a significant impact on performance or scalability for many workloads.

How It Works

Tephra leverages HBase's native data versioning to provide multi-versioned concurrency control (MVCC) for transactional reads and writes. With MVCC capability, each transaction sees its own consistent "snapshot" of data, providing snapshot isolation of concurrent transactions.

Tephra consists of three main components:

  • Transaction Server - maintains a global view of transaction state, assigns new transaction IDs, and performs conflict detection;
  • Transaction Client - coordinates start, commit, and rollback of transactions; and
  • TransactionProcessor Coprocessor - applies filtering to the data read (based on a given transaction's state) and cleans up any data from old (no longer visible) transactions.
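
The conflict detection performed by the Transaction Server can be pictured with a small sketch. This is a simplified model and not Tephra's implementation: the class `ConflictDetectorSketch`, its methods, the logical pointers, and the use of string keys for change sets are all hypothetical. It assumes that a commit fails when another transaction that committed after this one started has written one of the same keys.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/** Simplified model of commit-time write-conflict detection (hypothetical, not Tephra code). */
final class ConflictDetectorSketch {
  // Change sets of committed transactions, keyed by a logical commit pointer.
  private final TreeMap<Long, Set<String>> committedChangeSets = new TreeMap<>();
  private long nextPointer = 1;

  /** Starts a transaction and returns its start pointer. */
  synchronized long start() {
    return nextPointer++;
  }

  /**
   * Tries to commit. Returns false when a transaction that committed after
   * startPointer wrote one of the same keys; the caller must then roll back.
   */
  synchronized boolean commit(long startPointer, Set<String> changedKeys) {
    for (Set<String> committed : committedChangeSets.tailMap(startPointer, false).values()) {
      for (String key : changedKeys) {
        if (committed.contains(key)) {
          return false; // overlapping write detected
        }
      }
    }
    committedChangeSets.put(nextPointer++, new HashSet<>(changedKeys));
    return true;
  }

  public static void main(String[] args) {
    ConflictDetectorSketch detector = new ConflictDetectorSketch();
    long tx1 = detector.start();
    long tx2 = detector.start();
    Set<String> change = new HashSet<>();
    change.add("table1/row1/col1");
    System.out.println("tx1 committed: " + detector.commit(tx1, change));
    System.out.println("tx2 committed: " + detector.commit(tx2, change));
  }
}
```

In this model the second commit is rejected because both transactions were open at the same time and changed the same key. A real server would also have to expire old change sets, which the sketch omits.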

Transaction Server

A central transaction manager generates a globally unique, time-based transaction ID for each transaction that is started, and maintains the state of all in-progress and recently committed transactions for conflict detection. While multiple transaction server instances can be run concurrently for automatic failover, only one server instance is actively serving requests at a time. This is coordinated by performing leader election amongst the running instances through ZooKeeper. The active transaction server instance will also register itself using a service discovery interface in ZooKeeper, allowing clients to discover the currently active server instance without additional configuration.
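
As an illustration of what "globally unique, time-based" can mean, here is a minimal sketch of an ID generator. It is not taken from the Tephra source: the class name `TimeBasedIdSketch` and the scale factor `IDS_PER_MILLI` are assumptions chosen for the example. The idea is to scale the wall-clock time in milliseconds up by a constant, and to fall back to incrementing whenever the clock has not advanced.

```java
/** Sketch of a monotonically increasing, time-based ID generator (hypothetical). */
final class TimeBasedIdSketch {
  // Assumed scale factor: leaves room for this many IDs within one millisecond.
  static final long IDS_PER_MILLI = 1_000_000L;

  private long lastId = 0;

  /** Returns an ID derived from the given time that is always larger than the previous one. */
  synchronized long nextId(long currentTimeMillis) {
    long candidate = currentTimeMillis * IDS_PER_MILLI;
    // If the clock has not advanced (or moved backwards), increment the last ID instead.
    lastId = Math.max(candidate, lastId + 1);
    return lastId;
  }

  public static void main(String[] args) {
    TimeBasedIdSketch ids = new TimeBasedIdSketch();
    long now = System.currentTimeMillis();
    System.out.println(ids.nextId(now));
    System.out.println(ids.nextId(now)); // same millisecond: previous ID plus one
  }
}
```

Because such IDs are far larger than millisecond timestamps, they sort after any ordinary HBase cell timestamp.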

Transaction Client

A client makes a call to the active transaction server in order to start a new transaction. This returns a new transaction instance to the client, with a unique transaction ID (used to identify writes for the transaction), as well as a list of transaction IDs to exclude for reads (from in-progress or invalidated transactions). When performing writes, the client overrides the timestamp for all modified HBase cells with the transaction ID. When reading data from HBase, the client skips cells associated with any of the excluded transaction IDs. The read exclusions are applied through a server-side filter injected by the TransactionProcessor coprocessor.
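
The read-side rule described above can be sketched as a simple visibility check. This is an illustrative model only: the class `SnapshotVisibilitySketch` and the notion of a separate `readPointer` (an upper bound on visible IDs) are assumptions for the example and are not stated in this document. Cell timestamps are treated as the IDs of the transactions that wrote them.

```java
import java.util.Arrays;

/** Sketch of a snapshot visibility check over transaction IDs (hypothetical). */
final class SnapshotVisibilitySketch {
  private final long writePointer; // this transaction's own ID
  private final long readPointer;  // assumed upper bound of what the snapshot may see
  private final long[] excluded;   // sorted IDs of in-progress or invalid transactions

  SnapshotVisibilitySketch(long writePointer, long readPointer, long[] excluded) {
    this.writePointer = writePointer;
    this.readPointer = readPointer;
    this.excluded = excluded.clone();
    Arrays.sort(this.excluded); // binarySearch below requires sorted input
  }

  /** Returns true if a cell written with the given timestamp is visible to this transaction. */
  boolean isVisible(long cellTimestamp) {
    if (cellTimestamp == writePointer) {
      return true; // a transaction always sees its own writes
    }
    if (cellTimestamp > readPointer) {
      return false; // written by a transaction that started later
    }
    return Arrays.binarySearch(excluded, cellTimestamp) < 0;
  }

  public static void main(String[] args) {
    SnapshotVisibilitySketch tx = new SnapshotVisibilitySketch(50L, 40L, new long[] {30L, 20L});
    for (long ts : new long[] {10L, 20L, 35L, 45L, 50L}) {
      System.out.println(ts + " visible: " + tx.isVisible(ts));
    }
  }
}
```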

TransactionProcessor Coprocessor

The TransactionProcessor coprocessor is loaded on all HBase tables where transactional reads and writes are performed. When clients read data, it coordinates the server-side filtering performed based on the client transaction's snapshot. Data cells from any transactions that are currently in-progress or those that have failed and could not be rolled back ("invalid" transactions) will be skipped on these reads. In addition, the TransactionProcessor cleans up any data versions that are no longer visible to any running transactions, either because the transaction that the cell is associated with failed or a write from a newer transaction was successfully committed to the same column.

More details on how Tephra transactions work and the interactions between these components can be found in our Transactions over HBase presentation.

Is It Building?

The status of the continuous integration build is reported at Travis CI.

Requirements

Java Runtime

The latest JDK or JRE version 1.7.xx or 1.8.xx for Linux, Windows, or Mac OS X must be installed in your environment; we recommend the Oracle JDK.

To check the Java version installed, run the command:

$ java -version

Tephra is tested with the Oracle JDKs; it may work with other JDKs such as OpenJDK, but it has not been tested with them.

Once you have installed the JDK, you'll need to set the JAVA_HOME environment variable.

Hadoop/HBase Environment

Tephra requires a working HBase and HDFS environment in order to operate. Tephra supports these component versions:

Component   Source          Supported Versions
HDFS        Apache Hadoop   2.0.2-alpha through 2.6.0
            CDH or HDP      (CDH) 5.0.0 through 5.7.0 or (HDP) 2.0, 2.1, 2.2 or 2.3
            MapR            4.1 (with MapR-FS)
HBase       Apache          0.96.x, 0.98.x, 1.0.x, and 1.1.x
            CDH or HDP      (CDH) 5.0.0 through 5.7.0 or (HDP) 2.0, 2.1, 2.2 or 2.3
            MapR            4.1 (with Apache HBase)
ZooKeeper   Apache          Version 3.4.3 through 3.4.5
            CDH or HDP      (CDH) 5.0.0 through 5.7.0 or (HDP) 2.0, 2.1, 2.2 or 2.3
            MapR            4.1

Note: The component versions shown in this table are those that we have tested and are confident are suitable and compatible. Later versions of components may work, but have not necessarily been either tested or confirmed compatible.

Getting Started

You can get started with Tephra by building directly from the latest source code:

git clone https://github.com/caskdata/tephra.git
cd tephra
mvn clean package

After the build completes, you will have a full binary distribution of Tephra under the tephra-distribution/target/ directory. Take the tephra-<version>.tar.gz file and install it on your systems.

For any client applications, add the following dependencies to any Apache Maven POM files (or your build system's equivalent configuration), in order to make use of Tephra classes:

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-api</artifactId>
  <version>0.7.1</version>
</dependency>
<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-core</artifactId>
  <version>0.7.1</version>
</dependency>

Since the HBase APIs have changed between versions, you will need to select the appropriate HBase compatibility library.

For HBase 0.96.x:

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-hbase-compat-0.96</artifactId>
  <version>0.7.1</version>
</dependency>

For HBase 0.98.x:

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-hbase-compat-0.98</artifactId>
  <version>0.7.1</version>
</dependency>

For HBase 1.0.x:

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-hbase-compat-1.0</artifactId>
  <version>0.7.1</version>
</dependency>

If you are running the CDH 5.4, 5.5, or 5.6 version of HBase 1.0.x (this version contains API incompatibilities with Apache HBase 1.0.x):

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-hbase-compat-1.0-cdh</artifactId>
  <version>0.7.1</version>
</dependency>

For HBase 1.1.x or CDH 5.7 version of HBase 1.2.x:

<dependency>
  <groupId>co.cask.tephra</groupId>
  <artifactId>tephra-hbase-compat-1.1</artifactId>
  <version>0.7.1</version>
</dependency>

Deployment and Configuration

Tephra makes use of a central transaction server to assign unique transaction IDs for data modifications and to perform conflict detection. Only a single transaction server can actively handle client requests at a time; however, additional transaction server instances can be run simultaneously, providing automatic failover if the active server becomes unreachable.

Transaction Server Configuration

The Tephra transaction server can be deployed on the same cluster nodes running the HBase HMaster process. The transaction server requires that the HBase libraries be available on the server's Java CLASSPATH.

The transaction server supports the following configuration properties. All configuration properties can be added to the hbase-site.xml file on the server's CLASSPATH:

Name                        Default   Description
data.tx.bind.port           15165     Port to bind to
data.tx.bind.address        0.0.0.0   Server address to listen on
data.tx.server.io.threads   2         Number of threads for socket IO
data.tx.server.threads      20        Number of handler threads
data.tx.timeout             30        Timeout for a transaction to complete (seconds)
data.tx.long.timeout        86400     Timeout for a long running transaction to complete (seconds)
data.tx.cleanup.interval    10        Frequency to check for timed out transactions (seconds)
data.tx.snapshot.dir                  HDFS directory used to store snapshots of tx state
data.tx.snapshot.interval   300       Frequency to write new snapshots
data.tx.snapshot.retain     10        Number of old transaction snapshots to retain
data.tx.metrics.period      60        Frequency for metrics reporting (seconds)

To run the Transaction server, execute the following command in your Tephra installation:

./bin/tephra start

Any environment-specific customizations can be made by editing the bin/tephra-env.sh script.

Client Configuration

Since Tephra clients will be communicating with HBase, the HBase client libraries and the HBase cluster configuration must be available on the client's Java CLASSPATH.

Client API usage is described in the Client APIs section.

The transaction service client supports the following configuration properties. All configuration properties can be added to the hbase-site.xml file on the client's CLASSPATH:

Name                                   Default   Description
data.tx.client.timeout                 30000     Client socket timeout (milliseconds)
data.tx.client.provider                pool      Client provider strategy: "pool" uses a pool of clients; "thread-local" uses a client per thread. Note that the "thread-local" provider can have a resource leak if threads are recycled
data.tx.client.count                   50        Max number of clients for "pool" provider
data.tx.client.obtain.timeout          3000      Timeout (milliseconds) to wait when obtaining clients from the "pool" provider
data.tx.client.retry.strategy          backoff   Client retry strategy: "backoff" for back off between attempts; "n-times" for fixed number of tries
data.tx.client.retry.attempts          2         Number of times to retry ("n-times" strategy)
data.tx.client.retry.backoff.initial   100       Initial sleep time ("backoff" strategy)
data.tx.client.retry.backoff.factor    4         Multiplication factor for sleep time
data.tx.client.retry.backoff.limit     30000     Exit when sleep time reaches this limit
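
To make the three "backoff" properties concrete, the sketch below computes the sequence of sleep times they would produce. It is based only on the descriptions in the table: the class `BackoffScheduleSketch` is hypothetical, and the exact semantics (multiply after each attempt, stop as soon as the sleep time reaches the limit, values in milliseconds) are an assumed reading, not the client's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of an exponential backoff schedule (assumed semantics, not Tephra code). */
final class BackoffScheduleSketch {
  /** Returns the sleep times, in order, that are used before the client gives up. */
  static List<Long> sleepTimes(long initial, long factor, long limit) {
    if (initial <= 0 || factor <= 1) {
      throw new IllegalArgumentException("initial must be > 0 and factor must be > 1");
    }
    List<Long> sleeps = new ArrayList<>();
    long sleep = initial;
    while (sleep < limit) { // exit once the sleep time reaches the limit
      sleeps.add(sleep);
      sleep *= factor;
    }
    return sleeps;
  }

  public static void main(String[] args) {
    // Defaults from the table: initial 100, factor 4, limit 30000.
    System.out.println(sleepTimes(100, 4, 30000)); // prints [100, 400, 1600, 6400, 25600]
  }
}
```

Under this reading, the default settings allow five retries with a total wait of roughly 34 seconds.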

HBase Coprocessor Configuration

In addition to the transaction server, Tephra requires an HBase coprocessor to be installed on all tables where transactional reads and writes will be performed.

To configure the coprocessor on all HBase tables, add the following to hbase-site.xml.

For HBase 0.96.x:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>co.cask.tephra.hbase96.coprocessor.TransactionProcessor</value>
</property>

For HBase 0.98.x:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>co.cask.tephra.hbase98.coprocessor.TransactionProcessor</value>
</property>

For HBase 1.0.x:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>co.cask.tephra.hbase10.coprocessor.TransactionProcessor</value>
</property>

For the CDH 5.4, 5.5, or 5.6 version of HBase 1.0.x:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>co.cask.tephra.hbase10cdh.coprocessor.TransactionProcessor</value>
</property>

For HBase 1.1.x or CDH 5.7 version of HBase 1.2.x:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>co.cask.tephra.hbase11.coprocessor.TransactionProcessor</value>
</property>

You may configure the TransactionProcessor to be loaded only on HBase tables that you will be using for transactional reads and writes. However, you must ensure that the coprocessor is available on all impacted tables in order for Tephra to function correctly.

Using Existing HBase Tables Transactionally

Tephra overrides HBase cell timestamps with transaction IDs, and uses these transaction IDs to filter out cells older than the TTL (Time-To-Live). Transaction IDs are at a higher scale than cell timestamps. When a regular HBase table that has existing data is converted to a transactional table, existing data may be filtered out during reads. To allow reading of existing data from a transactional table, you will need to set the property data.tx.read.pre.existing to true on the table's table descriptor.

Note that even without the property data.tx.read.pre.existing being set to true, any existing data will not be removed during compactions. Existing data simply won't be visible during reads.

Metrics Reporting

Tephra ships with built-in support for reporting metrics via JMX and a log file, using the Dropwizard Metrics library.

To enable JMX reporting for metrics, you will need to enable JMX in the Java runtime arguments. Edit the bin/tephra-env.sh script and uncomment the following lines, making any desired changes to configuration for port used, SSL, and JMX authentication:

# export JMX_OPTS="-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=13001"
# export OPTS="$OPTS $JMX_OPTS"

To enable file-based reporting for metrics, edit the conf/logback.xml file and uncomment the following section, replacing the FILE-PATH placeholder with a valid directory on the local filesystem:

<appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>/FILE-PATH/metrics.log</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>metrics.log.%d{yyyy-MM-dd}</fileNamePattern>
    <maxHistory>30</maxHistory>
  </rollingPolicy>
  <encoder>
    <pattern>%d{ISO8601} %msg%n</pattern>
  </encoder>
</appender>
<logger name="tephra-metrics" level="TRACE" additivity="false">
  <appender-ref ref="METRICS" />
</logger>

The frequency of metrics reporting may be configured by setting the data.tx.metrics.period configuration property to the report frequency in seconds.

Client APIs

The TransactionAwareHTable class implements HBase's HTableInterface, thus providing the same APIs that a standard HBase HTable instance provides. Only certain operations are supported transactionally. These are:

Methods Supported In Transactions
exists(Get get)
exists(List<Get> gets)
get(Get get)
get(List<Get> gets)
batch(List<? extends Row> actions, Object[] results)
batch(List<? extends Row> actions)
batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) [0.96]
batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) [0.96]
getScanner(byte[] family)
getScanner(byte[] family, byte[] qualifier)
put(Put put)
put(List<Put> puts)
delete(Delete delete)
delete(List<Delete> deletes)

Other operations are not supported transactionally and will throw an UnsupportedOperationException if invoked. To allow use of these non-transactional operations, call setAllowNonTransactional(true). This allows you to call the following methods non-transactionally:

Methods Supported Outside of Transactions
getRowOrBefore(byte[] row, byte[] family)
checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
mutateRow(RowMutations rm)
append(Append append)
increment(Increment increment)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)

Note that for batch operations, only certain supported operations (get, put, and delete) are applied transactionally.

Usage

To use a TransactionAwareHTable, you need an instance of TransactionContext. TransactionContext provides the basic contract for client use of transactions. At each point in the transaction lifecycle, it provides the necessary interactions with the Tephra Transaction Server in order to start, commit, and roll back transactions. Basic usage of TransactionContext is handled using the following pattern:

TransactionContext context = new TransactionContext(client, transactionAwareHTable);
try {
  context.start();
  transactionAwareHTable.put(new Put(Bytes.toBytes("row")));
  // ...
  context.finish();
} catch (TransactionFailureException e) {
  context.abort();
}

  1. First, a new transaction is started using TransactionContext.start().
  2. Next, any data operations are performed within the context of the transaction.
  3. After data operations are complete, TransactionContext.finish() is called to commit the transaction.
  4. If an exception occurs, TransactionContext.abort() can be called to roll back the transaction.
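
The four steps above can be wrapped in a small reusable helper. The sketch below does not depend on Tephra: `TxLifecycle` is a hypothetical stand-in interface that mirrors only the start(), finish(), and abort() calls named in this section, and `inTransaction` is an illustrative name. Unlike the pattern shown earlier, it rethrows the original failure after rolling back, which is a design choice of this sketch.

```java
import java.util.concurrent.Callable;

/** Sketch of the start / work / finish / abort pattern (hypothetical helper). */
final class TransactionPatternSketch {
  /** Hypothetical stand-in for the lifecycle calls described in this section. */
  interface TxLifecycle {
    void start() throws Exception;
    void finish() throws Exception;
    void abort() throws Exception;
  }

  /** Runs work inside a transaction: commits on success, rolls back on any failure. */
  static <T> T inTransaction(TxLifecycle tx, Callable<T> work) throws Exception {
    tx.start();
    try {
      T result = work.call();
      tx.finish();
      return result;
    } catch (Exception e) {
      try {
        tx.abort();
      } catch (Exception abortFailure) {
        e.addSuppressed(abortFailure); // keep the rollback failure without hiding the cause
      }
      throw e;
    }
  }

  public static void main(String[] args) throws Exception {
    TxLifecycle logging = new TxLifecycle() {
      public void start() { System.out.println("start"); }
      public void finish() { System.out.println("finish"); }
      public void abort() { System.out.println("abort"); }
    };
    System.out.println(inTransaction(logging, () -> "result of the data operations"));
  }
}
```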

TransactionAwareHTable handles the details of performing data operations transactionally, and implements the necessary hooks in order to commit and roll back the data changes (see TransactionAware).

Example

To demonstrate how you might use TransactionAwareHTables, below is a basic implementation of a SecondaryIndexTable. This class encapsulates the usage of a TransactionContext and provides a simple interface to a user:

/**
 * A Transactional SecondaryIndexTable.
 */
public class SecondaryIndexTable {
  private byte[] secondaryIndex;
  private TransactionAwareHTable transactionAwareHTable;
  private TransactionAwareHTable secondaryIndexTable;
  private TransactionContext transactionContext;
  private final TableName secondaryIndexTableName;
  private static final byte[] secondaryIndexFamily =
    Bytes.toBytes("secondaryIndexFamily");
  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
  private static final byte[] DELIMITER  = new byte[] {0};

  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient,
                             HTable hTable, byte[] secondaryIndex) {
    secondaryIndexTableName =
          TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
    HTable secondaryIndexHTable = null;
    HBaseAdmin hBaseAdmin = null;
    try {
      // Create the index table on first use.
      hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
        hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
      }
      secondaryIndexHTable = new HTable(hTable.getConfiguration(),
                                        secondaryIndexTableName);
    } catch (Exception e) {
      Throwables.propagate(e);
    } finally {
      try {
        // Guard against a failed HBaseAdmin construction.
        if (hBaseAdmin != null) {
          hBaseAdmin.close();
        }
      } catch (Exception e) {
        Throwables.propagate(e);
      }
    }

    this.secondaryIndex = secondaryIndex;
    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
    this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
    // Both tables take part in the same transaction.
    this.transactionContext = new TransactionContext(transactionServiceClient,
                                                     transactionAwareHTable,
                                                     secondaryIndexTable);
  }

  public Result get(Get get) throws IOException {
    return get(Collections.singletonList(get))[0];
  }

  public Result[] get(List<Get> gets) throws IOException {
    try {
      transactionContext.start();
      Result[] result = transactionAwareHTable.get(gets);
      transactionContext.finish();
      return result;
    } catch (Exception e) {
      try {
        transactionContext.abort();
      } catch (TransactionFailureException e1) {
        throw new IOException("Could not rollback transaction", e1);
      }
    }
    return null;
  }

  public Result[] getByIndex(byte[] value) throws IOException {
    try {
      transactionContext.start();
      Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);

      // Each index cell holds the row key of a row in the primary table.
      ArrayList<Get> gets = new ArrayList<Get>();
      for (Result result : indexScanner) {
        for (Cell cell : result.listCells()) {
          gets.add(new Get(cell.getValue()));
        }
      }
      Result[] results = transactionAwareHTable.get(gets);
      transactionContext.finish();
      return results;
    } catch (Exception e) {
      try {
        transactionContext.abort();
      } catch (TransactionFailureException e1) {
        throw new IOException("Could not rollback transaction", e1);
      }
    }
    return null;
  }

  public void put(Put put) throws IOException {
    put(Collections.singletonList(put));
  }

  public void put(List<Put> puts) throws IOException {
    try {
      transactionContext.start();
      ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
      for (Put put : puts) {
        List<Put> indexPuts = new ArrayList<Put>();
        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
        for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
          for (KeyValue value : family.getValue()) {
            // Compare array contents; byte[].equals() only compares references.
            if (Bytes.equals(value.getQualifier(), secondaryIndex)) {
              byte[] secondaryRow = Bytes.add(value.getQualifier(),
                                              DELIMITER,
                                              Bytes.add(value.getValue(),
                                              DELIMITER,
                                              value.getRow()));
              Put indexPut = new Put(secondaryRow);
              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
              indexPuts.add(indexPut);
            }
          }
        }
        secondaryIndexPuts.addAll(indexPuts);
      }
      // The data write and the index write commit or roll back together.
      transactionAwareHTable.put(puts);
      secondaryIndexTable.put(secondaryIndexPuts);
      transactionContext.finish();
    } catch (Exception e) {
      try {
        transactionContext.abort();
      } catch (TransactionFailureException e1) {
        throw new IOException("Could not rollback transaction", e1);
      }
    }
  }
}

Known Issues and Limitations

  • Currently, column family Delete operations are implemented by writing a cell with an empty qualifier (empty byte[]) and empty value (empty byte[]). This is done in place of native HBase Delete operations so the delete marker can be rolled back in the event of a transaction failure; normal HBase Delete operations cannot be undone. However, this means that applications that store data in a column with an empty qualifier will not be able to store empty values, and will not be able to transactionally delete that column.
  • Column Delete operations are implemented by writing an empty value (empty byte[]) to the column. This means that applications will not be able to store empty values to columns.
  • Invalid transactions are not automatically cleared from the exclusion list. When a transaction is invalidated, either from timing out or being invalidated by the client due to a failure to roll back changes, its transaction ID is added to a list of excluded transactions. Data from invalidated transactions will be dropped by the TransactionProcessor coprocessor on HBase region flush and compaction operations. Currently, however, transaction IDs can only be manually removed from the list of excluded transaction IDs, using the co.cask.tephra.TransactionAdmin tool.
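
The second limitation follows directly from using an empty value as the delete marker. The sketch below models a single row as an in-memory map to show the effect; the class `DeleteMarkerSketch` is hypothetical and does not use HBase or Tephra classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of deletes represented as empty values (hypothetical in-memory model). */
final class DeleteMarkerSketch {
  private static final byte[] DELETE_MARKER = new byte[0];
  private final Map<String, byte[]> columns = new HashMap<>();

  void put(String column, byte[] value) {
    columns.put(column, value.clone());
  }

  /** A delete is an ordinary write of the empty value, so it can be undone like any other write. */
  void delete(String column) {
    columns.put(column, DELETE_MARKER);
  }

  /** Returns null for missing or deleted columns. */
  byte[] get(String column) {
    byte[] value = columns.get(column);
    // An empty stored value cannot be told apart from a delete marker.
    return (value == null || value.length == 0) ? null : value.clone();
  }

  public static void main(String[] args) {
    DeleteMarkerSketch row = new DeleteMarkerSketch();
    row.put("col", new byte[] {1});
    System.out.println("after put: " + (row.get("col") != null));
    row.delete("col");
    System.out.println("after delete: " + (row.get("col") != null));
    row.put("col", new byte[0]); // storing an empty value looks like a delete
    System.out.println("after empty put: " + (row.get("col") != null));
  }
}
```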

How to Contribute

Interested in helping to improve Tephra? We welcome all contributions, whether in filing detailed bug reports, submitting pull requests for code changes and improvements, or by asking questions and assisting others on the mailing list.

Bug Reports & Feature Requests

Bugs and tasks are tracked in a public JIRA issue tracker.

Tephra User Groups and Mailing Lists

  • Tephra User Group: [email protected]

    The tephra-user mailing list is primarily for users using the product to develop applications. You can expect questions from users, release announcements, and any other discussions that we think will be helpful to the users.

  • Tephra Developer Group and Development Discussions: [email protected]

    The tephra-dev mailing list is essentially for developers actively working on the product, and should be used for all our design, architecture and technical discussions moving forward. This mailing list will also receive all JIRA and GitHub notifications.

IRC

Have questions about how Tephra works, or need help using it? Drop by the #tephra chat room on irc.freenode.net.

Pull Requests

We have a simple pull-based development model with a consensus-building phase, similar to Apache's voting process. If you'd like to help make Tephra better by adding new features, enhancing existing features, or fixing bugs, here's how to do it:

  1. If you are planning a large change or contribution, discuss your plans on the tephra-dev mailing list first. This will help us understand your needs and best guide your solution in a way that fits the project.
  2. Fork Tephra into your own GitHub repository.
  3. Create a topic branch with an appropriate name.
  4. Work on the code to your heart's content.
  5. Once you're satisfied, create a pull request from your GitHub repo (it's helpful if you fill in all of the description fields).
  6. After we review and accept your request, we'll commit your code to the caskdata/tephra repository.

Thanks for helping to improve Tephra!

License and Trademarks

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this product except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Cask, Cask Tephra and Tephra are trademarks of Cask Data, Inc. All rights reserved.

Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.

