Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions sqlchain/ackindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ import (
"github.com/CovenantSQL/CovenantSQL/utils/log"
)

var (
// Global atomic counters for stats
multiIndexCount int32
responseCount int32
ackCount int32
)

type multiAckIndex struct {
owner *ackIndex // for atomic counter access only

sync.RWMutex
// respIndex is the index of query responses without acks
respIndex map[types.QueryKey]*types.SignedResponseHeader
Expand All @@ -54,7 +49,7 @@ func (i *multiAckIndex) addResponse(resp *types.SignedResponseHeader) (err error
return
}
i.respIndex[key] = resp
atomic.AddInt32(&responseCount, 1)
atomic.AddInt32(&i.owner.responseCount, 1)
return
}

Expand All @@ -77,8 +72,8 @@ func (i *multiAckIndex) register(ack *types.SignedAckHeader) (err error) {
}
delete(i.respIndex, key)
i.ackIndex[key] = ack
atomic.AddInt32(&responseCount, -1)
atomic.AddInt32(&ackCount, 1)
atomic.AddInt32(&i.owner.responseCount, -1)
atomic.AddInt32(&i.owner.ackCount, 1)
return
}

Expand All @@ -89,7 +84,7 @@ func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) {
defer i.Unlock()
if _, ok := i.respIndex[key]; ok {
delete(i.respIndex, key)
atomic.AddInt32(&responseCount, -1)
atomic.AddInt32(&i.owner.responseCount, -1)
return
}
if oack, ok := i.ackIndex[key]; ok {
Expand All @@ -99,7 +94,7 @@ func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) {
return
}
delete(i.ackIndex, key)
atomic.AddInt32(&ackCount, -1)
atomic.AddInt32(&i.owner.ackCount, -1)
return
}
err = errors.Wrapf(ErrQueryNotFound, "remove key %s -x- ack %s", &key, ack.Hash())
Expand Down Expand Up @@ -151,6 +146,11 @@ type ackIndex struct {

sync.RWMutex
barrier int32

// Atomic counters for stats
multiIndexCount int32
responseCount int32
ackCount int32
}

func newAckIndex() *ackIndex {
Expand All @@ -169,11 +169,12 @@ func (i *ackIndex) load(h int32) (mi *multiAckIndex, err error) {
}
if mi, ok = i.hi[h]; !ok {
mi = &multiAckIndex{
owner: i,
respIndex: make(map[types.QueryKey]*types.SignedResponseHeader),
ackIndex: make(map[types.QueryKey]*types.SignedAckHeader),
}
i.hi[h] = mi
atomic.AddInt32(&multiIndexCount, 1)
atomic.AddInt32(&i.multiIndexCount, 1)
}
return
}
Expand All @@ -192,10 +193,10 @@ func (i *ackIndex) advance(h int32) {
// Record expired and not acknowledged queries
for _, v := range dl {
v.expire()
atomic.AddInt32(&responseCount, int32(-len(v.respIndex)))
atomic.AddInt32(&ackCount, int32(-len(v.ackIndex)))
atomic.AddInt32(&i.responseCount, int32(-len(v.respIndex)))
atomic.AddInt32(&i.ackCount, int32(-len(v.ackIndex)))
}
atomic.AddInt32(&multiIndexCount, int32(-len(dl)))
atomic.AddInt32(&i.multiIndexCount, int32(-len(dl)))
}

func (i *ackIndex) addResponse(h int32, resp *types.SignedResponseHeader) (err error) {
Expand Down
28 changes: 23 additions & 5 deletions sqlchain/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,49 @@ package sqlchain
import (
"encoding/binary"
"sync"
"sync/atomic"

"github.com/CovenantSQL/CovenantSQL/crypto/hash"
"github.com/CovenantSQL/CovenantSQL/types"
)

type blockNode struct {
parent *blockNode
block *types.Block
block atomic.Value // store *types.Block
hash hash.Hash
height int32 // height is the chain height of the head
count int32 // count counts the blocks (except genesis) at this head
}

func newBlockNode(height int32, block *types.Block, parent *blockNode) *blockNode {
func newBlockNodeEx(height int32, hash *hash.Hash, block *types.Block, parent *blockNode) *blockNode {
var count int32
if parent != nil {
count = parent.count + 1
}
return &blockNode{
hash: block.SignedHeader.HSV.DataHash,
node := &blockNode{
hash: *hash,
parent: parent,
block: block,
height: height,
count: count,
}
node.block.Store(block)
return node
}

func newBlockNode(height int32, block *types.Block, parent *blockNode) *blockNode {
return newBlockNodeEx(height, block.BlockHash(), block, parent)
}

func (n *blockNode) load() *types.Block {
return n.block.Load().(*types.Block)
}

func (n *blockNode) clear() (cleared bool) {
if n.load() != nil {
cleared = true
n.block.Store((*types.Block)(nil))
}
return
}

func (n *blockNode) ancestor(height int32) (ancestor *blockNode) {
Expand Down
63 changes: 27 additions & 36 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"database/sql"
"encoding/binary"
"fmt"
rt "runtime"
"sync"
"sync/atomic"
"time"
Expand All @@ -46,10 +45,6 @@ import (
xs "github.com/CovenantSQL/CovenantSQL/xenomint/sqlite"
)

const (
minBlockCacheTTL = int32(30)
)

var (
metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'}
metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'}
Expand All @@ -61,18 +56,8 @@ var (
leveldbInit sync.Once
blkDB *leveldb.DB
txDB *leveldb.DB

// Atomic counters for stats
cachedBlockCount int32
)

func trackBlock(b *types.Block) {
atomic.AddInt32(&cachedBlockCount, 1)
rt.SetFinalizer(b, func(_ *types.Block) {
atomic.AddInt32(&cachedBlockCount, -1)
})
}

// heightToKey converts a height in int32 to a key in bytes.
func heightToKey(h int32) (key []byte) {
key = make([]byte, 4)
Expand Down Expand Up @@ -125,6 +110,9 @@ type Chain struct {
metaBlockIndex []byte
metaResponseIndex []byte
metaAckIndex []byte

// Atomic counters for stats
cachedBlockCount int32
}

// NewChain creates a new sql-chain struct.
Expand Down Expand Up @@ -257,7 +245,9 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
id = nid
}

last = newBlockNode(chain.rt.getHeightFromTime(block.Timestamp()), block, parent)
// do not cache block in memory in reloading
last = newBlockNodeEx(
chain.rt.getHeightFromTime(block.Timestamp()), block.BlockHash(), nil, parent)
chain.bi.addBlock(last)
}
if err = blockIter.Error(); err != nil {
Expand All @@ -281,7 +271,6 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
}
chain.rt.setHead(head)
chain.st.SetSeq(id)
chain.pruneBlockCache()

// Read queries and rebuild memory index
respIter := txDB.NewIterator(util.BytesPrefix(chain.metaResponseIndex), nil)
Expand Down Expand Up @@ -368,6 +357,7 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {
err = errors.Wrapf(err, "put %s", string(node.indexKey()))
return
}
atomic.AddInt32(&c.cachedBlockCount, 1)
c.rt.setHead(head)
c.bi.addBlock(node)

Expand Down Expand Up @@ -469,7 +459,6 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
QueryTxs: make([]*types.QueryAsTx, len(qts)),
Acks: c.ai.acks(c.rt.getHeightFromTime(now)),
}
trackBlock(block)
for i, v := range qts {
// TODO(leventeliu): maybe block waiting at a ready channel instead?
for !v.Ready() {
Expand Down Expand Up @@ -571,7 +560,6 @@ func (c *Chain) syncHead() {
); err != nil || resp.Block == nil {
ile.WithError(err).Debug("failed to fetch block from peer")
} else {
trackBlock(resp.Block)
select {
case c.blocks <- resp.Block:
case <-c.rt.ctx.Done():
Expand Down Expand Up @@ -700,7 +688,11 @@ func (c *Chain) processBlocks(ctx context.Context) {
select {
case h := <-c.heights:
// Trigger billing
if uint64(h)%c.updatePeriod == 0 {
index, total := c.rt.getIndexTotal()
period := int32(c.updatePeriod)
isBillingPeriod := (h%period == 0)
isMyTurnBilling := (h/period%total == index)
if isBillingPeriod && isMyTurnBilling {
ub, err := c.billing(h, c.rt.getHead().node)
if err != nil {
le.WithError(err).Error("billing failed")
Expand Down Expand Up @@ -792,12 +784,8 @@ func (c *Chain) Stop() (err error) {
// FetchBlock fetches the block at specified height from local cache.
func (c *Chain) FetchBlock(height int32) (b *types.Block, err error) {
if n := c.rt.getHead().node.ancestor(height); n != nil {
b, err = c.fetchBlockByIndexKey(n.indexKey())
if err != nil {
return
}
return c.fetchBlockByIndexKey(n.indexKey())
}

return
}

Expand Down Expand Up @@ -834,7 +822,6 @@ func (c *Chain) fetchBlockByIndexKey(indexKey []byte) (b *types.Block, err error
}

b = &types.Block{}
trackBlock(b)
err = utils.DecodeMsgPack(v, b)
if err != nil {
err = errors.Wrapf(err, "fetch block %s", string(k))
Expand Down Expand Up @@ -961,28 +948,32 @@ func (c *Chain) remove(ack *types.SignedAckHeader) (err error) {

func (c *Chain) pruneBlockCache() {
var (
head = c.rt.getHead().node
lastCnt int32
head = c.rt.getHead().node
nextTurn = c.rt.getNextTurn()
lastCnt int32
)
if head == nil {
return
}
lastCnt = head.count - c.rt.blockCacheTTL
lastCnt = nextTurn - c.rt.blockCacheTTL
if h := c.rt.getLastBillingHeight(); h < lastCnt {
lastCnt = h // also keep cache for billing if possible
}
// Move to last count position
for ; head != nil && head.count > lastCnt; head = head.parent {
}
// Prune block references
for ; head != nil && head.block != nil; head = head.parent {
head.block = nil
for ; head != nil && head.clear(); head = head.parent {
atomic.AddInt32(&c.cachedBlockCount, -1)
}
}

func (c *Chain) stat() {
var (
ic = atomic.LoadInt32(&multiIndexCount)
rc = atomic.LoadInt32(&responseCount)
tc = atomic.LoadInt32(&ackCount)
bc = atomic.LoadInt32(&cachedBlockCount)
ic = atomic.LoadInt32(&c.ai.multiIndexCount)
rc = atomic.LoadInt32(&c.ai.responseCount)
tc = atomic.LoadInt32(&c.ai.ackCount)
bc = atomic.LoadInt32(&c.cachedBlockCount)
)
// Print chain stats
c.logEntry().WithFields(log.Fields{
Expand Down Expand Up @@ -1011,7 +1002,7 @@ func (c *Chain) billing(h int32, node *blockNode) (ub *types.UpdateBilling, err
for iter = node; iter != nil && iter.height > h; iter = iter.parent {
}
for iter != nil && iter.height > minHeight {
var block = iter.block
var block = iter.load()
// Not cached, recover from storage
if block == nil {
if block, err = c.FetchBlock(iter.height); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions sqlchain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,17 @@ func TestMultiChain(t *testing.T) {
i, c.rt.getPeerInfoString())
continue
}
if node.block != nil {
t.Logf("checking block %v at height %d in peer %s",
node.block.BlockHash(), i, c.rt.getPeerInfoString())
block := node.load()
if block == nil {
var err error
if block, err = c.FetchBlock(node.height); err != nil || block == nil {
t.Errorf("failed to load block %v at height %d in peer %s: %v",
block.BlockHash(), i, c.rt.getPeerInfoString(), err)
continue
}
}
t.Logf("checking block %v at height %d in peer %s",
block.BlockHash(), i, c.rt.getPeerInfoString())
}
}(v.chain)
}
Expand Down
8 changes: 2 additions & 6 deletions sqlchain/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,9 @@ type runtime struct {
}

func blockCacheTTLRequired(c *Config) (ttl int32) {
var billingRequiredTTL = int32(2 * c.UpdatePeriod)
ttl = c.BlockCacheTTL
if ttl < minBlockCacheTTL {
ttl = minBlockCacheTTL
}
if ttl < billingRequiredTTL {
ttl = billingRequiredTTL
if ttl < 0 {
ttl = 0
}
return
}
Expand Down
12 changes: 6 additions & 6 deletions sqlchain/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@ func TestBlockCacheTTL(t *testing.T) {
}{
{
config: &Config{
BlockCacheTTL: 0,
BlockCacheTTL: -1,
UpdatePeriod: 0,
},
expect: minBlockCacheTTL,
expect: 0,
},
{
config: &Config{
BlockCacheTTL: minBlockCacheTTL + 1,
BlockCacheTTL: 100,
UpdatePeriod: 0,
},
expect: minBlockCacheTTL + 1,
expect: 100,
},
{
config: &Config{
BlockCacheTTL: 0,
UpdatePeriod: uint64(minBlockCacheTTL + 1),
UpdatePeriod: 100,
},
expect: 2 * (minBlockCacheTTL + 1),
expect: 0,
},
}
for _, v := range cases {
Expand Down