go/grpshuffle_server/main.go (114 lines of code) (raw):

package main import ( "fmt" "google.golang.org/grpc/reflection" "log" "net" "net/http" "os" "time" "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/korosuke613/grpshuffle/go/grpshuffle" grpshuffleServer "github.com/korosuke613/grpshuffle/go/grpshuffle_server/lib" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli/v2" "go.uber.org/zap" "google.golang.org/grpc" health "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" ) func extractFields(fullMethod string, req interface{}) map[string]interface{} { ret := make(map[string]interface{}) switch args := req.(type) { case *grpshuffle.ShuffleRequest: ret["Divide"] = args.Divide ret["Targets"] = args.Targets default: return nil } return ret } func main() { app := &cli.App{ Name: "grpshuffle-server", Usage: "Server of groshuffle", Action: func(c *cli.Context) error { kep := keepalive.EnforcementPolicy{ MinTime: 60 * time.Second, } logger, _ := zap.NewProduction() serv := grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kep), grpc.StreamInterceptor( grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(extractFields)), grpc_zap.StreamServerInterceptor(logger), grpc_prometheus.StreamServerInterceptor, ), ), grpc.UnaryInterceptor( grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(extractFields)), grpc_zap.UnaryServerInterceptor(logger), grpc_prometheus.UnaryServerInterceptor, ), ), ) grpshuffle.RegisterComputeServer(serv, &grpshuffleServer.Server{}) health.RegisterHealthServer(serv, &grpshuffleServer.HealthServer{}) grpc_prometheus.Register(serv) // Register Server Reflection // https://github.com/grpc/grpc-go/blob/91967153f567adc812d8da223ef984d02a3664ed/Documentation/server-reflection-tutorial.md reflection.Register(serv) if grpshuffleServer.PrometheusEnable { http.Handle("/metrics", promhttp.Handler()) prometheusAddr := fmt.Sprintf(":%v", grpshuffleServer.PrometheusPort) go func() { log.Printf("Launch Prometheus server %v...", grpshuffleServer.PrometheusPort) if err := http.ListenAndServe(prometheusAddr, nil); err != nil { panic(err) } }() } log.Printf("Launch grpshuffle server %v...", grpshuffleServer.Port) l, err := net.Listen("tcp", fmt.Sprintf(":%d", grpshuffleServer.Port)) if err != nil { return fmt.Errorf("failed to listen:%v", err) } err = serv.Serve(l) if err != nil { return fmt.Errorf("failed to serve:%v", err) } return nil }, Flags: []cli.Flag{ &cli.IntFlag{ Name: "port", Aliases: []string{"P"}, Usage: "`PORT` of server", EnvVars: []string{"GRPSHUFFLE_PORT"}, Value: 13333, Destination: &grpshuffleServer.Port, }, &cli.BoolFlag{ Name: "prometheus-enable", Usage: "With this option, serve Prometheus", EnvVars: []string{"GRPSHUFFLE_PROMETHEUS_ENABLE"}, Value: false, Destination: &grpshuffleServer.PrometheusEnable, }, &cli.IntFlag{ Name: "prometheus-port", Usage: "`PORT` of prometheus", EnvVars: []string{"GRPSHUFFLE_PROMETHEUS_PORT"}, Value: 8081, Destination: &grpshuffleServer.PrometheusPort, }, }, } err := app.Run(os.Args) if err != nil { log.Fatal(err) } }