Skip to content
This repository was archived by the owner on Mar 31, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-logr/logr v1.3.0
github.com/go-logr/zapr v1.2.4
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.14.0
go.uber.org/zap v1.26.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
Expand Down Expand Up @@ -272,8 +274,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
176 changes: 129 additions & 47 deletions pkg/pluginhelper/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package pluginhelper

import (
"context"
"errors"
"net"
"os"
"os/signal"
"path"
"syscall"

"github.com/cloudnative-pg/cnpg-i/pkg/identity"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
Expand All @@ -31,53 +37,7 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri
},
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
logger := logging.FromContext(cmd.Context())

identityResponse, err := identityImpl.GetPluginMetadata(
cmd.Context(),
&identity.GetPluginMetadataRequest{})
if err != nil {
logger.Error(err, "Error while querying the identity service")
return err
}

pluginPath := viper.GetString("plugin-path")
pluginName := identityResponse.Name
pluginDisplayName := identityResponse.DisplayName
pluginVersion := identityResponse.Version
socketName := path.Join(pluginPath, identityResponse.Name)

grpcServer := grpc.NewServer()
identity.RegisterIdentityServer(
grpcServer,
identityImpl)
for _, enrich := range enrichers {
enrich(grpcServer)
}

listener, err := net.Listen(
unixNetwork,
socketName,
)
if err != nil {
logger.Error(err, "While starting server")
return err
}

logger.Info(
"Starting plugin",
"path", pluginPath,
"name", pluginName,
"displayName", pluginDisplayName,
"version", pluginVersion,
"socketName", socketName,
)
err = grpcServer.Serve(listener)
if err != nil {
logger.Error(err, "While terminatind server")
}

return err
return run(cmd.Context(), identityImpl, enrichers...)
},
}

Expand All @@ -97,3 +57,125 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri

return cmd
}

// run starts listining for GRPC requests
func run(ctx context.Context, identityImpl identity.IdentityServer, enrichers ...ServerEnricher) error {
logger := logging.FromContext(ctx)

identityResponse, err := identityImpl.GetPluginMetadata(
ctx,
&identity.GetPluginMetadataRequest{})
if err != nil {
logger.Error(err, "Error while querying the identity service")
return err
}

pluginPath := viper.GetString("plugin-path")
pluginName := identityResponse.Name
pluginDisplayName := identityResponse.DisplayName
pluginVersion := identityResponse.Version
socketName := path.Join(pluginPath, identityResponse.Name)

// Remove stale unix socket it still existent
if err := removeStaleSocket(ctx, socketName); err != nil {
logger.Error(err, "While removing old unix socket")
return err
}

// Start accepting connections on the socket
listener, err := net.Listen(
unixNetwork,
socketName,
)
if err != nil {
logger.Error(err, "While starting server")
return err
}

// Handle quit-like signal
handleSignals(ctx, listener)

// Create GRPC server
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))),
),
grpc.ChainStreamInterceptor(
recovery.StreamServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))),
),
)
identity.RegisterIdentityServer(
grpcServer,
identityImpl)
for _, enrich := range enrichers {
enrich(grpcServer)
}

logger.Info(
"Starting plugin",
"path", pluginPath,
"name", pluginName,
"displayName", pluginDisplayName,
"version", pluginVersion,
"socketName", socketName,
)

if err = grpcServer.Serve(listener); !errors.Is(err, net.ErrClosed) {
logger.Error(err, "While terminating server")
}

return nil
}

// removeStaleSocket removes a stale unix domain socket
func removeStaleSocket(ctx context.Context, pluginPath string) error {
logger := logging.FromContext(ctx)
_, err := os.Stat(pluginPath)

switch {
case err == nil:
logger.Info("Removing stale socket", "pluginPath", pluginPath)
return os.Remove(pluginPath)

case errors.Is(err, os.ErrNotExist):
return nil

default:
return err
}
}

// handleSignals makes sure that we close the listening socket
// when we receive a quit-like signal
func handleSignals(ctx context.Context, listener net.Listener) {
logger := logging.FromContext(ctx)

sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGABRT, syscall.SIGINT)
go func(c chan os.Signal) {
sig := <-c
logger.Info(
"Caught signal, shutting down.",
"signal", sig.String())

if err := listener.Close(); err != nil {
logger.Error(err, "While stopping server")
}

os.Exit(1)
}(sigc)
}

func panicRecoveryHandler(listener net.Listener) recovery.RecoveryHandlerFuncContext {
return func(ctx context.Context, err any) error {
logger := logging.FromContext(ctx)
logger.Info("Panic occurred", "error", err)

if closeError := listener.Close(); closeError != nil {
logger.Error(closeError, "While stopping server")
}

os.Exit(1)
return nil
}
}