diff --git a/pkg/apiserver/integration_test.go b/pkg/apiserver/integration_test.go new file mode 100644 index 00000000..c74407c1 --- /dev/null +++ b/pkg/apiserver/integration_test.go @@ -0,0 +1,340 @@ +package apiserver + +import ( + "context" + "crypto/tls" + "database/sql" + "fmt" + "io" + "net" + "net/http" + "path/filepath" + "strings" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/admission" + kuser "k8s.io/apiserver/pkg/authentication/user" + "sigs.k8s.io/apiserver-runtime/pkg/builder/resource" + + corev1alpha3 "github.com/apoxy-dev/apoxy/api/core/v1alpha3" + a3yclient "github.com/apoxy-dev/apoxy/client/versioned" + "github.com/apoxy-dev/apoxy/pkg/apiserver/auth" +) + +type testServer struct { + addr string + cancel context.CancelFunc + http *http.Client +} + +type observedIdentity struct { + name string + groups []string +} + +type authRecorderPlugin struct { + *admission.Handler + + denyAnonymous bool + observed chan observedIdentity +} + +var _ admission.ValidationInterface = &authRecorderPlugin{} + +func (p *authRecorderPlugin) ValidateInitialization() error { + return nil +} + +func (p *authRecorderPlugin) Validate(_ context.Context, a admission.Attributes, _ admission.ObjectInterfaces) error { + info := a.GetUserInfo() + identity := observedIdentity{} + if info != nil { + identity.name = info.GetName() + identity.groups = append(identity.groups, info.GetGroups()...) + } + + select { + case p.observed <- identity: + default: + } + + if p.denyAnonymous && (info == nil || info.GetName() == kuser.Anonymous) { + return admission.NewForbidden(a, fmt.Errorf("anonymous user is not allowed")) + } + + return nil +} + +func TestAPIServerIntegrationReadyzAndHealthz(t *testing.T) { + srv := startTestServer(t) + t.Cleanup(srv.cancel) + + assertHTTPStatus(t, srv.http, "https://"+srv.addr+"/readyz", http.StatusOK) + assertHTTPStatus(t, srv.http, "https://"+srv.addr+"/healthz", http.StatusOK) +} + +func TestAPIServerIntegrationDomainRecordCRUDAndSQLite(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "apiserver.db") + srv := startTestServer(t, WithSQLitePath(dbPath)) + t.Cleanup(srv.cancel) + + client := newClientset(t, srv.addr) + records := client.CoreV1alpha3().DomainRecords() + + record := &corev1alpha3.DomainRecord{ + ObjectMeta: metav1.ObjectMeta{Name: "api.example.com--a"}, + Spec: corev1alpha3.DomainRecordSpec{ + Name: "api.example.com", + TTL: int32Ptr(60), + Target: corev1alpha3.DomainRecordTarget{ + DNS: &corev1alpha3.DomainRecordTargetDNS{ + A: []string{"192.0.2.10"}, + }, + }, + }, + } + + created, err := records.Create(context.Background(), record, metav1.CreateOptions{}) + require.NoError(t, err) + require.Equal(t, "api.example.com--a", created.Name) + + got, err := records.Get(context.Background(), created.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, int32(60), *got.Spec.TTL) + + list, err := records.List(context.Background(), metav1.ListOptions{}) + require.NoError(t, err) + require.Len(t, list.Items, 1) + + updated := got.DeepCopy() + updated.Spec.TTL = int32Ptr(600) + updated, err = records.Update(context.Background(), updated, metav1.UpdateOptions{}) + require.NoError(t, err) + require.Equal(t, int32(600), *updated.Spec.TTL) + + assertSQLiteObjectRow(t, dbPath, created.Name, false, `"ttl":600`) + + srv.cancel() + + restarted := startTestServer(t, WithSQLitePath(dbPath)) + t.Cleanup(restarted.cancel) + + restartedClient := newClientset(t, restarted.addr) + persisted, err := restartedClient.CoreV1alpha3().DomainRecords().Get(context.Background(), created.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, int32(600), *persisted.Spec.TTL) + + require.NoError(t, restartedClient.CoreV1alpha3().DomainRecords().Delete(context.Background(), created.Name, metav1.DeleteOptions{})) + _, err = restartedClient.CoreV1alpha3().DomainRecords().Get(context.Background(), created.Name, metav1.GetOptions{}) + require.True(t, apierrors.IsNotFound(err), "expected delete to remove record, got %v", err) + + assertSQLiteObjectRow(t, dbPath, created.Name, true, "") +} + +func TestAPIServerIntegrationSimpleAuth(t *testing.T) { + observed := make(chan observedIdentity, 8) + pluginFactory := func(io.Reader) (admission.Interface, error) { + return &authRecorderPlugin{ + Handler: admission.NewHandler(admission.Create), + denyAnonymous: true, + observed: observed, + }, nil + } + + srv := startTestServer( + t, + WithSimpleAuth(), + WithAdmissionPlugin("test-auth-recorder", pluginFactory), + ) + t.Cleanup(srv.cancel) + + authenticated := newClientset( + t, + srv.addr, + WithTransportWrapper(auth.NewTransportWrapperFunc("integration-user", []string{"integration-group"}, nil)), + ) + + record := &corev1alpha3.DomainRecord{ + ObjectMeta: metav1.ObjectMeta{Name: "auth.example.com--a"}, + Spec: corev1alpha3.DomainRecordSpec{ + Name: "auth.example.com", + Target: corev1alpha3.DomainRecordTarget{ + DNS: &corev1alpha3.DomainRecordTargetDNS{A: []string{"192.0.2.20"}}, + }, + }, + } + + _, err := authenticated.CoreV1alpha3().DomainRecords().Create(context.Background(), record, metav1.CreateOptions{}) + require.NoError(t, err) + identity := receiveIdentity(t, observed) + require.Equal(t, "integration-user", identity.name) + require.NotEqual(t, kuser.Anonymous, identity.name) + + anonymous := newClientset(t, srv.addr) + record = &corev1alpha3.DomainRecord{ + ObjectMeta: metav1.ObjectMeta{Name: "anonymous.example.com--a"}, + Spec: corev1alpha3.DomainRecordSpec{ + Name: "anonymous.example.com", + Target: corev1alpha3.DomainRecordTarget{ + DNS: &corev1alpha3.DomainRecordTargetDNS{A: []string{"192.0.2.21"}}, + }, + }, + } + + _, err = anonymous.CoreV1alpha3().DomainRecords().Create(context.Background(), record, metav1.CreateOptions{}) + require.True(t, apierrors.IsForbidden(err), "expected anonymous create to be rejected by admission, got %v", err) + identity = receiveIdentity(t, observed) + require.Equal(t, kuser.Anonymous, identity.name) + require.Contains(t, identity.groups, kuser.AllUnauthenticated) +} + +func TestAPIServerIntegrationDomainRecordDefaultingAndValidation(t *testing.T) { + srv := startTestServer(t) + t.Cleanup(srv.cancel) + + records := newClientset(t, srv.addr).CoreV1alpha3().DomainRecords() + + created, err := records.Create(context.Background(), &corev1alpha3.DomainRecord{ + Spec: corev1alpha3.DomainRecordSpec{ + Name: "defaulted.example.com", + Target: corev1alpha3.DomainRecordTarget{ + DNS: &corev1alpha3.DomainRecordTargetDNS{ + A: []string{"192.0.2.30"}, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + require.Equal(t, "defaulted.example.com--a", created.Name) + require.NotNil(t, created.Spec.TTL) + require.Equal(t, int32(300), *created.Spec.TTL) + require.Equal(t, "A", created.Status.Type) + + invalid := created.DeepCopy() + invalid.Spec.Name = "renamed.example.com" + _, err = records.Update(context.Background(), invalid, metav1.UpdateOptions{}) + require.True(t, apierrors.IsInvalid(err), "expected immutable-field update to be invalid, got %v", err) + require.Contains(t, err.Error(), "field is immutable after creation") +} + +func startTestServer(t *testing.T, opts ...Option) *testServer { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + + serverOpts, err := defaultOptions(ctx) + require.NoError(t, err) + + serverOpts.resources = []resource.Object{&corev1alpha3.DomainRecord{}} + serverOpts.sqlitePath = filepath.Join(t.TempDir(), "apiserver.db") + serverOpts.bindAddress = "127.0.0.1" + serverOpts.bindPort = reserveTCPPort(t) + + for _, opt := range opts { + opt(serverOpts) + } + + if len(serverOpts.resources) == 0 { + serverOpts.resources = []resource.Object{&corev1alpha3.DomainRecord{}} + } + + require.NoError(t, start(ctx, serverOpts)) + + addr := net.JoinHostPort(serverOpts.loopbackHost(), fmt.Sprintf("%d", serverOpts.bindPort)) + return &testServer{ + addr: addr, + cancel: cancel, + http: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: 5 * time.Second, + }, + } +} + +func newClientset(t *testing.T, addr string, opts ...ClientOption) *a3yclient.Clientset { + t.Helper() + + clientOpts := []ClientOption{WithClientHost(addr)} + clientOpts = append(clientOpts, opts...) + + cfg := NewClientConfig(clientOpts...) + clientset, err := a3yclient.NewForConfig(cfg) + require.NoError(t, err) + return clientset +} + +func reserveTCPPort(t *testing.T) int { + t.Helper() + + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer l.Close() + + return l.Addr().(*net.TCPAddr).Port +} + +func assertHTTPStatus(t *testing.T, client *http.Client, url string, want int) { + t.Helper() + + resp, err := client.Get(url) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, want, resp.StatusCode) +} + +func assertSQLiteObjectRow(t *testing.T, dbPath, name string, deleted bool, wantValueSubstring string) { + t.Helper() + + db, err := sql.Open("sqlite3", dbPath) + require.NoError(t, err) + defer db.Close() + + const query = ` +SELECT + deleted, + COALESCE(CAST(value AS TEXT), ''), + COALESCE(CAST(old_value AS TEXT), '') +FROM kine +WHERE + COALESCE(CAST(value AS TEXT), '') LIKE ? + OR COALESCE(CAST(old_value AS TEXT), '') LIKE ? +ORDER BY id DESC +LIMIT 1` + + pattern := fmt.Sprintf("%%%q%%", name) + var deletedInt int + var value string + var oldValue string + require.NoError(t, db.QueryRow(query, pattern, pattern).Scan(&deletedInt, &value, &oldValue)) + require.Equal(t, deleted, deletedInt == 1) + + if wantValueSubstring != "" { + require.Contains(t, value, wantValueSubstring) + } + if deleted { + require.True(t, strings.Contains(value, name) || strings.Contains(oldValue, name)) + } +} + +func receiveIdentity(t *testing.T, observed <-chan observedIdentity) observedIdentity { + t.Helper() + + select { + case identity := <-observed: + return identity + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for admission plugin to observe user identity") + return observedIdentity{} + } +} + +func int32Ptr(v int32) *int32 { + return &v +} diff --git a/pkg/apiserver/manager.go b/pkg/apiserver/manager.go index 0513be6b..c0553807 100644 --- a/pkg/apiserver/manager.go +++ b/pkg/apiserver/manager.go @@ -4,12 +4,13 @@ import ( "context" "crypto/tls" "errors" - "flag" "fmt" "io" + "net" "net/http" "os" "path/filepath" + "strconv" "strings" "time" @@ -35,7 +36,6 @@ import ( "k8s.io/klog/v2" "k8s.io/kube-openapi/pkg/common" netutils "k8s.io/utils/net" - "sigs.k8s.io/apiserver-runtime/pkg/builder" "sigs.k8s.io/apiserver-runtime/pkg/builder/resource" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -50,6 +50,7 @@ import ( "github.com/apoxy-dev/apoxy/pkg/apiserver/controllers" extensionscontroller "github.com/apoxy-dev/apoxy/pkg/apiserver/extensions" "github.com/apoxy-dev/apoxy/pkg/apiserver/gateway" + builder "github.com/apoxy-dev/apoxy/pkg/apiserver/server/builder" "github.com/apoxy-dev/apoxy/pkg/cryptoutils" "github.com/apoxy-dev/apoxy/pkg/gateway/message" statusrunner "github.com/apoxy-dev/apoxy/pkg/gateway/status/runner" @@ -131,6 +132,7 @@ func registerCrossVersionConversions(s *runtime.Scheme) error { func waitForReadyz(url string, timeout time.Duration) error { t := time.NewTimer(timeout) + defer t.Stop() retryTimeout := 200 * time.Millisecond client := &http.Client{ Transport: &http.Transport{ @@ -142,8 +144,12 @@ func waitForReadyz(url string, timeout time.Duration) error { for { resp, err := client.Get(url + "/readyz") if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() return nil } + if resp != nil { + resp.Body.Close() + } log.Debugf("failed readyz request: %v", err) @@ -225,6 +231,8 @@ type options struct { sqlitePath string sqliteConnArgs map[string]string certPairName, certDir string + bindAddress string + bindPort int enableKubeAPI bool controllerNames []string additionalControllers []CreateController @@ -359,6 +367,20 @@ func WithSQLiteConnArgs(args map[string]string) Option { } } +// WithBindAddress sets the secure serving bind address. +func WithBindAddress(address string) Option { + return func(o *options) { + o.bindAddress = address + } +} + +// WithBindPort sets the secure serving bind port. +func WithBindPort(port int) Option { + return func(o *options) { + o.bindPort = port + } +} + // WithKubeAPI enables the Kubernetes API. func WithKubeAPI() Option { return func(o *options) { @@ -539,6 +561,8 @@ func defaultOptions(ctx context.Context) (*options, error) { enableInClusterAuth: false, certDir: "", certPairName: "tls", + bindAddress: "0.0.0.0", + bindPort: 8443, gcInterval: 10 * time.Minute, @@ -562,6 +586,21 @@ func defaultOptions(ctx context.Context) (*options, error) { return opts, nil } +func (o *options) loopbackHost() string { + ip := net.ParseIP(o.bindAddress) + if ip != nil && ip.IsUnspecified() { + return "localhost" + } + if o.bindAddress == "" { + return "localhost" + } + return o.bindAddress +} + +func (o *options) loopbackHostPort() string { + return net.JoinHostPort(o.loopbackHost(), strconv.Itoa(o.bindPort)) +} + // Manager manages APIServer instance as well as built-in controllers. type Manager struct { ReadyCh chan error @@ -805,25 +844,28 @@ func start( } } else { log.Infof("Using certificate pair name %q and directory %q", opts.certPairName, opts.certDir) - serverCertFile := filepath.Join(opts.certDir, opts.certPairName+".crt") + serverCertFile = filepath.Join(opts.certDir, opts.certPairName+".crt") if _, err := os.Stat(serverCertFile); err != nil { return fmt.Errorf("failed to stat server certificate file %q: %v", serverCertFile, err) } - serverKeyFile := filepath.Join(opts.certDir, opts.certPairName+".key") + serverKeyFile = filepath.Join(opts.certDir, opts.certPairName+".key") if _, err := os.Stat(serverKeyFile); err != nil { return fmt.Errorf("failed to stat server key file %q: %v", serverKeyFile, err) } - serverCAFile := filepath.Join(opts.certDir, "ca.crt") + serverCAFile = filepath.Join(opts.certDir, "ca.crt") if _, err := os.Stat(serverCAFile); err != nil { return fmt.Errorf("failed to stat server CA file %q: %v", serverCAFile, err) } } // Create client for communicating with the API server locally. - clientConfig := NewClientConfig() + clientConfig := NewClientConfig(WithClientHost(opts.loopbackHostPort())) if opts.enableSimpleAuth { w := auth.NewTransportWrapperFunc(apiserverUser, []string{user.SystemPrivilegedGroup}, nil) - clientConfig = NewClientConfig(WithTransportWrapper(w)) + clientConfig = NewClientConfig( + WithClientHost(opts.loopbackHostPort()), + WithTransportWrapper(w), + ) } else if opts.enableInClusterAuth { log.Infof("Using in-cluster configuration") @@ -846,6 +888,7 @@ func start( return fmt.Errorf("failed to create x509 authenticator: %v", err) } clientConfig = NewClientConfig( + WithClientHost(opts.loopbackHostPort()), WithClientTLSConfig(rest.TLSClientConfig{ CertFile: clientCertFile, KeyFile: clientKeyFile, @@ -854,16 +897,6 @@ func start( ) } - // Reset flags. APIServer cmd expects its own flagset. - flag.CommandLine = flag.NewFlagSet("apiserver", flag.ExitOnError) - os.Args = append( - []string{ - os.Args[0], - // Disable API priority and fairness (flow control) which doesn't work anyway. - "--enable-priority-and-fairness=false", - }, - flag.Args()...) // Keep non-flag arguments. - l := log.New(config.Verbose) ctrl.SetLogger(l) klog.SetLogger(l) @@ -878,158 +911,138 @@ func start( } logrus.SetOutput(log.NewDefaultLogWriter(kineLogLevel)) - readyCh := make(chan error) - go func() { - if opts.sqlitePath != "" && !strings.Contains(opts.sqlitePath, ":memory:") { - if _, err := os.Stat(opts.sqlitePath); os.IsNotExist(err) { - if err := os.MkdirAll(filepath.Dir(opts.sqlitePath), 0755); err != nil { - readyCh <- fmt.Errorf("failed to create database directory: %v", err) - return - } - if _, err := os.Create(opts.sqlitePath); err != nil { - readyCh <- fmt.Errorf("failed to create database file: %v", err) - return - } + if opts.sqlitePath != "" && !strings.Contains(opts.sqlitePath, ":memory:") { + if _, err := os.Stat(opts.sqlitePath); os.IsNotExist(err) { + if err := os.MkdirAll(filepath.Dir(opts.sqlitePath), 0755); err != nil { + return fmt.Errorf("failed to create database directory: %v", err) + } + if _, err := os.Create(opts.sqlitePath); err != nil { + return fmt.Errorf("failed to create database file: %v", err) } } - kineStore, err := NewKineStorage(ctx, opts.sqlitePath, opts.sqliteConnArgs, kineLogFormat) - if err != nil { - readyCh <- fmt.Errorf("failed to create kine storage: %w", err) - return - } - - srvBuilder := builder.APIServer. - WithAdditionalSchemeInstallers(registerCrossVersionConversions, registerFieldLabelConversions) - for _, r := range opts.resources { - srvBuilder = srvBuilder.WithResourceAndStorage(r, kineStore) - } - // Use custom OpenAPI definitions if provided, otherwise use the default. - openAPIGetter := opts.openAPIDefinitions - if openAPIGetter == nil { - openAPIGetter = apoxyopenapi.GetOpenAPIDefinitions - } - if err := srvBuilder. - WithOpenAPIDefinitions("apoxy", "0.1.0", openAPIGetter). - DisableAuthorization(). - WithOptionsFns(func(o *builder.ServerOptions) *builder.ServerOptions { - o.StdErr = io.Discard - o.StdOut = io.Discard - - o.RecommendedOptions.CoreAPI = nil - o.RecommendedOptions.Authentication = nil - o.RecommendedOptions.Authorization = nil + } + kineStore, err := NewKineStorage(ctx, opts.sqlitePath, opts.sqliteConnArgs, kineLogFormat) + if err != nil { + return fmt.Errorf("failed to create kine storage: %w", err) + } - // Enable admission plugin chain. CoreAPI is nil so the - // default webhook/lifecycle plugins cannot be used; only - // custom in-process plugins registered via - // WithAdmissionPlugin will run. - admissionOpts := &apiserveropts.AdmissionOptions{ - Plugins: admission.NewPlugins(), - RecommendedPluginOrder: make([]string, 0, len(opts.admissionPlugins)), - } - for _, p := range opts.admissionPlugins { - admissionOpts.Plugins.Register(p.name, p.factory) - admissionOpts.RecommendedPluginOrder = append( - admissionOpts.RecommendedPluginOrder, p.name) - } - o.RecommendedOptions.Admission = admissionOpts - - // Inject the Apoxy SharedInformerFactory into admission - // plugins that implement WantsApoxyInformerFactory. - o.RecommendedOptions.ExtraAdmissionInitializers = func( - c *apiserver.RecommendedConfig, - ) ([]admission.PluginInitializer, error) { - a3yClient, err := a3yclient.NewForConfig(c.ClientConfig) - if err != nil { - return nil, fmt.Errorf("failed to create apoxy client for admission: %w", err) - } - a3yFactory := a3yinformers.NewSharedInformerFactory(a3yClient, 0) - return []admission.PluginInitializer{ - a3yadmission.New(a3yFactory, a3yClient), - }, nil + srvBuilder := builder.NewServerBuilder(). + WithAdditionalSchemeInstallers(registerCrossVersionConversions, registerFieldLabelConversions) + for _, r := range opts.resources { + srvBuilder = srvBuilder.WithResourceAndStorage(r, kineStore) + } + // Use custom OpenAPI definitions if provided, otherwise use the default. + openAPIGetter := opts.openAPIDefinitions + if openAPIGetter == nil { + openAPIGetter = apoxyopenapi.GetOpenAPIDefinitions + } + serverOpts, err := srvBuilder. + WithOpenAPIDefinitions("apoxy", "0.1.0", openAPIGetter). + DisableAuthorization(). + WithOptionsFns(func(o *builder.ServerOptions) *builder.ServerOptions { + o.StdErr = io.Discard + o.StdOut = io.Discard + + o.RecommendedOptions.CoreAPI = nil + o.RecommendedOptions.Authentication = nil + o.RecommendedOptions.Authorization = nil + + // Enable admission plugin chain. CoreAPI is nil so the + // default webhook/lifecycle plugins cannot be used; only + // custom in-process plugins registered via + // WithAdmissionPlugin will run. + admissionOpts := &apiserveropts.AdmissionOptions{ + Plugins: admission.NewPlugins(), + RecommendedPluginOrder: make([]string, 0, len(opts.admissionPlugins)), + } + for _, p := range opts.admissionPlugins { + admissionOpts.Plugins.Register(p.name, p.factory) + admissionOpts.RecommendedPluginOrder = append( + admissionOpts.RecommendedPluginOrder, p.name) + } + o.RecommendedOptions.Admission = admissionOpts + + // Inject the Apoxy SharedInformerFactory into admission + // plugins that implement WantsApoxyInformerFactory. + o.RecommendedOptions.ExtraAdmissionInitializers = func( + c *apiserver.RecommendedConfig, + ) ([]admission.PluginInitializer, error) { + a3yClient, err := a3yclient.NewForConfig(c.ClientConfig) + if err != nil { + return nil, fmt.Errorf("failed to create apoxy client for admission: %w", err) } + a3yFactory := a3yinformers.NewSharedInformerFactory(a3yClient, 0) + return []admission.PluginInitializer{ + a3yadmission.New(a3yFactory, a3yClient), + }, nil + } - o.RecommendedOptions.SecureServing = &apiserveropts.SecureServingOptionsWithLoopback{ - SecureServingOptions: &apiserveropts.SecureServingOptions{ - BindAddress: netutils.ParseIPSloppy("0.0.0.0"), - BindPort: 8443, - ServerCert: apiserveropts.GeneratableKeyCert{ - GeneratedCert: genCert, - }, + o.RecommendedOptions.SecureServing = &apiserveropts.SecureServingOptionsWithLoopback{ + SecureServingOptions: &apiserveropts.SecureServingOptions{ + BindAddress: netutils.ParseIPSloppy(opts.bindAddress), + BindPort: opts.bindPort, + ServerCert: apiserveropts.GeneratableKeyCert{ + GeneratedCert: genCert, }, - } + }, + } - if opts.enableInClusterAuth { - o.RecommendedOptions.Authentication = apiserveropts.NewDelegatingAuthenticationOptions() - o.RecommendedOptions.Authentication.RemoteKubeConfigFileOptional = true - - o.RecommendedOptions.Authorization = apiserveropts.NewDelegatingAuthorizationOptions() - o.RecommendedOptions.Authorization.RemoteKubeConfigFileOptional = true - o.RecommendedOptions.Authorization.AlwaysAllowPaths = []string{"healthz"} - o.RecommendedOptions.Authorization.AlwaysAllowGroups = []string{ - user.SystemPrivilegedGroup, - } - } else { - o.RecommendedOptions.Authentication = nil - o.RecommendedOptions.Authorization = nil - } + if opts.enableInClusterAuth { + o.RecommendedOptions.Authentication = apiserveropts.NewDelegatingAuthenticationOptions() + o.RecommendedOptions.Authentication.RemoteKubeConfigFileOptional = true - return o - }). - WithConfigFns(func(c *apiserver.RecommendedConfig) *apiserver.RecommendedConfig { - // TODO(dilyevsky): Figure out how to make the listener flexible. - // c.SecureServing.Listener = lst + o.RecommendedOptions.Authorization = apiserveropts.NewDelegatingAuthorizationOptions() + o.RecommendedOptions.Authorization.RemoteKubeConfigFileOptional = true + o.RecommendedOptions.Authorization.AlwaysAllowPaths = []string{"healthz"} + o.RecommendedOptions.Authorization.AlwaysAllowGroups = []string{ + user.SystemPrivilegedGroup, + } + } else { + o.RecommendedOptions.Authentication = nil + o.RecommendedOptions.Authorization = nil + } - c.ClientConfig = clientConfig - c.SharedInformerFactory = informers.NewSharedInformerFactory( - kubernetes.NewForConfigOrDie(c.ClientConfig), - 0, + return o + }). + WithConfigFns(func(c *apiserver.RecommendedConfig) *apiserver.RecommendedConfig { + c.ClientConfig = clientConfig + c.SharedInformerFactory = informers.NewSharedInformerFactory( + kubernetes.NewForConfigOrDie(c.ClientConfig), + 0, + ) + c.FlowControl = nil + + if opts.enableSimpleAuth { + // For simple auth, we use a header authenticator and an always allow authorizer. + c.Authentication.Authenticator = auth.NewHeaderAuthenticator() + c.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() + } else if opts.enableInClusterAuth { + // For in-cluster auth, we use the default delegating (to the kube-apiserver) + // authenticator and authorizer. + // The union authenticator will try authenticators in order until one succeeds. + c.Authentication.Authenticator = union.New( + localClientAuth, + c.Authentication.Authenticator, ) - c.FlowControl = nil - - if opts.enableSimpleAuth { - // For simple auth, we use a header authenticator and an always allow authorizer. - c.Authentication.Authenticator = auth.NewHeaderAuthenticator() - c.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() - } else if opts.enableInClusterAuth { - // For in-cluster auth, we use the default delegating (to the kube-apiserver) - // authenticator and authorizer. - // The union authenticator will try authenticators in order until one succeeds. - c.Authentication.Authenticator = union.New( - localClientAuth, - c.Authentication.Authenticator, - ) - } + } - return c - }). - WithoutEtcd(). - Execute(); err != nil { - readyCh <- err - } - }() - go func() { - if err := waitForReadyz("https://localhost:8443", 300*time.Second); err != nil { - readyCh <- fmt.Errorf("failed to wait for /readyz endpoint: %v", err) - return - } - log.Infof("APIServer is ready") - readyCh <- nil - }() - - log.Infof("Waiting for APIServer...") - - select { - case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for APIServer: %v", ctx.Err()) - case err, ok := <-readyCh: - if !ok { - return errors.New("APIServer failed to start") - } - if err != nil { - return fmt.Errorf("APIServer failed to start: %v", err) - } + return c + }). + WithoutEtcd(). + Build() + if err != nil { + return fmt.Errorf("failed to build APIServer: %w", err) + } + + if _, err := serverOpts.RunApoxyServer(ctx); err != nil { + return fmt.Errorf("failed to start APIServer: %w", err) } + if err := waitForReadyz("https://"+opts.loopbackHostPort(), 300*time.Second); err != nil { + return fmt.Errorf("failed to wait for /readyz endpoint: %v", err) + } + + log.Infof("APIServer is ready") + return nil } diff --git a/pkg/apiserver/server/README.md b/pkg/apiserver/server/README.md new file mode 100644 index 00000000..369c9d90 --- /dev/null +++ b/pkg/apiserver/server/README.md @@ -0,0 +1,14 @@ +# Apoxy API server + +This package contains the `apiserver` startup path that replaces the old +`apiserver-runtime` sample-apiserver command wiring. + +The active pieces are: + +- `apiserver/`: generic-apiserver config and API group installation +- `builder/`: registration of resources, status subresources, and config hooks +- `start/`: recommended-config application and non-blocking server startup + +The builder still reuses the `sigs.k8s.io/apiserver-runtime/pkg/builder/resource` +interfaces that Apoxy API types already implement, but it no longer depends on +the sample-apiserver-based builder and startup path. diff --git a/pkg/apiserver/server/apiserver/apiserver.go b/pkg/apiserver/server/apiserver/apiserver.go new file mode 100644 index 00000000..707389d8 --- /dev/null +++ b/pkg/apiserver/server/apiserver/apiserver.go @@ -0,0 +1,98 @@ +package apiserver + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/version" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/util/compatibility" +) + +func NewScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + + // Whitelist the unversioned API types for any apiserver. + unversioned := schema.GroupVersion{ + Group: "", + Version: "v1", + } + metav1.AddToGroupVersion(scheme, unversioned) + + scheme.AddUnversionedTypes(unversioned, + &metav1.Status{}, + &metav1.WatchEvent{}, + &metav1.APIVersions{}, + &metav1.APIGroupList{}, + &metav1.APIGroup{}, + &metav1.APIResourceList{}, + ) + return scheme +} + +// ExtraConfig holds custom apiserver config +type ExtraConfig struct { + Scheme *runtime.Scheme + Codecs serializer.CodecFactory + APIs map[schema.GroupVersionResource]StorageProvider + ServingInfo *genericapiserver.SecureServingInfo + Version *version.Info + ParameterCodec runtime.ParameterCodec +} + +// Config defines the config for the apiserver +type Config struct { + GenericConfig *genericapiserver.RecommendedConfig + ExtraConfig ExtraConfig +} + +// ApoxyServer contains state for an Apoxy API server. +type ApoxyServer struct { + GenericAPIServer *genericapiserver.GenericAPIServer +} + +type completedConfig struct { + GenericConfig genericapiserver.CompletedConfig + ExtraConfig *ExtraConfig +} + +// CompletedConfig embeds a private pointer that cannot be instantiated outside of this package. +type CompletedConfig struct { + *completedConfig +} + +// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. +func (cfg *Config) Complete() CompletedConfig { + cfg.GenericConfig.EffectiveVersion = compatibility.DefaultBuildEffectiveVersion() + + c := completedConfig{} + c.GenericConfig = cfg.GenericConfig.Complete() + c.ExtraConfig = &cfg.ExtraConfig + return CompletedConfig{&c} +} + +// New returns a new instance of ApoxyServer from the given config. +func (c completedConfig) New() (*ApoxyServer, error) { + genericServer, err := c.GenericConfig.New("apoxy-apiserver", genericapiserver.NewEmptyDelegate()) + if err != nil { + return nil, err + } + + s := &ApoxyServer{ + GenericAPIServer: genericServer, + } + + // Add new APIs through inserting into APIs + apiGroups, err := buildAPIGroupInfos(c.ExtraConfig.Scheme, c.ExtraConfig.Codecs, c.ExtraConfig.APIs, c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.ParameterCodec) + if err != nil { + return nil, err + } + for _, apiGroup := range apiGroups { + if err := s.GenericAPIServer.InstallAPIGroup(apiGroup); err != nil { + return nil, err + } + } + + return s, nil +} diff --git a/pkg/apiserver/server/apiserver/conn.go b/pkg/apiserver/server/apiserver/conn.go new file mode 100644 index 00000000..85b7c85a --- /dev/null +++ b/pkg/apiserver/server/apiserver/conn.go @@ -0,0 +1,41 @@ +package apiserver + +import ( + "context" + "net" +) + +// ConnProvider is an interface for providing network connections. +// This allows for custom network implementations, including in-memory connections for testing. +type ConnProvider interface { + Dial(network, address string) (net.Conn, error) + DialContext(ctx context.Context, network, address string) (net.Conn, error) + Listen(network, address string) (net.Listener, error) +} + +// NetworkConnProvider creates a ConnProvider where all connections are redirected over a particular +// network. Useful for use with memconn, which uses "memb" and "memu" +// as in-memory networks. +func NetworkConnProvider(p ConnProvider, network string) ConnProvider { + return networkConnProvider{ + delegate: p, + network: network, + } +} + +type networkConnProvider struct { + delegate ConnProvider + network string +} + +func (p networkConnProvider) Dial(network, address string) (net.Conn, error) { + return p.delegate.Dial(p.network, address) +} + +func (p networkConnProvider) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + return p.delegate.DialContext(ctx, p.network, address) +} + +func (p networkConnProvider) Listen(network, address string) (net.Listener, error) { + return p.delegate.Listen(p.network, address) +} diff --git a/pkg/apiserver/server/apiserver/ext.go b/pkg/apiserver/server/apiserver/ext.go new file mode 100644 index 00000000..a821de76 --- /dev/null +++ b/pkg/apiserver/server/apiserver/ext.go @@ -0,0 +1,53 @@ +package apiserver + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/sets" + genericregistry "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" + pkgserver "k8s.io/apiserver/pkg/server" +) + +type StorageProvider func(s *runtime.Scheme, g genericregistry.RESTOptionsGetter) (rest.Storage, error) + +func buildAPIGroupInfos(scheme *runtime.Scheme, + codecs serializer.CodecFactory, + apiMap map[schema.GroupVersionResource]StorageProvider, + g genericregistry.RESTOptionsGetter, + parameterCodec runtime.ParameterCodec) ([]*pkgserver.APIGroupInfo, error) { + resourcesByGroupVersion := make(map[schema.GroupVersion]sets.String) + groups := sets.NewString() + if parameterCodec == nil { + parameterCodec = metav1.ParameterCodec + } + for gvr := range apiMap { + groups.Insert(gvr.Group) + if resourcesByGroupVersion[gvr.GroupVersion()] == nil { + resourcesByGroupVersion[gvr.GroupVersion()] = sets.NewString() + } + resourcesByGroupVersion[gvr.GroupVersion()].Insert(gvr.Resource) + } + apiGroups := []*pkgserver.APIGroupInfo{} + for _, group := range groups.List() { + apis := map[string]map[string]rest.Storage{} + for gvr, storageProviderFunc := range apiMap { + if gvr.Group == group { + if _, found := apis[gvr.Version]; !found { + apis[gvr.Version] = map[string]rest.Storage{} + } + storage, err := storageProviderFunc(scheme, g) + if err != nil { + return nil, err + } + apis[gvr.Version][gvr.Resource] = storage + } + } + apiGroupInfo := pkgserver.NewDefaultAPIGroupInfo(group, scheme, parameterCodec, codecs) + apiGroupInfo.VersionedResourcesStorageMap = apis + apiGroups = append(apiGroups, &apiGroupInfo) + } + return apiGroups, nil +} diff --git a/pkg/apiserver/server/builder/builder.go b/pkg/apiserver/server/builder/builder.go new file mode 100644 index 00000000..e300c9f3 --- /dev/null +++ b/pkg/apiserver/server/builder/builder.go @@ -0,0 +1,426 @@ +package builder + +import ( + "context" + "fmt" + "strings" + "sync" + + serverapiserver "github.com/apoxy-dev/apoxy/pkg/apiserver/server/apiserver" + "github.com/apoxy-dev/apoxy/pkg/apiserver/server/start" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/registry/generic" + genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + registryrest "k8s.io/apiserver/pkg/registry/rest" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/storage/names" + openapicommon "k8s.io/kube-openapi/pkg/common" + builderresource "sigs.k8s.io/apiserver-runtime/pkg/builder/resource" + "sigs.k8s.io/apiserver-runtime/pkg/builder/resource/resourcestrategy" + builderutil "sigs.k8s.io/apiserver-runtime/pkg/builder/resource/util" +) + +type ServerOptions = start.ServerOptions +type StoreFn func(*runtime.Scheme, *genericregistry.Store, *generic.StoreOptions) + +// Server builds an API server without going through apiserver-runtime's +// sample-apiserver command wiring. +type Server struct { + apiScheme *runtime.Scheme + openapiScheme *runtime.Scheme + + apiSchemeBuilder runtime.SchemeBuilder + openapiSchemeBuilder runtime.SchemeBuilder + + codecs serializer.CodecFactory + + optionsFns []func(*ServerOptions) *ServerOptions + configFns []start.RecommendedConfigFn + + storageProviders map[schema.GroupResource]*singletonProvider + apis map[schema.GroupVersionResource]serverapiserver.StorageProvider + + registeredGroupVersions map[schema.GroupVersion]struct{} + orderedGroupVersions []schema.GroupVersion +} + +func NewServerBuilder() *Server { + apiScheme := serverapiserver.NewScheme() + openapiScheme := serverapiserver.NewScheme() + + return &Server{ + apiScheme: apiScheme, + openapiScheme: openapiScheme, + codecs: serializer.NewCodecFactory(apiScheme), + storageProviders: make(map[schema.GroupResource]*singletonProvider), + apis: make(map[schema.GroupVersionResource]serverapiserver.StorageProvider), + registeredGroupVersions: make(map[schema.GroupVersion]struct{}), + } +} + +func (s *Server) WithAdditionalSchemeInstallers(fns ...func(*runtime.Scheme) error) *Server { + s.apiSchemeBuilder.Register(fns...) + return s +} + +func (s *Server) WithOptionsFns(fns ...func(*ServerOptions) *ServerOptions) *Server { + s.optionsFns = append(s.optionsFns, fns...) + return s +} + +func (s *Server) WithConfigFns(fns ...func(*genericapiserver.RecommendedConfig) *genericapiserver.RecommendedConfig) *Server { + for _, fn := range fns { + s.configFns = append(s.configFns, start.RecommendedConfigFn(fn)) + } + return s +} + +func (s *Server) WithOpenAPIDefinitions(name, version string, defs openapicommon.GetOpenAPIDefinitions) *Server { + s.configFns = append(s.configFns, start.SetOpenAPIDefinitionFn(s.openapiScheme, name, version, defs)) + return s +} + +func (s *Server) DisableAuthorization() *Server { + return s.WithOptionsFns(func(o *ServerOptions) *ServerOptions { + o.RecommendedOptions.Authorization = nil + return o + }) +} + +func (s *Server) WithoutEtcd() *Server { + return s.WithOptionsFns(func(o *ServerOptions) *ServerOptions { + o.RecommendedOptions.Etcd = nil + return o + }) +} + +func (s *Server) WithResourceAndStorage(obj builderresource.Object, fn StoreFn) *Server { + s.apiSchemeBuilder.Register(builderresource.AddToScheme(obj)) + s.openapiSchemeBuilder.Register(func(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(obj.GetGroupVersionResource().GroupVersion(), obj.New(), obj.NewList()) + return nil + }) + + sp := newStorageProvider(obj, fn) + s.forGroupVersionResource(obj.GetGroupVersionResource(), sp) + s.withStatusSubresource(obj, sp) + return s +} + +func (s *Server) Build() (*start.ApoxyServerOptions, error) { + codec, err := s.buildCodec() + if err != nil { + return nil, err + } + + opts := start.NewServerOptions(codec) + for _, fn := range s.optionsFns { + opts = fn(opts) + } + + return start.NewApoxyServerOptions( + s.apiScheme, + s.codecs, + codec, + s.configFns, + s.apis, + opts, + ), nil +} + +func (s *Server) buildCodec() (runtime.Codec, error) { + registerGroupVersions := func(scheme *runtime.Scheme) error { + versionsByGroup := make(map[string][]schema.GroupVersion) + for _, gv := range s.orderedGroupVersions { + versionsByGroup[gv.Group] = append(versionsByGroup[gv.Group], gv) + } + for _, versions := range versionsByGroup { + if err := scheme.SetVersionPriority(versions...); err != nil { + return err + } + } + for _, gv := range s.orderedGroupVersions { + metav1.AddToGroupVersion(scheme, gv) + } + return nil + } + + s.apiSchemeBuilder.Register(registerGroupVersions) + if err := s.apiSchemeBuilder.AddToScheme(s.apiScheme); err != nil { + return nil, err + } + s.openapiSchemeBuilder.Register(registerGroupVersions) + if err := s.openapiSchemeBuilder.AddToScheme(s.openapiScheme); err != nil { + return nil, err + } + + if len(s.orderedGroupVersions) == 0 { + return nil, fmt.Errorf("no group versions registered") + } + + return s.codecs.LegacyCodec(s.orderedGroupVersions...), nil +} + +func (s *Server) withGroupVersions(versions ...schema.GroupVersion) { + for _, gv := range versions { + if _, ok := s.registeredGroupVersions[gv]; ok { + continue + } + s.registeredGroupVersions[gv] = struct{}{} + s.orderedGroupVersions = append(s.orderedGroupVersions, gv) + } +} + +func (s *Server) forGroupVersionResource(gvr schema.GroupVersionResource, sp serverapiserver.StorageProvider) { + s.withGroupVersions(gvr.GroupVersion()) + + if _, found := s.storageProviders[gvr.GroupResource()]; !found { + s.storageProviders[gvr.GroupResource()] = &singletonProvider{provider: sp} + } + s.apis[gvr] = s.storageProviders[gvr.GroupResource()].Get +} + +func (s *Server) forGroupVersionSubresource(gvr schema.GroupVersionResource, sp serverapiserver.StorageProvider) { + if !strings.Contains(gvr.Resource, "/") { + panic(fmt.Sprintf("expected subresource gvr, got %s", gvr)) + } + s.withGroupVersions(gvr.GroupVersion()) + s.apis[gvr] = sp +} + +func (s *Server) withStatusSubresource(obj builderresource.Object, parent serverapiserver.StorageProvider) { + if _, ok := obj.(builderresource.ObjectWithStatusSubResource); !ok { + return + } + + parentGVR := obj.GetGroupVersionResource() + statusGVR := parentGVR.GroupVersion().WithResource(parentGVR.Resource + "/status") + s.forGroupVersionSubresource(statusGVR, (&statusSubresourceProvider{parent: parent}).Get) +} + +type singletonProvider struct { + once sync.Once + provider serverapiserver.StorageProvider + storage registryrest.Storage + err error +} + +func (s *singletonProvider) Get(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (registryrest.Storage, error) { + s.once.Do(func() { + s.storage, s.err = s.provider(scheme, optsGetter) + }) + return s.storage, s.err +} + +type statusSubresourceProvider struct { + parent serverapiserver.StorageProvider +} + +func (s *statusSubresourceProvider) Get(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (registryrest.Storage, error) { + parentStorage, err := s.parent(scheme, optsGetter) + if err != nil { + return nil, err + } + + stdParentStorage, ok := parentStorage.(registryrest.StandardStorage) + if !ok { + return nil, fmt.Errorf("parent storage for status subresource must implement rest.StandardStorage: %T", parentStorage) + } + + parentStore, ok := stdParentStorage.(*genericregistry.Store) + if !ok { + return nil, fmt.Errorf("status subresource parent must be *registry.Store: %T", stdParentStorage) + } + + statusStore := *parentStore + statusStore.UpdateStrategy = &statusSubresourceStrategy{RESTUpdateStrategy: parentStore.UpdateStrategy} + + return &statusSubresourceStorage{store: &statusStore}, nil +} + +type statusSubresourceStorage struct { + store *genericregistry.Store +} + +func (s *statusSubresourceStorage) New() runtime.Object { + return s.store.New() +} + +func (s *statusSubresourceStorage) Destroy() { + s.store.Destroy() +} + +func (s *statusSubresourceStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + return s.store.Get(ctx, name, options) +} + +func (s *statusSubresourceStorage) Update( + ctx context.Context, + name string, + objInfo registryrest.UpdatedObjectInfo, + createValidation registryrest.ValidateObjectFunc, + updateValidation registryrest.ValidateObjectUpdateFunc, + forceAllowCreate bool, + options *metav1.UpdateOptions, +) (runtime.Object, bool, error) { + return s.store.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) +} + +type statusSubresourceStrategy struct { + registryrest.RESTUpdateStrategy +} + +func (s *statusSubresourceStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) { + statusObj := obj.(builderresource.ObjectWithStatusSubResource) + statusOld := old.(builderresource.ObjectWithStatusSubResource) + + statusObj.GetStatus().CopyTo(statusOld) + if err := builderutil.DeepCopy(statusOld, statusObj); err != nil { + utilruntime.HandleError(err) + } +} + +func newStorageProvider(obj builderresource.Object, fn StoreFn) serverapiserver.StorageProvider { + return func(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (registryrest.Storage, error) { + gvr := obj.GetGroupVersionResource() + strategy := &defaultStrategy{ + Object: obj, + ObjectTyper: scheme, + TableConvertor: registryrest.NewDefaultTableConvertor(gvr.GroupResource()), + } + + store := &genericregistry.Store{ + NewFunc: obj.New, + NewListFunc: obj.NewList, + DefaultQualifiedResource: gvr.GroupResource(), + SingularQualifiedResource: gvr.GroupResource(), + TableConvertor: strategy, + CreateStrategy: strategy, + UpdateStrategy: strategy, + DeleteStrategy: strategy, + StorageVersioner: gvr.GroupVersion(), + } + + options := &generic.StoreOptions{RESTOptions: optsGetter} + if fn != nil { + fn(scheme, store, options) + } + + if err := store.CompleteWithOptions(options); err != nil { + return nil, err + } + + return store, nil + } +} + +type defaultStrategy struct { + Object runtime.Object + runtime.ObjectTyper + TableConvertor registryrest.TableConvertor +} + +func (d defaultStrategy) GenerateName(base string) string { + if d.Object == nil { + return names.SimpleNameGenerator.GenerateName(base) + } + if n, ok := d.Object.(names.NameGenerator); ok { + return n.GenerateName(base) + } + return names.SimpleNameGenerator.GenerateName(base) +} + +func (d defaultStrategy) NamespaceScoped() bool { + if d.Object == nil { + return true + } + if n, ok := d.Object.(registryrest.Scoper); ok { + return n.NamespaceScoped() + } + return true +} + +func (d defaultStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) { + if v, ok := obj.(resourcestrategy.PrepareForCreater); ok { + v.PrepareForCreate(ctx) + } +} + +func (d defaultStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) { + if v, ok := obj.(builderresource.ObjectWithStatusSubResource); ok { + old.(builderresource.ObjectWithStatusSubResource).GetStatus().CopyTo(v) + } + if v, ok := obj.(resourcestrategy.PrepareForUpdater); ok { + v.PrepareForUpdate(ctx, old) + } +} + +func (d defaultStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList { + if v, ok := obj.(resourcestrategy.Validater); ok { + return v.Validate(ctx) + } + return nil +} + +func (d defaultStrategy) AllowCreateOnUpdate() bool { + if d.Object == nil { + return false + } + if n, ok := d.Object.(resourcestrategy.AllowCreateOnUpdater); ok { + return n.AllowCreateOnUpdate() + } + return false +} + +func (d defaultStrategy) AllowUnconditionalUpdate() bool { + if d.Object == nil { + return false + } + if n, ok := d.Object.(resourcestrategy.AllowUnconditionalUpdater); ok { + return n.AllowUnconditionalUpdate() + } + return false +} + +func (d defaultStrategy) Canonicalize(obj runtime.Object) { + if c, ok := obj.(resourcestrategy.Canonicalizer); ok { + c.Canonicalize() + } +} + +func (d defaultStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList { + if v, ok := obj.(resourcestrategy.ValidateUpdater); ok { + return v.ValidateUpdate(ctx, old) + } + return nil +} + +func (d defaultStrategy) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { + if c, ok := obj.(resourcestrategy.TableConverter); ok { + return c.ConvertToTable(ctx, tableOptions) + } + return d.TableConvertor.ConvertToTable(ctx, obj, tableOptions) +} + +func (d defaultStrategy) WarningsOnCreate(context.Context, runtime.Object) []string { + return nil +} + +func (d defaultStrategy) WarningsOnUpdate(context.Context, runtime.Object, runtime.Object) []string { + return nil +} + +func (d defaultStrategy) GetSingularName() string { + if d.Object == nil { + return "" + } + if n, ok := d.Object.(registryrest.SingularNameProvider); ok { + return n.GetSingularName() + } + return "" +} diff --git a/pkg/apiserver/server/start/start.go b/pkg/apiserver/server/start/start.go new file mode 100644 index 00000000..e46cec6d --- /dev/null +++ b/pkg/apiserver/server/start/start.go @@ -0,0 +1,163 @@ +package start + +import ( + "context" + "fmt" + "io" + "net" + + serverapiserver "github.com/apoxy-dev/apoxy/pkg/apiserver/server/apiserver" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/endpoints/openapi" + "k8s.io/apiserver/pkg/registry/generic" + genericapiserver "k8s.io/apiserver/pkg/server" + apiserveropts "k8s.io/apiserver/pkg/server/options" + "k8s.io/apiserver/pkg/storage/storagebackend" + netutils "k8s.io/utils/net" + openapicommon "k8s.io/kube-openapi/pkg/common" +) + +type RecommendedConfigFn func(*genericapiserver.RecommendedConfig) *genericapiserver.RecommendedConfig + +type ServerOptions struct { + StdOut io.Writer + StdErr io.Writer + + RecommendedOptions *apiserveropts.RecommendedOptions +} + +func NewServerOptions(codec runtime.Codec) *ServerOptions { + return &ServerOptions{ + StdOut: io.Discard, + StdErr: io.Discard, + RecommendedOptions: apiserveropts.NewRecommendedOptions( + "/registry/apoxy", + codec, + ), + } +} + +type ApoxyServerOptions struct { + scheme *runtime.Scheme + codecs serializer.CodecFactory + codec runtime.Codec + recommendedConfigFns []RecommendedConfigFn + apis map[schema.GroupVersionResource]serverapiserver.StorageProvider + serverOptions *ServerOptions +} + +func NewApoxyServerOptions( + scheme *runtime.Scheme, + codecs serializer.CodecFactory, + codec runtime.Codec, + recommendedConfigFns []RecommendedConfigFn, + apis map[schema.GroupVersionResource]serverapiserver.StorageProvider, + serverOptions *ServerOptions, +) *ApoxyServerOptions { + if serverOptions == nil { + serverOptions = NewServerOptions(codec) + } + + return &ApoxyServerOptions{ + scheme: scheme, + codecs: codecs, + codec: codec, + recommendedConfigFns: recommendedConfigFns, + apis: apis, + serverOptions: serverOptions, + } +} + +func (o *ApoxyServerOptions) ApplyRecommendedConfigFns(in *genericapiserver.RecommendedConfig) *genericapiserver.RecommendedConfig { + for _, fn := range o.recommendedConfigFns { + in = fn(in) + } + return in +} + +func SetOpenAPIDefinitionFn(scheme *runtime.Scheme, name, version string, defs openapicommon.GetOpenAPIDefinitions) RecommendedConfigFn { + return func(config *genericapiserver.RecommendedConfig) *genericapiserver.RecommendedConfig { + config.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(defs, openapi.NewDefinitionNamer(scheme)) + config.OpenAPIV3Config.Info.Title = name + config.OpenAPIV3Config.Info.Version = version + + config.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(defs, openapi.NewDefinitionNamer(scheme)) + config.OpenAPIConfig.Info.Title = name + config.OpenAPIConfig.Info.Version = version + return config + } +} + +func (o *ApoxyServerOptions) Config() (*serverapiserver.Config, error) { + if ro := o.serverOptions.RecommendedOptions; ro != nil && ro.SecureServing != nil { + if err := ro.SecureServing.MaybeDefaultWithSelfSignedCerts( + "localhost", + nil, + []net.IP{netutils.ParseIPSloppy("127.0.0.1")}, + ); err != nil { + return nil, fmt.Errorf("error creating self-signed certificates: %w", err) + } + } + + serverConfig := genericapiserver.NewRecommendedConfig(o.codecs) + serverConfig = o.ApplyRecommendedConfigFns(serverConfig) + + if ro := o.serverOptions.RecommendedOptions; ro != nil { + if err := ro.ApplyTo(serverConfig); err != nil { + return nil, err + } + } + + serverConfig = o.ApplyRecommendedConfigFns(serverConfig) + serverConfig.RESTOptionsGetter = o + + return &serverapiserver.Config{ + GenericConfig: serverConfig, + ExtraConfig: serverapiserver.ExtraConfig{ + Scheme: o.scheme, + Codecs: o.codecs, + APIs: o.apis, + }, + }, nil +} + +func (o ApoxyServerOptions) GetRESTOptions(resource schema.GroupResource, _ runtime.Object) (generic.RESTOptions, error) { + return generic.RESTOptions{ + StorageConfig: &storagebackend.ConfigForResource{ + GroupResource: resource, + Config: storagebackend.Config{ + Codec: o.codec, + }, + }, + ResourcePrefix: resource.String(), + }, nil +} + +func (o ApoxyServerOptions) RunApoxyServer(ctx context.Context) (<-chan struct{}, error) { + config, err := o.Config() + if err != nil { + return nil, err + } + + completed := config.Complete() + server, err := completed.New() + if err != nil { + return nil, err + } + + server.GenericAPIServer.AddPostStartHookOrDie("start-apoxy-server-informers", func(context genericapiserver.PostStartHookContext) error { + if completed.GenericConfig.SharedInformerFactory != nil { + completed.GenericConfig.SharedInformerFactory.Start(context.Context.Done()) + } + return nil + }) + + prepared := server.GenericAPIServer.PrepareRun() + stoppedCh, _, err := prepared.NonBlockingRunWithContext(ctx, prepared.ShutdownTimeout) + if err != nil { + return nil, err + } + return stoppedCh, nil +} diff --git a/pkg/apiserver/storage.go b/pkg/apiserver/storage.go index 95038991..5e682b40 100644 --- a/pkg/apiserver/storage.go +++ b/pkg/apiserver/storage.go @@ -6,11 +6,13 @@ import ( "fmt" "log/slog" "os" + "path/filepath" goruntime "runtime" "strings" "time" _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus" driversgeneric "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/endpoint" @@ -22,8 +24,9 @@ import ( genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/util/flowcontrol/request" - "sigs.k8s.io/apiserver-runtime/pkg/builder/rest" "sigs.k8s.io/controller-runtime/pkg/metrics" + + serverbuilder "github.com/apoxy-dev/apoxy/pkg/apiserver/server/builder" ) // SQLite auto_vacuum modes: https://www.sqlite.org/pragma.html#pragma_auto_vacuum @@ -153,7 +156,7 @@ func startIncrementalVacuum(ctx context.Context, path string, interval time.Dura // dbPath is the SQLite database file path (or "file::memory:" for in-memory). // connArgs are SQLite connection parameters (e.g. {"cache": "shared", "_journal_mode": "WAL"}). // logFormat should be "json" for production or "plain" for development. -func NewKineStorage(ctx context.Context, dbPath string, connArgs map[string]string, logFormat string) (rest.StoreFn, error) { +func NewKineStorage(ctx context.Context, dbPath string, connArgs map[string]string, logFormat string) (serverbuilder.StoreFn, error) { // Skipped for in-memory DBs where there are no file pages to reclaim. if !strings.Contains(dbPath, ":memory:") { // Enable incremental auto_vacuum before kine opens the database. @@ -173,13 +176,25 @@ func NewKineStorage(ctx context.Context, dbPath string, connArgs map[string]stri if tmpDir == "" { tmpDir = os.TempDir() } + listenerDir, err := os.MkdirTemp(tmpDir, "apiserver-kine-*") + if err != nil { + return nil, fmt.Errorf("creating kine listener dir: %w", err) + } + go func() { + <-ctx.Done() + _ = os.RemoveAll(listenerDir) + }() + metricsRegisterer := prometheus.WrapRegistererWith( + prometheus.Labels{"kine_listener": filepath.Base(listenerDir)}, + metrics.Registry, + ) etcdConfig, err := endpoint.Listen(ctx, endpoint.Config{ Endpoint: dsn, - Listener: "unix://" + tmpDir + "/apiserver-kine.sock", + Listener: "unix://" + filepath.Join(listenerDir, "kine.sock"), ConnectionPoolConfig: driversgeneric.ConnectionPoolConfig{ MaxOpen: goruntime.NumCPU(), }, - MetricsRegisterer: metrics.Registry, + MetricsRegisterer: metricsRegisterer, // Default are defined in kine: https://github.com/k3s-io/kine/blob/0dc5b174a18cf13b299a2b597afe0608cd769663/pkg/app/app.go#L27 NotifyInterval: 5 * time.Second, EmulatedETCDVersion: "3.5.13",