forked from forgejo/forgejo
Integrate public as bindata optionally (#293)
* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missiong govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
This commit is contained in:
parent
4680c349dd
commit
b6a95a8cb3
691 changed files with 305318 additions and 1272 deletions
796
vendor/github.com/pingcap/go-themis/themis_txn.go
generated
vendored
Normal file
796
vendor/github.com/pingcap/go-themis/themis_txn.go
generated
vendored
Normal file
|
@ -0,0 +1,796 @@
|
|||
package themis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/go-hbase"
|
||||
"github.com/pingcap/go-themis/oracle"
|
||||
)
|
||||
|
||||
type TxnConfig struct {
|
||||
ConcurrentPrewriteAndCommit bool
|
||||
WaitSecondaryCommit bool
|
||||
TTLInMs uint64
|
||||
MaxRowsInOneTxn int
|
||||
// options below is for debugging and testing
|
||||
brokenPrewriteSecondaryTest bool
|
||||
brokenPrewriteSecondaryAndRollbackTest bool
|
||||
brokenCommitPrimaryTest bool
|
||||
brokenCommitSecondaryTest bool
|
||||
}
|
||||
|
||||
var defaultTxnConf = TxnConfig{
|
||||
ConcurrentPrewriteAndCommit: true,
|
||||
WaitSecondaryCommit: false,
|
||||
MaxRowsInOneTxn: 50000,
|
||||
TTLInMs: 5 * 1000, // default txn TTL: 5s
|
||||
brokenPrewriteSecondaryTest: false,
|
||||
brokenPrewriteSecondaryAndRollbackTest: false,
|
||||
brokenCommitPrimaryTest: false,
|
||||
brokenCommitSecondaryTest: false,
|
||||
}
|
||||
|
||||
type themisTxn struct {
|
||||
client hbase.HBaseClient
|
||||
rpc *themisRPC
|
||||
lockCleaner LockManager
|
||||
oracle oracle.Oracle
|
||||
mutationCache *columnMutationCache
|
||||
startTs uint64
|
||||
commitTs uint64
|
||||
primaryRow *rowMutation
|
||||
primary *hbase.ColumnCoordinate
|
||||
secondaryRows []*rowMutation
|
||||
secondary []*hbase.ColumnCoordinate
|
||||
primaryRowOffset int
|
||||
singleRowTxn bool
|
||||
secondaryLockBytes []byte
|
||||
conf TxnConfig
|
||||
hooks *txnHook
|
||||
}
|
||||
|
||||
var _ Txn = (*themisTxn)(nil)
|
||||
|
||||
var (
|
||||
// ErrSimulated is used when maybe rollback occurs error too.
|
||||
ErrSimulated = errors.New("simulated error")
|
||||
maxCleanLockRetryCount = 30
|
||||
pauseTime = 300 * time.Millisecond
|
||||
)
|
||||
|
||||
func NewTxn(c hbase.HBaseClient, oracle oracle.Oracle) (Txn, error) {
|
||||
return NewTxnWithConf(c, defaultTxnConf, oracle)
|
||||
}
|
||||
|
||||
func NewTxnWithConf(c hbase.HBaseClient, conf TxnConfig, oracle oracle.Oracle) (Txn, error) {
|
||||
var err error
|
||||
txn := &themisTxn{
|
||||
client: c,
|
||||
mutationCache: newColumnMutationCache(),
|
||||
oracle: oracle,
|
||||
primaryRowOffset: -1,
|
||||
conf: conf,
|
||||
rpc: newThemisRPC(c, oracle, conf),
|
||||
hooks: newHook(),
|
||||
}
|
||||
txn.startTs, err = txn.oracle.GetTimestamp()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
txn.lockCleaner = newThemisLockManager(txn.rpc, c)
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) setHook(hooks *txnHook) {
|
||||
txn.hooks = hooks
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Gets(tbl string, gets []*hbase.Get) ([]*hbase.ResultRow, error) {
|
||||
results, err := txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
var ret []*hbase.ResultRow
|
||||
hasLock := false
|
||||
for _, r := range results {
|
||||
// if this row is locked, try clean lock and get again
|
||||
if isLockResult(r) {
|
||||
hasLock = true
|
||||
err = txn.constructLockAndClean([]byte(tbl), r.SortedColumns)
|
||||
if err != nil {
|
||||
// TODO if it's a conflict error, it means this lock
|
||||
// isn't expired, maybe we can retry or return partial results.
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
// it's OK, because themisBatchGet doesn't return nil value.
|
||||
ret = append(ret, r)
|
||||
}
|
||||
if hasLock {
|
||||
// after we cleaned locks, try to get again.
|
||||
ret, err = txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, true)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Get(tbl string, g *hbase.Get) (*hbase.ResultRow, error) {
|
||||
r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
// contain locks, try to clean and get again
|
||||
if r != nil && isLockResult(r) {
|
||||
r, err = txn.tryToCleanLockAndGetAgain([]byte(tbl), g, r.SortedColumns)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Put(tbl string, p *hbase.Put) {
|
||||
// add mutation to buffer
|
||||
for _, e := range getEntriesFromPut(p) {
|
||||
txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Delete(tbl string, p *hbase.Delete) error {
|
||||
entries, err := getEntriesFromDel(p)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
for _, e := range entries {
|
||||
txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Commit() error {
|
||||
if txn.mutationCache.getMutationCount() == 0 {
|
||||
return nil
|
||||
}
|
||||
if txn.mutationCache.getRowCount() > txn.conf.MaxRowsInOneTxn {
|
||||
return ErrTooManyRows
|
||||
}
|
||||
|
||||
txn.selectPrimaryAndSecondaries()
|
||||
err := txn.prewritePrimary()
|
||||
if err != nil {
|
||||
// no need to check wrong region here, hbase client will retry when
|
||||
// occurs single row NotInRegion error.
|
||||
log.Error(errors.ErrorStack(err))
|
||||
// it's safe to retry, because this transaction is not committed.
|
||||
return ErrRetryable
|
||||
}
|
||||
|
||||
err = txn.prewriteSecondary()
|
||||
if err != nil {
|
||||
if isWrongRegionErr(err) {
|
||||
log.Warn("region info outdated")
|
||||
// reset hbase client buffered region info
|
||||
txn.client.CleanAllRegionCache()
|
||||
}
|
||||
return ErrRetryable
|
||||
}
|
||||
|
||||
txn.commitTs, err = txn.oracle.GetTimestamp()
|
||||
if err != nil {
|
||||
log.Error(errors.ErrorStack(err))
|
||||
return ErrRetryable
|
||||
}
|
||||
err = txn.commitPrimary()
|
||||
if err != nil {
|
||||
// commit primary error, rollback
|
||||
log.Error("commit primary row failed", txn.startTs, err)
|
||||
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
|
||||
txn.rollbackSecondaryRow(len(txn.secondaryRows) - 1)
|
||||
return ErrRetryable
|
||||
}
|
||||
txn.commitSecondary()
|
||||
log.Debug("themis txn commit successfully", txn.startTs, txn.commitTs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) commitSecondary() {
|
||||
if bypass, _, _ := txn.hooks.beforeCommitSecondary(txn, nil); !bypass {
|
||||
return
|
||||
}
|
||||
if txn.conf.brokenCommitSecondaryTest {
|
||||
txn.brokenCommitSecondary()
|
||||
return
|
||||
}
|
||||
if txn.conf.ConcurrentPrewriteAndCommit {
|
||||
txn.batchCommitSecondary(txn.conf.WaitSecondaryCommit)
|
||||
} else {
|
||||
txn.commitSecondarySync()
|
||||
}
|
||||
}
|
||||
|
||||
func (txn *themisTxn) commitSecondarySync() {
|
||||
for _, r := range txn.secondaryRows {
|
||||
err := txn.rpc.commitSecondaryRow(r.tbl, r.row, r.mutationList(false), txn.startTs, txn.commitTs)
|
||||
if err != nil {
|
||||
// fail of secondary commit will not stop the commits of next
|
||||
// secondaries
|
||||
log.Warning(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (txn *themisTxn) batchCommitSecondary(wait bool) error {
|
||||
//will batch commit all rows in a region
|
||||
rsRowMap, err := txn.groupByRegion()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, regionRowMap := range rsRowMap {
|
||||
wg.Add(1)
|
||||
_, firstRowM := getFirstEntity(regionRowMap)
|
||||
go func(cli *themisRPC, tbl string, rMap map[string]*rowMutation, startTs, commitTs uint64) {
|
||||
defer wg.Done()
|
||||
err := cli.batchCommitSecondaryRows([]byte(tbl), rMap, startTs, commitTs)
|
||||
if err != nil {
|
||||
// fail of secondary commit will not stop the commits of next
|
||||
// secondaries
|
||||
if isWrongRegionErr(err) {
|
||||
txn.client.CleanAllRegionCache()
|
||||
log.Warn("region info outdated when committing secondary rows, don't panic")
|
||||
}
|
||||
}
|
||||
}(txn.rpc, string(firstRowM.tbl), regionRowMap, txn.startTs, txn.commitTs)
|
||||
}
|
||||
if wait {
|
||||
wg.Wait()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) groupByRegion() (map[string]map[string]*rowMutation, error) {
|
||||
rsRowMap := make(map[string]map[string]*rowMutation)
|
||||
for _, rm := range txn.secondaryRows {
|
||||
region, err := txn.client.LocateRegion(rm.tbl, rm.row, true)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
key := getBatchGroupKey(region, string(rm.tbl))
|
||||
if _, exists := rsRowMap[key]; !exists {
|
||||
rsRowMap[key] = map[string]*rowMutation{}
|
||||
}
|
||||
rsRowMap[key][string(rm.row)] = rm
|
||||
}
|
||||
return rsRowMap, nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) commitPrimary() error {
|
||||
if txn.conf.brokenCommitPrimaryTest {
|
||||
return txn.brokenCommitPrimary()
|
||||
}
|
||||
return txn.rpc.commitRow(txn.primary.Table, txn.primary.Row,
|
||||
txn.primaryRow.mutationList(false),
|
||||
txn.startTs, txn.commitTs, txn.primaryRowOffset)
|
||||
}
|
||||
|
||||
func (txn *themisTxn) selectPrimaryAndSecondaries() {
|
||||
txn.secondary = nil
|
||||
for tblName, rowMutations := range txn.mutationCache.mutations {
|
||||
for _, rowMutation := range rowMutations {
|
||||
row := rowMutation.row
|
||||
findPrimaryInRow := false
|
||||
for i, mutation := range rowMutation.mutationList(true) {
|
||||
colcord := hbase.NewColumnCoordinate([]byte(tblName), row, mutation.Family, mutation.Qual)
|
||||
// set the first column as primary if primary is not set by user
|
||||
if txn.primaryRowOffset == -1 &&
|
||||
(txn.primary == nil || txn.primary.Equal(colcord)) {
|
||||
txn.primary = colcord
|
||||
txn.primaryRowOffset = i
|
||||
txn.primaryRow = rowMutation
|
||||
findPrimaryInRow = true
|
||||
} else {
|
||||
txn.secondary = append(txn.secondary, colcord)
|
||||
}
|
||||
}
|
||||
if !findPrimaryInRow {
|
||||
txn.secondaryRows = append(txn.secondaryRows, rowMutation)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// hook for test
|
||||
if bypass, _, _ := txn.hooks.afterChoosePrimaryAndSecondary(txn, nil); !bypass {
|
||||
return
|
||||
}
|
||||
|
||||
if len(txn.secondaryRows) == 0 {
|
||||
txn.singleRowTxn = true
|
||||
}
|
||||
// construct secondary lock
|
||||
secondaryLock := txn.constructSecondaryLock(hbase.TypePut)
|
||||
if secondaryLock != nil {
|
||||
txn.secondaryLockBytes = secondaryLock.Encode()
|
||||
} else {
|
||||
txn.secondaryLockBytes = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (txn *themisTxn) constructSecondaryLock(typ hbase.Type) *themisSecondaryLock {
|
||||
if txn.primaryRow.getSize() <= 1 && len(txn.secondaryRows) == 0 {
|
||||
return nil
|
||||
}
|
||||
l := newThemisSecondaryLock()
|
||||
l.primaryCoordinate = txn.primary
|
||||
l.ts = txn.startTs
|
||||
// TODO set client addr
|
||||
return l
|
||||
}
|
||||
|
||||
func (txn *themisTxn) constructPrimaryLock() *themisPrimaryLock {
|
||||
l := newThemisPrimaryLock()
|
||||
l.typ = txn.primaryRow.getType(txn.primary.Column)
|
||||
l.ts = txn.startTs
|
||||
for _, c := range txn.secondary {
|
||||
l.addSecondary(c, txn.mutationCache.getMutation(c).typ)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func (txn *themisTxn) constructLockAndClean(tbl []byte, lockKvs []*hbase.Kv) error {
|
||||
locks, err := getLocksFromResults([]byte(tbl), lockKvs, txn.rpc)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
for _, lock := range locks {
|
||||
err := txn.cleanLockWithRetry(lock)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) tryToCleanLockAndGetAgain(tbl []byte, g *hbase.Get, lockKvs []*hbase.Kv) (*hbase.ResultRow, error) {
|
||||
// try to clean locks
|
||||
err := txn.constructLockAndClean(tbl, lockKvs)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
// get again, ignore lock
|
||||
r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, true)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) commitSecondaryAndCleanLock(lock *themisSecondaryLock, commitTs uint64) error {
|
||||
cc := lock.Coordinate()
|
||||
mutation := &columnMutation{
|
||||
Column: &cc.Column,
|
||||
mutationValuePair: &mutationValuePair{
|
||||
typ: lock.typ,
|
||||
},
|
||||
}
|
||||
err := txn.rpc.commitSecondaryRow(cc.Table, cc.Row,
|
||||
[]*columnMutation{mutation}, lock.Timestamp(), commitTs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) cleanLockWithRetry(lock Lock) error {
|
||||
for i := 0; i < maxCleanLockRetryCount; i++ {
|
||||
if exists, err := txn.lockCleaner.IsLockExists(lock.Coordinate(), 0, lock.Timestamp()); err != nil || !exists {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
log.Warnf("lock exists txn: %v lock-txn: %v row: %q", txn.startTs, lock.Timestamp(), lock.Coordinate().Row)
|
||||
// try clean lock
|
||||
err := txn.tryToCleanLock(lock)
|
||||
if errorEqual(err, ErrLockNotExpired) {
|
||||
log.Warn("sleep a while, and retry clean lock", txn.startTs)
|
||||
// TODO(dongxu) use cleverer retry sleep time interval
|
||||
time.Sleep(pauseTime)
|
||||
continue
|
||||
} else if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// lock cleaned successfully
|
||||
return nil
|
||||
}
|
||||
return ErrCleanLockFailed
|
||||
}
|
||||
|
||||
func (txn *themisTxn) tryToCleanLock(lock Lock) error {
|
||||
// if it's secondary lock, first we'll check if its primary lock has been released.
|
||||
if lock.Role() == RoleSecondary {
|
||||
// get primary lock
|
||||
pl := lock.Primary()
|
||||
// check primary lock is exists
|
||||
exists, err := txn.lockCleaner.IsLockExists(pl.Coordinate(), 0, pl.Timestamp())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if !exists {
|
||||
// primary row is committed, commit this row
|
||||
cc := pl.Coordinate()
|
||||
commitTs, err := txn.lockCleaner.GetCommitTimestamp(cc, pl.Timestamp())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if commitTs > 0 {
|
||||
// if this transction has been committed
|
||||
log.Info("txn has been committed, ts:", commitTs, "prewriteTs:", pl.Timestamp())
|
||||
// commit secondary rows
|
||||
err := txn.commitSecondaryAndCleanLock(lock.(*themisSecondaryLock), commitTs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
expired, err := txn.rpc.checkAndSetLockIsExpired(lock)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// only clean expired lock
|
||||
if expired {
|
||||
// try to clean primary lock
|
||||
pl := lock.Primary()
|
||||
commitTs, cleanedLock, err := txn.lockCleaner.CleanLock(pl.Coordinate(), pl.Timestamp())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if cleanedLock != nil {
|
||||
pl = cleanedLock
|
||||
}
|
||||
log.Info("try clean secondary locks", pl.Timestamp())
|
||||
// clean secondary locks
|
||||
// erase lock and data if commitTs is 0; otherwise, commit it.
|
||||
for k, v := range pl.(*themisPrimaryLock).secondaries {
|
||||
cc := &hbase.ColumnCoordinate{}
|
||||
if err = cc.ParseFromString(k); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if commitTs == 0 {
|
||||
// commitTs == 0, means clean primary lock successfully
|
||||
// expire trx havn't committed yet, we must delete lock and
|
||||
// dirty data
|
||||
err = txn.lockCleaner.EraseLockAndData(cc, pl.Timestamp())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
} else {
|
||||
// primary row is committed, so we must commit other
|
||||
// secondary rows
|
||||
mutation := &columnMutation{
|
||||
Column: &cc.Column,
|
||||
mutationValuePair: &mutationValuePair{
|
||||
typ: v,
|
||||
},
|
||||
}
|
||||
err = txn.rpc.commitSecondaryRow(cc.Table, cc.Row,
|
||||
[]*columnMutation{mutation}, pl.Timestamp(), commitTs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return ErrLockNotExpired
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) batchPrewriteSecondaryRowsWithLockClean(tbl []byte, rowMs map[string]*rowMutation) error {
|
||||
locks, err := txn.batchPrewriteSecondaryRows(tbl, rowMs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// lock clean
|
||||
if locks != nil && len(locks) > 0 {
|
||||
// hook for test
|
||||
if bypass, _, err := txn.hooks.onSecondaryOccursLock(txn, locks); !bypass {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// try one more time after clean lock successfully
|
||||
for _, lock := range locks {
|
||||
err = txn.cleanLockWithRetry(lock)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// prewrite all secondary rows
|
||||
locks, err = txn.batchPrewriteSecondaryRows(tbl, rowMs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if len(locks) > 0 {
|
||||
for _, l := range locks {
|
||||
log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", l.Coordinate(), l, l.Timestamp())
|
||||
}
|
||||
return ErrRetryable
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) prewriteRowWithLockClean(tbl []byte, mutation *rowMutation, containPrimary bool) error {
|
||||
lock, err := txn.prewriteRow(tbl, mutation, containPrimary)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// lock clean
|
||||
if lock != nil {
|
||||
// hook for test
|
||||
if bypass, _, err := txn.hooks.beforePrewriteLockClean(txn, lock); !bypass {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = txn.cleanLockWithRetry(lock)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// try one more time after clean lock successfully
|
||||
lock, err = txn.prewriteRow(tbl, mutation, containPrimary)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if lock != nil {
|
||||
log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", lock.Coordinate(), lock, lock.Timestamp())
|
||||
return ErrRetryable
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) batchPrewriteSecondaryRows(tbl []byte, rowMs map[string]*rowMutation) (map[string]Lock, error) {
|
||||
return txn.rpc.batchPrewriteSecondaryRows(tbl, rowMs, txn.startTs, txn.secondaryLockBytes)
|
||||
}
|
||||
|
||||
func (txn *themisTxn) prewriteRow(tbl []byte, mutation *rowMutation, containPrimary bool) (Lock, error) {
|
||||
// hook for test
|
||||
if bypass, ret, err := txn.hooks.onPrewriteRow(txn, []interface{}{mutation, containPrimary}); !bypass {
|
||||
return ret.(Lock), errors.Trace(err)
|
||||
}
|
||||
if containPrimary {
|
||||
// try to get lock
|
||||
return txn.rpc.prewriteRow(tbl, mutation.row,
|
||||
mutation.mutationList(true),
|
||||
txn.startTs,
|
||||
txn.constructPrimaryLock().Encode(),
|
||||
txn.secondaryLockBytes, txn.primaryRowOffset)
|
||||
}
|
||||
return txn.rpc.prewriteSecondaryRow(tbl, mutation.row,
|
||||
mutation.mutationList(true),
|
||||
txn.startTs,
|
||||
txn.secondaryLockBytes)
|
||||
}
|
||||
|
||||
func (txn *themisTxn) prewritePrimary() error {
|
||||
// hook for test
|
||||
if bypass, _, err := txn.hooks.beforePrewritePrimary(txn, nil); !bypass {
|
||||
return err
|
||||
}
|
||||
err := txn.prewriteRowWithLockClean(txn.primary.Table, txn.primaryRow, true)
|
||||
if err != nil {
|
||||
log.Debugf("prewrite primary %v %q failed: %v", txn.startTs, txn.primaryRow.row, err.Error())
|
||||
return errors.Trace(err)
|
||||
}
|
||||
log.Debugf("prewrite primary %v %q successfully", txn.startTs, txn.primaryRow.row)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) prewriteSecondary() error {
|
||||
// hook for test
|
||||
if bypass, _, err := txn.hooks.beforePrewriteSecondary(txn, nil); !bypass {
|
||||
return err
|
||||
}
|
||||
if txn.conf.brokenPrewriteSecondaryTest {
|
||||
return txn.brokenPrewriteSecondary()
|
||||
}
|
||||
if txn.conf.ConcurrentPrewriteAndCommit {
|
||||
return txn.batchPrewriteSecondaries()
|
||||
}
|
||||
return txn.prewriteSecondarySync()
|
||||
}
|
||||
|
||||
func (txn *themisTxn) prewriteSecondarySync() error {
|
||||
for i, mu := range txn.secondaryRows {
|
||||
err := txn.prewriteRowWithLockClean(mu.tbl, mu, false)
|
||||
if err != nil {
|
||||
// rollback
|
||||
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
|
||||
txn.rollbackSecondaryRow(i)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// just for test
|
||||
func (txn *themisTxn) brokenCommitPrimary() error {
|
||||
// do nothing
|
||||
log.Warn("Simulating primary commit failed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// just for test
|
||||
func (txn *themisTxn) brokenCommitSecondary() {
|
||||
// do nothing
|
||||
log.Warn("Simulating secondary commit failed")
|
||||
}
|
||||
|
||||
func (txn *themisTxn) brokenPrewriteSecondary() error {
|
||||
log.Warn("Simulating prewrite secondary failed")
|
||||
for i, rm := range txn.secondaryRows {
|
||||
if i == len(txn.secondary)-1 {
|
||||
if !txn.conf.brokenPrewriteSecondaryAndRollbackTest {
|
||||
// simulating prewrite failed, need rollback
|
||||
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
|
||||
txn.rollbackSecondaryRow(i)
|
||||
}
|
||||
// maybe rollback occurs error too
|
||||
return ErrSimulated
|
||||
}
|
||||
txn.prewriteRowWithLockClean(rm.tbl, rm, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) batchPrewriteSecondaries() error {
|
||||
wg := sync.WaitGroup{}
|
||||
//will batch prewrite all rows in a region
|
||||
rsRowMap, err := txn.groupByRegion()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
errChan := make(chan error, len(rsRowMap))
|
||||
defer close(errChan)
|
||||
successChan := make(chan map[string]*rowMutation, len(rsRowMap))
|
||||
defer close(successChan)
|
||||
|
||||
for _, regionRowMap := range rsRowMap {
|
||||
wg.Add(1)
|
||||
_, firstRowM := getFirstEntity(regionRowMap)
|
||||
go func(tbl []byte, rMap map[string]*rowMutation) {
|
||||
defer wg.Done()
|
||||
err := txn.batchPrewriteSecondaryRowsWithLockClean(tbl, rMap)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
} else {
|
||||
successChan <- rMap
|
||||
}
|
||||
}(firstRowM.tbl, regionRowMap)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if len(errChan) != 0 {
|
||||
// occur error, clean success prewrite mutations
|
||||
log.Warnf("batch prewrite secondary rows error, rolling back %d %d", len(successChan), txn.startTs)
|
||||
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
|
||||
L:
|
||||
for {
|
||||
select {
|
||||
case succMutMap := <-successChan:
|
||||
{
|
||||
for _, rowMut := range succMutMap {
|
||||
txn.rollbackRow(rowMut.tbl, rowMut)
|
||||
}
|
||||
}
|
||||
default:
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
err := <-errChan
|
||||
if err != nil {
|
||||
log.Error("batch prewrite secondary rows error, txn:", txn.startTs, err)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getFirstEntity(rowMap map[string]*rowMutation) (string, *rowMutation) {
|
||||
for row, rowM := range rowMap {
|
||||
return row, rowM
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func getBatchGroupKey(rInfo *hbase.RegionInfo, tblName string) string {
|
||||
return rInfo.Server + "_" + rInfo.Name
|
||||
}
|
||||
|
||||
func (txn *themisTxn) rollbackRow(tbl []byte, mutation *rowMutation) error {
|
||||
l := fmt.Sprintf("\nrolling back %q %d {\n", mutation.row, txn.startTs)
|
||||
for _, v := range mutation.getColumns() {
|
||||
l += fmt.Sprintf("\t%s:%s\n", string(v.Family), string(v.Qual))
|
||||
}
|
||||
l += "}\n"
|
||||
log.Warn(l)
|
||||
for _, col := range mutation.getColumns() {
|
||||
cc := &hbase.ColumnCoordinate{
|
||||
Table: tbl,
|
||||
Row: mutation.row,
|
||||
Column: col,
|
||||
}
|
||||
err := txn.lockCleaner.EraseLockAndData(cc, txn.startTs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) rollbackSecondaryRow(successIndex int) error {
|
||||
for i := successIndex; i >= 0; i-- {
|
||||
r := txn.secondaryRows[i]
|
||||
err := txn.rollbackRow(r.tbl, r)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *themisTxn) GetScanner(tbl []byte, startKey, endKey []byte, batchSize int) *ThemisScanner {
|
||||
scanner := newThemisScanner(tbl, txn, batchSize, txn.client)
|
||||
if startKey != nil {
|
||||
scanner.setStartRow(startKey)
|
||||
}
|
||||
if endKey != nil {
|
||||
scanner.setStopRow(endKey)
|
||||
}
|
||||
return scanner
|
||||
}
|
||||
|
||||
func (txn *themisTxn) Release() {
|
||||
txn.primary = nil
|
||||
txn.primaryRow = nil
|
||||
txn.secondary = nil
|
||||
txn.secondaryRows = nil
|
||||
txn.startTs = 0
|
||||
txn.commitTs = 0
|
||||
}
|
||||
|
||||
func (txn *themisTxn) String() string {
|
||||
return fmt.Sprintf("%d", txn.startTs)
|
||||
}
|
||||
|
||||
func (txn *themisTxn) GetCommitTS() uint64 {
|
||||
return txn.commitTs
|
||||
}
|
||||
|
||||
func (txn *themisTxn) GetStartTS() uint64 {
|
||||
return txn.startTs
|
||||
}
|
||||
|
||||
func (txn *themisTxn) LockRow(tbl string, rowkey []byte) error {
|
||||
g := hbase.NewGet(rowkey)
|
||||
r, err := txn.Get(tbl, g)
|
||||
if err != nil {
|
||||
log.Warnf("get row error, table:%s, row:%q, error:%v", tbl, rowkey, err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if r == nil {
|
||||
log.Warnf("has not data to lock, table:%s, row:%q", tbl, rowkey)
|
||||
return nil
|
||||
}
|
||||
for _, v := range r.Columns {
|
||||
txn.mutationCache.addMutation([]byte(tbl), rowkey, &v.Column, hbase.TypeMinimum, nil, true)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue