Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions go/internal/feast/registry/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package registry

import (
"context"
"fmt"
"time"

"github.com/feast-dev/feast/go/protos/feast/core"
"github.com/feast-dev/feast/go/protos/feast/registry"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type GrpcRegistryStore struct {
project string
endpoint string
clientId string
conn *grpc.ClientConn
client registry.RegistryServerClient
timeout time.Duration
}

func NewGrpcRegistryStore(config *RegistryConfig, project string) (*GrpcRegistryStore, error) {
log.Info().Msgf("Using gRPC Registry: %s", config.Path)

conn, err := grpc.NewClient(
config.Path,
grpc.WithBlock(),
grpc.WithTimeout(5*time.Second),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to gRPC registry: %w", err)
}

client := registry.NewRegistryServerClient(conn)

grs := &GrpcRegistryStore{
project: project,
endpoint: config.Path,
clientId: config.ClientId,
conn: conn,
client: client,
timeout: 5 * time.Second,
}

if err := grs.TestConnectivity(); err != nil {
conn.Close()
return nil, err
}

return grs, nil
}

func (grs *GrpcRegistryStore) TestConnectivity() error {
// May consider removing this, however added for feature parity with http_server, however no obvious healthcheck exists
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.ListEntitiesRequest{
Project: grs.project,
AllowCache: false,
}

_, err := grs.client.ListEntities(ctx, req)
if err != nil {
return fmt.Errorf("gRPC registry connectivity check failed: %w", err)
}

return nil
}

func (grs *GrpcRegistryStore) withMetadata(ctx context.Context) context.Context {
if grs.clientId != "" {
md := metadata.Pairs("client-id", grs.clientId)
ctx = metadata.NewOutgoingContext(ctx, md)
}
return ctx
}

func (grs *GrpcRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) {
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.GetEntityRequest{
Name: name,
Project: grs.project,
AllowCache: allowCache,
}

entity, err := grs.client.GetEntity(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get entity %s: %w", name, err)
}

return entity, nil
}

func (grs *GrpcRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) {
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.GetFeatureViewRequest{
Name: name,
Project: grs.project,
AllowCache: allowCache,
}

fv, err := grs.client.GetFeatureView(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get feature view %s: %w", name, err)
}

return fv, nil
}

func (grs *GrpcRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) {
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.GetSortedFeatureViewRequest{
Name: name,
Project: grs.project,
AllowCache: allowCache,
}

sfv, err := grs.client.GetSortedFeatureView(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get sorted feature view %s: %w", name, err)
}

return sfv, nil
}

func (grs *GrpcRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) {
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.GetFeatureServiceRequest{
Name: name,
Project: grs.project,
AllowCache: allowCache,
}

fs, err := grs.client.GetFeatureService(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get feature service %s: %w", name, err)
}

return fs, nil
}

func (grs *GrpcRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) {
ctx, cancel := context.WithTimeout(context.Background(), grs.timeout)
defer cancel()

ctx = grs.withMetadata(ctx)

req := &registry.GetOnDemandFeatureViewRequest{
Name: name,
Project: grs.project,
AllowCache: allowCache,
}

odfv, err := grs.client.GetOnDemandFeatureView(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get on-demand feature view %s: %w", name, err)
}

return odfv, nil
}

func (grs *GrpcRegistryStore) GetRegistryProto() (*core.Registry, error) {
// gRPC stores use fallback mode, so this returns empty registry
registry := core.Registry{}
return &registry, nil
}

func (grs *GrpcRegistryStore) UpdateRegistryProto(rp *core.Registry) error {
return &NotImplementedError{FunctionName: "UpdateRegistryProto"}
}

func (grs *GrpcRegistryStore) Teardown() error {
if grs.conn != nil {
return grs.conn.Close()
}
return nil
}

func (grs *GrpcRegistryStore) HasFallback() bool {
return true
}
11 changes: 6 additions & 5 deletions go/internal/feast/registry/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/feast-dev/feast/go/protos/feast/core"

"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -107,7 +108,7 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu
return nil
}

func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) {
func (r *HttpRegistryStore) GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) {
url := fmt.Sprintf("%s/projects/%s/feature_services/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache)
featureService := &core.FeatureService{}
err := r.loadProtobufMessages(url, func(data []byte) error {
Expand All @@ -123,7 +124,7 @@ func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*co
return featureService, nil
}

func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) {
func (r *HttpRegistryStore) GetEntity(name string, allowCache bool) (*core.Entity, error) {
url := fmt.Sprintf("%s/projects/%s/entities/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache)
entity := &core.Entity{}
err := r.loadProtobufMessages(url, func(data []byte) error {
Expand All @@ -140,7 +141,7 @@ func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entit
return entity, nil
}

func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) {
func (r *HttpRegistryStore) GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) {
url := fmt.Sprintf("%s/projects/%s/feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache)
featureView := &core.FeatureView{}
err := r.loadProtobufMessages(url, func(data []byte) error {
Expand All @@ -156,7 +157,7 @@ func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core.
return featureView, nil
}

func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) {
func (r *HttpRegistryStore) GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) {
url := fmt.Sprintf("%s/projects/%s/on_demand_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache)
onDemandFeatureView := &core.OnDemandFeatureView{}
err := r.loadProtobufMessages(url, func(data []byte) error {
Expand All @@ -172,7 +173,7 @@ func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool)
return onDemandFeatureView, nil
}

func (r *HttpRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) {
func (r *HttpRegistryStore) GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) {
url := fmt.Sprintf("%s/projects/%s/sorted_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache)
sortedFeatureView := &core.SortedFeatureView{}
err := r.loadProtobufMessages(url, func(data []byte) error {
Expand Down
Loading
Loading