Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

const (
paramUseLeader = "use_leader"
paramUseFollower = "use_follower"
paramMirror = "mirror"
paramUseLeader = "use_leader"
paramUseFollower = "use_follower"
paramUseDirectRPC = "use_direct_rpc"
paramMirror = "mirror"
)

// Config is a configuration parsed from a DSN string.
Expand All @@ -42,6 +43,9 @@ type Config struct {
// UseFollower use follower nodes to do queries
UseFollower bool

// UseDirectRPC use direct RPC to access the miner
UseDirectRPC bool

// Mirror option forces client to query from mirror server
Mirror string
}
Expand Down Expand Up @@ -71,6 +75,9 @@ func (cfg *Config) FormatDSN() string {
if cfg.Mirror != "" {
newQuery.Add(paramMirror, cfg.Mirror)
}
if cfg.UseDirectRPC {
newQuery.Add(paramUseDirectRPC, strconv.FormatBool(cfg.UseDirectRPC))
}
u.RawQuery = newQuery.Encode()

return u.String()
Expand Down Expand Up @@ -98,6 +105,7 @@ func ParseDSN(dsn string) (cfg *Config, err error) {
cfg.UseLeader = true
}
cfg.Mirror = q.Get(paramMirror)
cfg.UseDirectRPC, _ = strconv.ParseBool(q.Get(paramUseDirectRPC))

return cfg, nil
}
21 changes: 17 additions & 4 deletions client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/route"
rpc "github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/types"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/CovenantSQL/CovenantSQL/utils/trace"
Expand Down Expand Up @@ -87,15 +88,21 @@ func newConn(cfg *Config) (c *conn, err error) {
if cfg.Mirror != "" {
c.leader = &pconn{
parent: c,
pCaller: rpc.NewRawCaller(cfg.Mirror),
pCaller: mux.NewRawCaller(cfg.Mirror),
}

// no ack workers required, mirror mode does not support ack worker
} else {
if cfg.UseLeader {
var caller rpc.PCaller
if cfg.UseDirectRPC {
caller = rpc.NewPersistentCaller(peers.Leader)
} else {
caller = mux.NewPersistentCaller(peers.Leader)
}
c.leader = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(peers.Leader),
pCaller: caller,
}
}

Expand All @@ -104,9 +111,15 @@ func newConn(cfg *Config) (c *conn, err error) {
for {
node := peers.Servers[randSource.Intn(len(peers.Servers))]
if node != peers.Leader {
var caller rpc.PCaller
if cfg.UseDirectRPC {
caller = rpc.NewPersistentCaller(node)
} else {
caller = mux.NewPersistentCaller(node)
}
c.follower = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(node),
pCaller: caller,
}
break
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/cql-minerd/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (

"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/hash"
rpc "github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/worker"
)

var rootHash = hash.Hash{}

func startDBMS(server *rpc.Server, onCreateDB func()) (dbms *worker.DBMS, err error) {
func startDBMS(server *mux.Server, direct *rpc.Server, onCreateDB func()) (dbms *worker.DBMS, err error) {
if conf.GConf.Miner == nil {
err = errors.New("invalid database config")
return
Expand All @@ -36,6 +37,7 @@ func startDBMS(server *rpc.Server, onCreateDB func()) (dbms *worker.DBMS, err er
cfg := &worker.DBMSConfig{
RootDir: conf.GConf.Miner.RootDir,
Server: server,
DirectServer: direct,
MaxReqTimeGap: conf.GConf.Miner.MaxReqTimeGap,
OnCreateDatabase: onCreateDB,
}
Expand Down
26 changes: 24 additions & 2 deletions cmd/cql-minerd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/CovenantSQL/CovenantSQL/crypto"
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/naconn"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/route"
rpc "github.com/CovenantSQL/CovenantSQL/rpc/mux"
Expand Down Expand Up @@ -71,6 +72,7 @@ var (
benchMinerCount int
benchBypassSignature bool
benchEventualConsistency bool
benchMinerDirectRPC bool
benchMinerConfigDir string
)

Expand All @@ -81,10 +83,20 @@ func init() {
"Benchmark bypassing signature.")
flag.BoolVar(&benchEventualConsistency, "bench-eventual-consistency", false,
"Benchmark with eventaul consistency.")
flag.BoolVar(&benchMinerDirectRPC, "bench-direct-rpc", false,
"Benchmark with with direct RPC protocol.")
flag.StringVar(&benchMinerConfigDir, "bench-miner-config-dir", "",
"Benchmark custome miner config directory.")
}

func TestMain(m *testing.M) {
flag.Parse()
if benchMinerDirectRPC {
naconn.RegisterResolver(rpc.NewDirectResolver())
}
os.Exit(m.Run())
}

func startNodes() {
ctx := context.Background()

Expand Down Expand Up @@ -703,6 +715,7 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {

var i int64
i = -1
db.SetMaxIdleConns(256)

b.Run(makeBenchName("INSERT"), func(b *testing.B) {
b.ResetTimer()
Expand Down Expand Up @@ -834,6 +847,7 @@ func benchMiner(b *testing.B, minerCount uint16) {
ResourceMeta: types.ResourceMeta{
Node: minerCount,
UseEventualConsistency: benchEventualConsistency,
IsolationLevel: int(sql.LevelReadUncommitted),
},
}
// wait for chain service
Expand All @@ -856,6 +870,13 @@ func benchMiner(b *testing.B, minerCount uint16) {
dsn = os.Getenv("DSN")
}

if benchMinerDirectRPC {
dsnCfg, err := client.ParseDSN(dsn)
So(err, ShouldBeNil)
dsnCfg.UseDirectRPC = true
dsn = dsnCfg.FormatDSN()
}

db, err := sql.Open("covenantsql", dsn)
So(err, ShouldBeNil)

Expand Down Expand Up @@ -942,8 +963,9 @@ func benchOutsideMinerWithTargetMinerList(
// create
meta := client.ResourceMeta{
ResourceMeta: types.ResourceMeta{
TargetMiners: targetMiners,
Node: minerCount,
TargetMiners: targetMiners,
Node: minerCount,
IsolationLevel: int(sql.LevelReadUncommitted),
},
AdvancePayment: 1000000000,
}
Expand Down
25 changes: 17 additions & 8 deletions cmd/cql-minerd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/metric"
rpc "github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
_ "github.com/CovenantSQL/CovenantSQL/utils/log/debug"
Expand Down Expand Up @@ -169,8 +170,11 @@ func main() {
conf.GConf.GenerateKeyPair = genKeyPair

// start rpc
var server *rpc.Server
if server, err = initNode(); err != nil {
var (
server *mux.Server
direct *rpc.Server
)
if server, direct, err = initNode(); err != nil {
log.WithError(err).Fatal("init node failed")
}

Expand Down Expand Up @@ -213,7 +217,7 @@ func main() {

// start dbms
var dbms *worker.DBMS
if dbms, err = startDBMS(server, func() {
if dbms, err = startDBMS(server, direct, func() {
sendProvideService(reg)
}); err != nil {
log.WithError(err).Fatal("start dbms failed")
Expand All @@ -225,10 +229,15 @@ func main() {
go func() {
server.Serve()
}()
defer func() {
_ = server.Listener.Close()
server.Stop()
}()
defer server.Stop()

// start direct rpc server
if direct != nil {
go func() {
direct.Serve()
}()
defer direct.Stop()
}

if metricLog {
go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.StandardLogger())
Expand Down
31 changes: 23 additions & 8 deletions cmd/cql-minerd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/route"
rpc "github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/rpc/mux"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
)

func initNode() (server *rpc.Server, err error) {
func initNode() (server *mux.Server, direct *rpc.Server, err error) {
var masterKey []byte
if !conf.GConf.UseTestMasterKey {
// read master key
Expand All @@ -53,30 +54,44 @@ func initNode() (server *rpc.Server, err error) {
// init kms routing
route.InitKMS(conf.GConf.PubKeyStoreFile)

err = rpc.RegisterNodeToBP(30 * time.Second)
err = mux.RegisterNodeToBP(30 * time.Second)
if err != nil {
log.Fatalf("register node to BP failed: %v", err)
}

// init server
utils.RemoveAll(conf.GConf.PubKeyStoreFile + "*")
if server, err = createServer(
conf.GConf.PrivateKeyFile, conf.GConf.PubKeyStoreFile, masterKey, conf.GConf.ListenAddr); err != nil {
conf.GConf.PrivateKeyFile, masterKey, conf.GConf.ListenAddr); err != nil {
log.WithError(err).Error("create server failed")
return
}
if direct, err = createDirectServer(
conf.GConf.PrivateKeyFile, masterKey, conf.GConf.ListenDirectAddr); err != nil {
log.WithError(err).Error("create direct server failed")
return
}

return
}

func createServer(privateKeyPath, pubKeyStorePath string, masterKey []byte, listenAddr string) (server *rpc.Server, err error) {
utils.RemoveAll(pubKeyStorePath + "*")
func createServer(privateKeyPath string, masterKey []byte, listenAddr string) (server *mux.Server, err error) {
server = mux.NewServer()
if err != nil {
return
}
err = server.InitRPCServer(listenAddr, privateKeyPath, masterKey)
return
}

func createDirectServer(privateKeyPath string, masterKey []byte, listenAddr string) (server *rpc.Server, err error) {
if listenAddr == "" {
return nil, nil
}
server = rpc.NewServer()
if err != nil {
return
}

err = server.InitRPCServer(listenAddr, privateKeyPath, masterKey)

return
}
11 changes: 6 additions & 5 deletions cmd/cqld/initconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ func initNodePeers(nodeID proto.NodeID, publicKeystorePath string) (nodes *[]pro
rawNodeID := &proto.RawNodeID{Hash: *rawNodeIDHash}
route.SetNodeAddrCache(rawNodeID, p.Addr)
node := &proto.Node{
ID: p.ID,
Addr: p.Addr,
PublicKey: p.PublicKey,
Nonce: p.Nonce,
Role: p.Role,
ID: p.ID,
Addr: p.Addr,
DirectAddr: p.DirectAddr,
PublicKey: p.PublicKey,
Nonce: p.Nonce,
Role: p.Role,
}
err = kms.SetNode(node)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ type Config struct {
StartupSyncHoles bool `yaml:"StartupSyncHoles,omitempty"`
GenerateKeyPair bool `yaml:"-"`
//TODO(auxten): set yaml key for config
WorkingRoot string `yaml:"WorkingRoot"`
PubKeyStoreFile string `yaml:"PubKeyStoreFile"`
PrivateKeyFile string `yaml:"PrivateKeyFile"`
DHTFileName string `yaml:"DHTFileName"`
ListenAddr string `yaml:"ListenAddr"`
ThisNodeID proto.NodeID `yaml:"ThisNodeID"`
ValidDNSKeys map[string]string `yaml:"ValidDNSKeys"` // map[DNSKEY]domain
WorkingRoot string `yaml:"WorkingRoot"`
PubKeyStoreFile string `yaml:"PubKeyStoreFile"`
PrivateKeyFile string `yaml:"PrivateKeyFile"`
DHTFileName string `yaml:"DHTFileName"`
ListenAddr string `yaml:"ListenAddr"`
ListenDirectAddr string `yaml:"ListenDirectAddr",omitempty`
ThisNodeID proto.NodeID `yaml:"ThisNodeID"`
ValidDNSKeys map[string]string `yaml:"ValidDNSKeys"` // map[DNSKEY]domain
// Check By BP DHT.Ping
MinNodeIDDifficulty int `yaml:"MinNodeIDDifficulty"`

Expand Down
Loading