Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 48 additions & 46 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ var (
metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'}
metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'}
metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'}
leveldbConf = opt.Options{

leveldbConf = opt.Options{
Compression: opt.SnappyCompression,
}
leveldbInit sync.Once
blkDB *leveldb.DB
txDB *leveldb.DB

// Atomic counters for stats
cachedBlockCount int32
Expand Down Expand Up @@ -94,15 +98,11 @@ func keyWithSymbolToHeight(k []byte) int32 {

// Chain represents a sql-chain.
type Chain struct {
// bdb stores state, profile and block
bdb *leveldb.DB
// tdb stores ack/request/response
tdb *leveldb.DB
bi *blockIndex
ai *ackIndex
st *x.State
cl *rpc.Caller
rt *runtime
bi *blockIndex
ai *ackIndex
st *x.State
cl *rpc.Caller
rt *runtime

blocks chan *types.Block
heights chan int32
Expand All @@ -121,6 +121,10 @@ type Chain struct {
pk *asymmetric.PrivateKey
// addr is the AccountAddress generate from public key.
addr *proto.AccountAddress
// key prefixes
metaBlockIndex []byte
metaResponseIndex []byte
metaAckIndex []byte
}

// NewChain creates a new sql-chain struct.
Expand All @@ -131,23 +135,29 @@ func NewChain(c *Config) (chain *Chain, err error) {
// NewChainWithContext creates a new sql-chain struct with context.
func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err error) {
le := log.WithField("db", c.DatabaseID)
// Open LevelDB for block and state
bdbFile := c.ChainFilePrefix + "-block-state.ldb"
bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf)
if err != nil {
err = errors.Wrapf(err, "open leveldb %s", bdbFile)
return
}
le.Debugf("opened chain bdb %s", bdbFile)

// Open LevelDB for ack/request/response
tdbFile := c.ChainFilePrefix + "-ack-req-resp.ldb"
tdb, err := leveldb.OpenFile(tdbFile, &leveldbConf)
leveldbInit.Do(func() {
// Open LevelDB for block and state
bdbFile := c.ChainFilePrefix + "-block-state.ldb"
blkDB, err = leveldb.OpenFile(bdbFile, &leveldbConf)
if err != nil {
err = errors.Wrapf(err, "open leveldb %s", bdbFile)
return
}
le.Debugf("opened chain bdb %s", bdbFile)

// Open LevelDB for ack/request/response
tdbFile := c.ChainFilePrefix + "-ack-req-resp.ldb"
txDB, err = leveldb.OpenFile(tdbFile, &leveldbConf)
if err != nil {
err = errors.Wrapf(err, "open leveldb %s", tdbFile)
return
}
le.Debugf("opened chain tdb %s", tdbFile)
})
if err != nil {
err = errors.Wrapf(err, "open leveldb %s", tdbFile)
return
}
le.Debugf("opened chain tdb %s", tdbFile)

// Open storage
var strg xi.Storage
Expand All @@ -173,8 +183,6 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro

// Create chain state
chain = &Chain{
bdb: bdb,
tdb: tdb,
bi: newBlockIndex(),
ai: newAckIndex(),
st: x.NewState(sql.IsolationLevel(c.IsolationLevel), c.Server, strg),
Expand All @@ -189,16 +197,19 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
updatePeriod: c.UpdatePeriod,
databaseID: c.DatabaseID,

pk: pk,
addr: &addr,
pk: pk,
addr: &addr,
metaBlockIndex: utils.ConcatAll([]byte(c.DatabaseID), metaBlockIndex[:]),
metaResponseIndex: utils.ConcatAll([]byte(c.DatabaseID), metaResponseIndex[:]),
metaAckIndex: utils.ConcatAll([]byte(c.DatabaseID), metaAckIndex[:]),
}
le = le.WithField("peer", chain.rt.getPeerInfoString())

// Read blocks and rebuild memory index
var (
id uint64
last, parent *blockNode
blockIter = chain.bdb.NewIterator(util.BytesPrefix(metaBlockIndex[:]), nil)
blockIter = blkDB.NewIterator(util.BytesPrefix(chain.metaBlockIndex), nil)
)
defer blockIter.Release()
for blockIter.Next() {
Expand Down Expand Up @@ -267,7 +278,7 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
chain.pruneBlockCache()

// Read queries and rebuild memory index
respIter := chain.tdb.NewIterator(util.BytesPrefix(metaResponseIndex[:]), nil)
respIter := txDB.NewIterator(util.BytesPrefix(chain.metaResponseIndex), nil)
defer respIter.Release()
for respIter.Next() {
k := respIter.Key()
Expand All @@ -289,7 +300,7 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
return
}

ackIter := chain.tdb.NewIterator(util.BytesPrefix(metaAckIndex[:]), nil)
ackIter := txDB.NewIterator(util.BytesPrefix(chain.metaAckIndex), nil)
defer ackIter.Release()
for ackIter.Next() {
k := ackIter.Key()
Expand Down Expand Up @@ -338,15 +349,15 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {
Height: node.height,
}

blockKey = utils.ConcatAll(metaBlockIndex[:], node.indexKey())
blockKey = utils.ConcatAll(c.metaBlockIndex, node.indexKey())
encBlock *bytes.Buffer
)
if encBlock, err = utils.EncodeMsgPack(b); err != nil {
return
}

// Put block
err = c.bdb.Put(blockKey, encBlock.Bytes(), nil)
err = blkDB.Put(blockKey, encBlock.Bytes(), nil)
if err != nil {
err = errors.Wrapf(err, "put %s", string(node.indexKey()))
return
Expand Down Expand Up @@ -408,14 +419,14 @@ func (c *Chain) pushAckedQuery(ack *types.SignedAckHeader) (err error) {
return
}

tdbKey := utils.ConcatAll(metaAckIndex[:], k, ack.Hash().AsBytes())
tdbKey := utils.ConcatAll(c.metaAckIndex, k, ack.Hash().AsBytes())

if err = c.register(ack); err != nil {
err = errors.Wrapf(err, "register ack %v at height %d", ack.Hash(), h)
return
}

if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil {
if err = txDB.Put(tdbKey, enc.Bytes(), nil); err != nil {
err = errors.Wrapf(err, "put ack %d %s", h, ack.Hash().String())
return
}
Expand Down Expand Up @@ -763,17 +774,8 @@ func (c *Chain) Stop() (err error) {
le.Debug("stopping chain")
c.rt.stop(c.databaseID)
le.Debug("chain service and workers stopped")
// Close LevelDB file
var ierr error
if ierr = c.bdb.Close(); ierr != nil && err == nil {
err = ierr
}
le.WithError(ierr).Debug("chain database closed")
if ierr = c.tdb.Close(); ierr != nil && err == nil {
err = ierr
}
le.WithError(ierr).Debug("chain database closed")
// Close state
var ierr error
if ierr = c.st.Close(false); ierr != nil && err == nil {
err = ierr
}
Expand Down Expand Up @@ -817,9 +819,9 @@ func (c *Chain) FetchBlockByCount(count int32) (b *types.Block, realCount int32,
}

func (c *Chain) fetchBlockByIndexKey(indexKey []byte) (b *types.Block, err error) {
k := utils.ConcatAll(metaBlockIndex[:], indexKey)
k := utils.ConcatAll(c.metaBlockIndex, indexKey)
var v []byte
v, err = c.bdb.Get(k, nil)
v, err = blkDB.Get(k, nil)
if err != nil {
err = errors.Wrapf(err, "fetch block %s", string(k))
return
Expand Down
2 changes: 1 addition & 1 deletion worker/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers,
}

// init chain
chainFile := filepath.Join(cfg.DataDir, SQLChainFileName)
chainFile := filepath.Join(cfg.RootDir, SQLChainFileName)
if db.nodeID, err = kms.GetLocalNodeID(); err != nil {
return
}
Expand Down
1 change: 1 addition & 0 deletions worker/db_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// DBConfig defines the database config.
type DBConfig struct {
DatabaseID proto.DatabaseID
RootDir string
DataDir string
KayakMux *DBKayakMuxService
ChainMux *sqlchain.MuxService
Expand Down
1 change: 1 addition & 0 deletions worker/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (dbms *DBMS) Create(instance *types.ServiceInstance, cleanup bool) (err err
// new db
dbCfg := &DBConfig{
DatabaseID: instance.DatabaseID,
RootDir: dbms.cfg.RootDir,
DataDir: rootDir,
KayakMux: dbms.kayakMux,
ChainMux: dbms.chainMux,
Expand Down