Skip to content

Commit

Permalink
modifications in grpc.go
Browse files Browse the repository at this point in the history
Signed-off-by: Aishwarya2001 <[email protected]>
  • Loading branch information
Aishwarya2001A committed Feb 12, 2025
1 parent 3581198 commit ebf35de
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 124 deletions.
51 changes: 25 additions & 26 deletions components/ingest-service/cmd/ingest-service/commands/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/chef/automate/components/ingest-service/config"
"github.com/chef/automate/components/ingest-service/grpc"
"github.com/chef/automate/components/ingest-service/rest"
"github.com/chef/automate/components/ingest-service/serveropts"
"github.com/chef/automate/components/ingest-service/storage"
"github.com/chef/automate/lib/grpc/secureconn"
"github.com/chef/automate/lib/tls/certs"
"github.com/chef/automate/lib/tracing"
Expand All @@ -37,21 +35,22 @@ var serveCmd = &cobra.Command{

// construct GRPC endpoint for gateway
endpoint := fmt.Sprintf("%s:%d", conf.Host, conf.Port)
dbConfig := &config.Storage{
URI: conf.Storage.URI,
DBUser: conf.Storage.DBUser,
Database: conf.Storage.Database,
SchemaPath: conf.Storage.SchemaPath,
MaxOpenConns: conf.Storage.MaxOpenConns,
MaxIdleConns: conf.Storage.MaxIdleConns,
}

// Initialize PostgreSQL connection and run migrations
_, err = storage.ConnectAndMigrate(dbConfig)
if err != nil {
logrus.WithError(err).Fatal("Error connecting to DB and running migrations")
}
logrus.Info("Database connection and migrations successful!")
// dbConfig := &config.Storage{
// URI: conf.Storage.URI,
// DBUser: conf.Storage.DBUser,
// Database: conf.Storage.Database,
// SchemaPath: conf.Storage.SchemaPath,
// MaxOpenConns: conf.Storage.MaxOpenConns,
// MaxIdleConns: conf.Storage.MaxIdleConns,
// }

// // Initialize PostgreSQL connection and run migrations
// _, err = storage.ConnectAndMigrate(dbConfig)
// if err != nil {
// logrus.WithError(err).Fatal("Error connecting to DB and running migrations")
// }
// logrus.Info("Database connection and migrations successful!")

// Spawn a gRPC Client in a goroutine
//
Expand All @@ -61,11 +60,11 @@ var serveCmd = &cobra.Command{
//
// TODO: Figure out how to respawn if client crashes?
if os.Getenv(devModeEnvVar) == "true" {
go rest.Spawn(endpoint, conf) // Modified to pass `db`
go rest.Spawn(endpoint, conf)
}

// Start the gRPC Server
err = grpc.Spawn(conf) // Modified to pass `db`
err = grpc.Spawn(conf)
if err != nil {
logrus.WithError(err).Fatal("spawn failed")
}
Expand Down Expand Up @@ -132,14 +131,14 @@ func readCliParams() *serveropts.Opts {
MissingNodesForDeletionRunningDefault: missingNodesForDeletionRunningDefault,
NodesMissingRunningDefault: nodesMissingRunningDefault,
},
Storage: serveropts.StorageConfig{ //Added Storage Configuration
URI: viper.GetString("postgresql-url"),
DBUser: viper.GetString("postgresql-user"),
Database: viper.GetString("postgresql-database"),
SchemaPath: viper.GetString("postgresql-schema-path"),
MaxOpenConns: viper.GetInt("postgresql-max-open-conns"),
MaxIdleConns: viper.GetInt("postgresql-max-idle-conns"),
},
// Storage: serveropts.StorageConfig{ //Added Storage Configuration
// URI: viper.GetString("postgresql-url"),
// DBUser: viper.GetString("postgresql-user"),
// Database: viper.GetString("postgresql-database"),
// SchemaPath: viper.GetString("postgresql-schema-path"),
// MaxOpenConns: viper.GetInt("postgresql-max-open-conns"),
// MaxIdleConns: viper.GetInt("postgresql-max-idle-conns"),
// },
ConnFactory: factory,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package main
import "github.com/chef/automate/components/ingest-service/cmd/ingest-service/commands"

func main() {

// Start the ingest service
commands.Execute()

}
25 changes: 12 additions & 13 deletions components/ingest-service/config/old_job_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/chef/automate/components/ingest-service/serveropts"
toml "github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/spf13/viper"
)

// Enum jobs
Expand Down Expand Up @@ -54,21 +53,21 @@ type Storage struct {
MaxIdleConns int `mapstructure:"max_idle_conns"`
}

func Load(configPath string) (*Config, error) {
v := viper.New()
v.SetConfigFile(configPath)
// func Load(configPath string) (*Config, error) {
// v := viper.New()
// v.SetConfigFile(configPath)

if err := v.ReadInConfig(); err != nil {
return nil, errors.Wrap(err, "error reading config file")
}
// if err := v.ReadInConfig(); err != nil {
// return nil, errors.Wrap(err, "error reading config file")
// }

var cfg Config
if err := v.Unmarshal(&cfg); err != nil {
return nil, errors.Wrap(err, "error unmarshaling config")
}
// var cfg Config
// if err := v.Unmarshal(&cfg); err != nil {
// return nil, errors.Wrap(err, "error unmarshaling config")
// }

return &cfg, nil
}
// return &cfg, nil
// }

// ConfigForJob returns the configuration for the given job index. The
// job indexes are constants in this package and reflect this
Expand Down
22 changes: 20 additions & 2 deletions components/ingest-service/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"github.com/chef/automate/api/interservice/nodemanager/nodes"
"github.com/chef/automate/components/ingest-service/backend"
"github.com/chef/automate/components/ingest-service/backend/elastic"
"github.com/chef/automate/components/ingest-service/config"
"github.com/chef/automate/components/ingest-service/migration"
"github.com/chef/automate/components/ingest-service/pipeline"
"github.com/chef/automate/components/ingest-service/server"
"github.com/chef/automate/components/ingest-service/serveropts"
"github.com/chef/automate/components/ingest-service/storage"
project_update_lib "github.com/chef/automate/lib/authz"
"github.com/chef/automate/lib/cereal"
"github.com/chef/automate/lib/cereal/postgres"
Expand Down Expand Up @@ -67,15 +69,31 @@ func Spawn(opts *serveropts.Opts) error {
}

pgURL, err := pgURL(opts.PGURL, opts.PGDatabase)
if err != nil {
if err != nil || pgURL == "" {
log.WithError(err).Fatal("could not get PG URL")
return err
return fmt.Errorf("invalid database URL")
}

if opts.Storage.SchemaPath == "" {
log.Fatal("Schema path is missing in configuration")
return fmt.Errorf("schema path cannot be empty")
}

db, err := libdb.PGOpen(pgURL)
if err != nil {
return errors.Wrapf(err, "Failed to open database with uri: %s", pgURL)
}
storageConfig := &config.Storage{
URI: pgURL,
SchemaPath: opts.Storage.SchemaPath,
}

err = storage.RunMigrations(storageConfig) // Call the migration function
if err != nil {
log.WithError(err).Fatal("Migration failed")
return err
}
log.Info("Database connection and migrations successful!")

// Authz Interface
authzConn, err := opts.ConnFactory.Dial("authz-service", opts.AuthzAddress)
Expand Down
179 changes: 98 additions & 81 deletions components/ingest-service/storage/db.go
Original file line number Diff line number Diff line change
@@ -1,98 +1,115 @@
package storage
// package storage

import (
"database/sql"
"time"
// import (
// "database/sql"
// "time"

"github.com/go-gorp/gorp"
_ "github.com/lib/pq"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
// "github.com/go-gorp/gorp"
// _ "github.com/lib/pq"
// "github.com/pkg/errors"
// log "github.com/sirupsen/logrus"

"github.com/chef/automate/components/ingest-service/config"
libdb "github.com/chef/automate/lib/db"
"github.com/chef/automate/lib/db/migrator"
"github.com/chef/automate/lib/logger"
)
// "github.com/chef/automate/components/ingest-service/config"
// libdb "github.com/chef/automate/lib/db"
// "github.com/chef/automate/lib/db/migrator"
// "github.com/chef/automate/lib/logger"
// )

type DB struct {
*gorp.DbMap
}
// type DB struct {
// *gorp.DbMap
// }

// ReindexRequest represents a row in the reindex_requests table
type ReindexRequest struct {
RequestID int `db:"request_id"`
Status string `db:"status"`
CreatedAt time.Time `db:"created_at"`
LastUpdated time.Time `db:"last_updated"`
}
// // ReindexRequest represents a row in the reindex_requests table
// type ReindexRequest struct {
// RequestID int `db:"request_id"`
// Status string `db:"status"`
// CreatedAt time.Time `db:"created_at"`
// LastUpdated time.Time `db:"last_updated"`
// }

// ReindexRequestDetailed represents a row in the reindex_request_detailed table
type ReindexRequestDetailed struct {
ID int `db:"id"`
RequestID int `db:"request_id"`
Index string `db:"index"`
FromVersion string `db:"from_version"`
ToVersion string `db:"to_version"`
Stage string `db:"stage"`
OsTaskID string `db:"os_task_id"`
Heartbeat time.Time `db:"heartbeat"`
HavingAlias bool `db:"having_alias"`
AliasList string `db:"alias_list"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
// // ReindexRequestDetailed represents a row in the reindex_request_detailed table
// type ReindexRequestDetailed struct {
// ID int `db:"id"`
// RequestID int `db:"request_id"`
// Index string `db:"index"`
// FromVersion string `db:"from_version"`
// ToVersion string `db:"to_version"`
// Stage string `db:"stage"`
// OsTaskID string `db:"os_task_id"`
// Heartbeat time.Time `db:"heartbeat"`
// HavingAlias bool `db:"having_alias"`
// AliasList string `db:"alias_list"`
// CreatedAt time.Time `db:"created_at"`
// UpdatedAt time.Time `db:"updated_at"`
// }

// ConnectAndMigrate creates a new PostgreSQL connection, connects to the database server, and runs migrations
func ConnectAndMigrate(dbConf *config.Storage) (*DB, error) {
dbConn, err := connect(dbConf)
if err != nil {
return nil, err
}
log.WithFields(log.Fields{
"uri": dbConf.URI,
"schema": dbConf.SchemaPath,
}).Debug("Initializing database")

err = runMigrations(dbConf)
if err != nil {
return nil, errors.Wrap(err, "Migration failed")
}
// // ConnectAndMigrate creates a new PostgreSQL connection, connects to the database server, and runs migrations
// func ConnectAndMigrate(dbConf *config.Storage) (*DB, error) {
// dbConn, err := connect(dbConf)
// if err != nil {
// return nil, err
// }
// log.WithFields(log.Fields{
// "uri": dbConf.URI,
// "schema": dbConf.SchemaPath,
// }).Debug("Initializing database")

db := &DB{
DbMap: &gorp.DbMap{Db: dbConn, Dialect: gorp.PostgresDialect{}},
}
return db, nil
}
// err = runMigrations(dbConf)
// if err != nil {
// return nil, errors.Wrap(err, "Migration failed")
// }

// connect opens a connection to the database
func connect(dbConf *config.Storage) (*sql.DB, error) {
log.WithFields(log.Fields{
"uri": dbConf.URI,
}).Debug("Connecting to PostgreSQL")
// db := &DB{
// DbMap: &gorp.DbMap{Db: dbConn, Dialect: gorp.PostgresDialect{}},
// }
// return db, nil
// }

dbconn, err := libdb.PGOpen(dbConf.URI)
if err != nil {
return nil, errors.Wrapf(err, "Failed to open database with uri: %s", dbConf.URI)
}
// // connect opens a connection to the database
// func connect(dbConf *config.Storage) (*sql.DB, error) {
// log.WithFields(log.Fields{
// "uri": dbConf.URI,
// }).Debug("Connecting to PostgreSQL")

if dbConf.MaxIdleConns > 0 {
dbconn.SetMaxIdleConns(dbConf.MaxIdleConns)
}
if dbConf.MaxOpenConns > 0 {
dbconn.SetMaxOpenConns(dbConf.MaxOpenConns)
}
// dbconn, err := libdb.PGOpen(dbConf.URI)
// if err != nil {
// return nil, errors.Wrapf(err, "Failed to open database with uri: %s", dbConf.URI)
// }

// ping database
err = dbconn.Ping()
if err != nil {
return nil, errors.Wrapf(err, "Failed to ping database with uri: %s", dbConf.URI)
}
// if dbConf.MaxIdleConns > 0 {
// dbconn.SetMaxIdleConns(dbConf.MaxIdleConns)
// }
// if dbConf.MaxOpenConns > 0 {
// dbconn.SetMaxOpenConns(dbConf.MaxOpenConns)
// }

return dbconn, nil
}
// // ping database
// err = dbconn.Ping()
// if err != nil {
// return nil, errors.Wrapf(err, "Failed to ping database with uri: %s", dbConf.URI)
// }

// return dbconn, nil
// }

// func runMigrations(dbConf *config.Storage) error {
// if err := migrator.Migrate(dbConf.URI, dbConf.SchemaPath, logger.NewLogrusStandardLogger(), false); err != nil {
// return errors.Wrapf(err, "Unable to create database schema. [path:%s]", dbConf.SchemaPath)
// }
// return nil
// }

package storage

import (
"github.com/chef/automate/components/ingest-service/config"
"github.com/chef/automate/lib/db/migrator"
"github.com/chef/automate/lib/logger"
"github.com/pkg/errors"
)

func runMigrations(dbConf *config.Storage) error {
// RunMigrations runs the migration scripts from the schema path
func RunMigrations(dbConf *config.Storage) error {
if err := migrator.Migrate(dbConf.URI, dbConf.SchemaPath, logger.NewLogrusStandardLogger(), false); err != nil {
return errors.Wrapf(err, "Unable to create database schema. [path:%s]", dbConf.SchemaPath)
}
Expand Down

0 comments on commit ebf35de

Please sign in to comment.