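kubelet calls the CRI plugin built into containerd to manage containers and images, and configures Pod networking through CNI plugins.
The customizable parts of the CRI plugin are the runtime service and the image service. Custom development against the CRI plugin can solve some problems that are hard to address in the Kubernetes layer above it.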
Runtime Service
Kubernetes CRI Runtime Service Interface
// code ref: <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L11294>
// RuntimeServiceServer is the server API for RuntimeService service.
type RuntimeServiceServer interface {
	// Version returns the runtime name, runtime version, and runtime API version.
	Version(context.Context, *VersionRequest) (*VersionResponse, error)
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
	// the sandbox is in the ready state on success.
	RunPodSandbox(context.Context, *RunPodSandboxRequest) (*RunPodSandboxResponse, error)
	// StopPodSandbox stops any running process that is part of the sandbox and
	// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
	// If there are any running containers in the sandbox, they must be forcibly
	// terminated.
	// This call is idempotent, and must not return an error if all relevant
	// resources have already been reclaimed. kubelet will call StopPodSandbox
	// at least once before calling RemovePodSandbox. It will also attempt to
	// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
	// multiple StopPodSandbox calls are expected.
	StopPodSandbox(context.Context, *StopPodSandboxRequest) (*StopPodSandboxResponse, error)
	// RemovePodSandbox removes the sandbox. If there are any running containers
	// in the sandbox, they must be forcibly terminated and removed.
	// This call is idempotent, and must not return an error if the sandbox has
	// already been removed.
	RemovePodSandbox(context.Context, *RemovePodSandboxRequest) (*RemovePodSandboxResponse, error)

	// Further methods are omitted here; see the full interface at
	// <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go>

	// RuntimeConfig returns configuration information of the runtime.
	// A couple of notes:
	//   - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
	//     The former is for having runtime tell Kubelet what to do, the latter vice versa.
	//   - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.
	//     The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should
	//     avoid updating them without a full node reboot.
	RuntimeConfig(context.Context, *RuntimeConfigRequest) (*RuntimeConfigResponse, error)
}
containerd's standard RuntimeServiceServer implementation
// code ref: <https://github.com/containerd/containerd/blob/v1.7.20/pkg/cri/server/service.go#L63>
// CRIService is the interface implement CRI remote service server.
type CRIService interface {
	runtime.RuntimeServiceServer
	runtime.ImageServiceServer
	// Closer is used by containerd to gracefully stop cri service.
	io.Closer

	Run(ready func()) error

	Register(*grpc.Server) error
}
Developing a CRI Plugin
kubelet configuration
- Edit the kubelet configuration in config.yaml as shown here, setting containerRuntimeEndpoint to the socket address your plugin will listen on:
    # cat /var/lib/kubelet/config.yaml
    address: 0.0.0.0
    apiVersion: kubelet.config.k8s.io/v1beta1
    containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
- Edit the systemd configuration. The CRI runtime endpoint can also be changed through the kubelet start command in the systemd drop-in. This approach is deprecated, and the command-line flag overrides the value in config.yaml.
```bash
# cat /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
[Service]
Environment="KUBELET_KUBECONFIG_ARGS=--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf"
Environment="KUBELET_CONFIG_ARGS=--config=/var/lib/kubelet/config.yaml"
EnvironmentFile=-/var/lib/kubelet/kubeadm-flags.env
Environment="KUBELET_EXTRA_ARGS=--runtime-request-timeout=15m --container-runtime-endpoint=unix:///var/run/containerd/containerd.sock"
ExecStart=
ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS
```
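The endpoint values in both configurations are URLs with a unix scheme, whereas a Go listener such as net.Listen("unix", ...) expects a plain filesystem path. The sketch below shows one way to derive the path from the kubelet-style value so that both sides can share one setting. The helper name socketPath is hypothetical and not part of kubelet or containerd.

```go
package main

import (
	"fmt"
	"net/url"
)

// socketPath extracts the filesystem path from a kubelet-style endpoint such
// as "unix:///run/containerd/containerd.sock". It is a hypothetical helper
// for illustration; only the unix scheme is accepted.
func socketPath(endpoint string) (string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", err
	}
	if u.Scheme != "unix" {
		return "", fmt.Errorf("unsupported endpoint scheme %q", u.Scheme)
	}
	if u.Path == "" {
		return "", fmt.Errorf("endpoint %q has no socket path", endpoint)
	}
	return u.Path, nil
}

func main() {
	path, err := socketPath("unix:///run/containerd/containerd.sock")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(path)
}
```

A value without a scheme is rejected on purpose, so a misconfigured endpoint fails early instead of being passed on silently.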
Implementing the RuntimeServiceServer interface
Below is a minimal implementation of the CRI runtime interface. It starts a gRPC server, initializes a RuntimeServiceClient, and proxies every call to runtimeapi.RuntimeServiceClient.
package server

import (
	"context"
	"log/slog"
	"net"
	"os"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// Options holds the two sockets used by the proxy. The type is not shown in
// the original listing; this definition is assumed from how its fields are used.
type Options struct {
	// ShimSocket is the filesystem path this server listens on.
	ShimSocket string
	// CRISocket is the gRPC target of the real CRI runtime,
	// e.g. unix:///run/containerd/containerd.sock.
	CRISocket string
}

type Server struct {
	options  Options
	client   runtimeapi.RuntimeServiceClient
	server   *grpc.Server
	listener net.Listener
}

// make sure Server impl runtimeapi.RuntimeServiceServer
var _ runtimeapi.RuntimeServiceServer = (*Server)(nil)

func New(options Options) (*Server, error) {
	// Remove a stale socket file left by a previous run; otherwise Listen
	// fails because the address is already in use.
	if err := os.Remove(options.ShimSocket); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	listener, err := net.Listen("unix", options.ShimSocket)
	if err != nil {
		return nil, err
	}
	return &Server{
		options:  options, // keep the options so Start can read CRISocket
		server:   grpc.NewServer(),
		listener: listener,
	}, nil
}

func (s *Server) Start() error {
	// CRISocket must be a gRPC target (unix:///path), not a bare path.
	conn, err := grpc.NewClient(s.options.CRISocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	s.client = runtimeapi.NewRuntimeServiceClient(conn)
	runtimeapi.RegisterRuntimeServiceServer(s.server, s)
	// do serve after client is created and registered
	go func() {
		_ = s.server.Serve(s.listener)
	}()
	return waitForServer(s.options.ShimSocket, time.Second)
}

func (s *Server) Stop() {
	s.server.Stop()
	_ = s.listener.Close()
}

func (s *Server) Version(ctx context.Context, request *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) {
	slog.Info("Doing version request", "request", request)
	resp, err := s.client.Version(ctx, request)
	if err != nil {
		slog.Error("failed to get version", "error", err)
		return resp, err
	}
	slog.Debug("Got version response", "response", resp)
	return resp, nil
}

func (s *Server) RunPodSandbox(ctx context.Context, request *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	slog.Info("Doing run pod sandbox request", "request", request)
	return s.client.RunPodSandbox(ctx, request)
}

func (s *Server) StopPodSandbox(ctx context.Context, request *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
	slog.Info("Doing stop pod sandbox request", "request", request)
	return s.client.StopPodSandbox(ctx, request)
}

func (s *Server) RemovePodSandbox(ctx context.Context, request *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) {
	slog.Info("Doing remove pod sandbox request", "request", request)
	return s.client.RemovePodSandbox(ctx, request)
}

// TODO: implement the remaining RuntimeServiceServer methods in the same way.
// The package does not compile until every method of the interface is present.

// waitForServer polls the unix socket until a connection succeeds or the
// timeout elapses. This body is one possible implementation; the original
// listing left it as an exercise.
func waitForServer(socket string, timeout time.Duration, opts ...interface{}) error {
	deadline := time.Now().Add(timeout)
	for {
		conn, err := net.DialTimeout("unix", socket, timeout)
		if err == nil {
			return conn.Close()
		}
		if time.Now().After(deadline) {
			return err
		}
		time.Sleep(10 * time.Millisecond)
	}
}
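Applications
By implementing RuntimeServiceServer, we can add our own logic whenever kubelet asks to create or delete a container. Some uses seen in production:
- Transparently inject, modify, or delete container env variables at creation time, without the changes being visible to the Kubernetes API server;
- Save a container's state into an image with a container commit when the container is deleted or paused;
- Restrict specific container operations, such as expensive exec calls.
Tip: the container information available in a CRI request is limited. To select which containers these operations apply to, either check the container's env, or have the plugin connect to containerd and create a containerd client to fetch more information about the container.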
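The env-based selection and env rewriting described above can be sketched as plain functions. This is only an illustration under stated assumptions: KeyValue is a local stand-in mirroring the Key and Value fields of runtimeapi.KeyValue so the example runs without the cri-api module, and the marker CRI_SHIM_INJECT and the helper names hasEnv and mutateEnvs are hypothetical. In a proxy like the one in this article, such a function would presumably be applied to the Envs of the container config inside CreateContainer before the request is forwarded.

```go
package main

import (
	"fmt"
	"sort"
)

// KeyValue is a local stand-in for runtimeapi.KeyValue (same field names).
type KeyValue struct {
	Key   string
	Value string
}

// hasEnv reports whether envs contains key with exactly the given value.
// It is used here to decide whether a container opted in to mutation.
func hasEnv(envs []*KeyValue, key, value string) bool {
	for _, kv := range envs {
		if kv != nil && kv.Key == key && kv.Value == value {
			return true
		}
	}
	return false
}

// mutateEnvs returns a new slice in which keys listed in remove are dropped
// and entries in set are overridden or appended. remove takes precedence
// over set, and the input slice is left untouched.
func mutateEnvs(envs []*KeyValue, set map[string]string, remove map[string]bool) []*KeyValue {
	out := make([]*KeyValue, 0, len(envs)+len(set))
	seen := make(map[string]bool, len(set))
	for _, kv := range envs {
		if kv == nil || remove[kv.Key] {
			continue
		}
		value := kv.Value
		if v, ok := set[kv.Key]; ok {
			value = v
			seen[kv.Key] = true
		}
		out = append(out, &KeyValue{Key: kv.Key, Value: value})
	}
	// Append keys that were not present yet, sorted for stable output.
	added := make([]string, 0, len(set))
	for k := range set {
		if !seen[k] && !remove[k] {
			added = append(added, k)
		}
	}
	sort.Strings(added)
	for _, k := range added {
		out = append(out, &KeyValue{Key: k, Value: set[k]})
	}
	return out
}

func main() {
	envs := []*KeyValue{
		{Key: "CRI_SHIM_INJECT", Value: "true"}, // hypothetical opt-in marker
		{Key: "HTTP_PROXY", Value: "http://old.example:3128"},
		{Key: "DEBUG", Value: "1"},
	}
	if hasEnv(envs, "CRI_SHIM_INJECT", "true") {
		envs = mutateEnvs(envs,
			map[string]string{"HTTP_PROXY": "http://proxy.internal.example:3128", "TZ": "UTC"},
			map[string]bool{"DEBUG": true})
	}
	for _, kv := range envs {
		fmt.Printf("%s=%s\n", kv.Key, kv.Value)
	}
}
```

Returning a fresh slice instead of editing in place keeps the original request intact, which makes it easier to log both versions when debugging.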
Image Service
Kubernetes CRI Image Service Interface
// code ref: <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L12215>
// ImageServiceServer is the server API for ImageService service.
type ImageServiceServer interface {
	// ListImages lists existing images.
	ListImages(context.Context, *ListImagesRequest) (*ListImagesResponse, error)
	// ImageStatus returns the status of the image. If the image is not
	// present, returns a response with ImageStatusResponse.Image set to
	// nil.
	ImageStatus(context.Context, *ImageStatusRequest) (*ImageStatusResponse, error)
	// PullImage pulls an image with authentication config.
	PullImage(context.Context, *PullImageRequest) (*PullImageResponse, error)
	// RemoveImage removes the image.
	// This call is idempotent, and must not return an error if the image has
	// already been removed.
	RemoveImage(context.Context, *RemoveImageRequest) (*RemoveImageResponse, error)
	// ImageFSInfo returns information of the filesystem that is used to store images.
	ImageFsInfo(context.Context, *ImageFsInfoRequest) (*ImageFsInfoResponse, error)
}
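Standard Image Service implementation
As the CRIService interface above shows, containerd implements RuntimeServiceServer and ImageServiceServer together. For the image-side code, see https://github.com/containerd/containerd/blob/main/internal/cri/server/images/service.go#L51.
Implementing the ImageServiceServer interface
This works the same way as implementing the RuntimeServiceServer interface, so it is not repeated here.
Applications
The ImageServiceServer interface lets an implementation define or modify image information, which opens up a number of use cases:
- Rewriting image pull addresses. This suits intranet environments, where every Kubernetes image pull address can be replaced wholesale to point at an internal registry or a proxy registry;
- For security, pulling images ahead of time before the pod starts, verifying image safety, and so on.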
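The pull-address rewrite from the first item can be sketched as a pure string transformation. This is a minimal sketch, not containerd's own reference parsing: the helper names splitRegistry and rewriteImage and the mirror hosts are hypothetical, and the registry detection follows the common Docker convention that the first path component is a registry only if it contains "." or ":" or equals "localhost". In a proxy, the result would presumably replace the image name in the PullImage request before it is forwarded; the same mapping likely needs to be applied in ImageStatus as well, so that lookups by the original name still find the pulled image.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRegistry splits an image reference into registry host and remainder.
// References without an explicit registry default to docker.io, and
// single-name images get the "library/" prefix, as Docker Hub expects.
func splitRegistry(image string) (registry, remainder string) {
	if i := strings.IndexByte(image, '/'); i > 0 {
		first := image[:i]
		if strings.ContainsAny(first, ".:") || first == "localhost" {
			return first, image[i+1:]
		}
	}
	if !strings.Contains(image, "/") {
		image = "library/" + image
	}
	return "docker.io", image
}

// rewriteImage swaps the registry host according to mirrors. Images whose
// registry has no entry in mirrors are returned unchanged.
func rewriteImage(image string, mirrors map[string]string) string {
	registry, remainder := splitRegistry(image)
	mirror, ok := mirrors[registry]
	if !ok {
		return image
	}
	return mirror + "/" + remainder
}

func main() {
	// Hypothetical internal mirrors, keyed by upstream registry host.
	mirrors := map[string]string{
		"docker.io":       "mirror.internal.example/dockerhub",
		"registry.k8s.io": "mirror.internal.example/k8s",
	}
	for _, image := range []string{
		"nginx:1.25",
		"registry.k8s.io/pause:3.9",
		"quay.io/coreos/etcd:v3.5.0",
	} {
		fmt.Printf("%s -> %s\n", image, rewriteImage(image, mirrors))
	}
}
```

Keeping the mapping in a table keyed by registry host means new upstream registries can be added through configuration, without touching the proxy logic.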