APIServer源码分析之路由注册

APiServer源码分析之路由注册

作者:阳明 2023-03-17 17:51:30

云计算

云原生 先通过 GenericConfig 创建一个名为 Apiextensions-Apiserver 的 GenericServer,GenericServer 提供了一个通用的 Http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。

创新互联建站专注为客户提供全方位的互联网综合服务,包含不限于网站建设、做网站、河北网络推广、微信小程序开发、河北网络营销、河北企业策划、河北品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;创新互联建站为所有大学生创业者提供河北建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com

前面我们对 Kube APIServer 的入口点和 go-restful 有一个基础了解后,我们就可以开始来了解下 APIExtensionServer 是如何实例化的了。

APIExtensionServer

APIExtensionServer 的创建流程大致包含以下几个步骤:

  • 创建 GeneriAPIServer
  • 实例化 CustomResourceDefinitions
  • 实例化 APIGroupInfo
  • InstallAPIGroup

三种类型的 Server 底层都需要依赖 GeneriAPIServer,第二步创建的 CustomResourceDefinitions 就是当前类型的 Server 对象,用于后续进行路由注册。APIGroupInfo 是用于每个版本、每个资源类型对应的存储对象。最后调用 ​​InstallAPIGroup​​ 进行路由注册,把每一个资源的版本,类型映射到一个 URI 地址中。代码如下所示:

// cmd/kube-apiserver/app/apiextensions.go
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}

// 真正的代码位于:/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New 从指定的配置返回 CustomResourceDefinitions 的新实例
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// APIExtensionsServer 依赖 GenericAPIServer

// 通过 GenericConfig 创建一个名为 apiextensions-apiserver 的 genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)


// 实例化 CustomResourceDefinitions 对象
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}

apiResourceConfig := c.GenericConfig.MergedResourceConfig

// 实例化 APIGroupInfo 对象,APIGroup 指该 server 需要暴露的 API
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)

// 如果开启了 v1 版本,将资源版本、资源、资源存储存放在APIGroupInfo的map中
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)

storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}

// 注册API
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}

// 初始化 crd clientset 和 informers,用于初始化控制器
crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}

versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
// 初始化控制器
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
// 申请handler处理器
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
// ......
)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
// 初始化其他控制器
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)
// 初始化openapi控制器
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
var openapiv3Controller *openapiv3controller.Controller
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
}
// 将 informer 以及 controller 添加到 PostStartHook 中
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
// 注册hook函数,启动前面实例化的各种controller
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {

if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
// 启动各种控制器
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)

discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}

return nil
})
// ....

return s, nil
}

先通过 GenericConfig 创建一个名为 apiextensions-apiserver 的 genericServer,genericServer 提供了一个通用的 http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。无论是 APIServer 还是 APIExtensionsServer 都依赖于 genericServer,实现方式如下所示:

// vendor/k8s.io/apiserver/pkg/server/config.go
// New 创建了一个新的服务器,该服务器将处理链与传递的服务器逻辑地结合在一起。
// name被用来区分日志记录
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// ...

handlerChainBuilder := func(handler http.Handler) http.Handler {
// BuildHandlerChainFunc 允许你通过装饰 apiHandler 来构建自定义处理程序链
// 目前默认的处理链函数为 DefaultBuildHandlerChain:里面包含了大量默认的处理方式
return c.BuildHandlerChainFunc(handler, c.Config)
}

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

s := &GenericAPIServer{

Handler: apiServerHandler,

listedPathProvider: apiServerHandler,

// ......
}

// ......
// 安装一些额外的路由,比如索引全部接口、metrics接口等
installAPI(s, c.Config)

return s, nil
}

// vendor/k8s.io/apiserver/pkg/server/handler.go
// HandlerChainBuilderFn 被用来包装正在使用提供的处理器链 GoRestfulContainer 处理器
// 它通常用于应用过滤,如身份验证和授权
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler

// 该函数就是用来按照 go-restful 的模式初始化 Container
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}

gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})

director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}

return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}

然后实例化 CRD 和 APIGroupInfo,其中的 APIGroupInfo 对象用于描述资源组信息,一个资源对应一个 APIGroupInfo 对象,每个资源对应一个资源存储对象:

// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
return APIGroupInfo{
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
// 这个map用于存储资源、资源存储对象的映射关系
// 格式:资源版本/资源/资源存储对象
// 资源存储对象 RESTStorage,负责资源的CRUD
// 后续将 RESTStorage 转换为 http 的 handler 函数
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},

OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
Scheme: scheme,
ParameterCodec: parameterCodec,
NegotiatedSerializer: codecs,
}
}

然后需要注册 APIGroupInfo,通过 s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo) 来实现的,将 APIGroupInfo 中的资源对象注册到APIExtensionServerHandler 函数。其过程是:

  • 遍历 APIGroupInfo。
  • 将资源组、资源版本、资源名称映射到 http path 请求路径。
  • 通过 InstallREST 函数将资源存储对象作为资源的 handlers 方法。
  • 最后用 go-restfu l的 ws.Route 将定义好的请求路径和 handlers 方法添加路由到 go-restful。

详细的代码如下所示:

// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go

// 在API中暴露指定的 APIGroup
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
return s.InstallAPIGroups(apiGroupInfo)
}

func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {

// ...
// 获取 OpenAPI 模型
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
// 遍历所有的资源信息,一次安装资源版本处理器
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}

apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
// ...
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}

s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}

// installAPIResources 用于安装 RESTStorage 以支持每个 api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {

apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
// ...
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes

// 核心就是调用 InstallREST,参数为go-restful的container对象
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
// ...
resourceInfos = append(resourceInfos, r...)
}

// ...

return nil
}

// InstallREST 将 REST handlers (storage, watch, proxy and redirect) 注册到 restful 容器中。
// 预期提供的路径 root 前缀将服务于所有操作,root 不能以斜线结尾。
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 比如从 InstallAPI 调用链下来这里的 g.Root 为 /apis,这样就可以确定 handler 的前缀为 /apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
// 实例化 APIInstaller 对象
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 调用 Install 函数:注册 api,返回 go-restful 的 WebService 对象
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 将 WebService 添加到 Container 中,这需要了解 go-restful 框架的知识
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

上面整个过程都是为了注册 API 来做的准备,核心是在 installer.Install() 函数中,该函数就是将 API 资源添加处理器的。

// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go

// 使用api installer 的前缀和版本创建一个新的 restful webservice 对象
// 这部分都属于 go-restful 框架的用法
func (a *APIInstaller) newWebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(a.prefix)
// a.prefix 包含 "prefix/group/version"
ws.Doc("API at " + a.prefix)
ws.Consumes("*/*")
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
ws.Produces(append(mediaTypes, streamMediaTypes...)...)
ws.ApiVersion(a.group.GroupVersion.String())
return ws
}

func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error

// 新建一个 WebService 对象(go-restful框架中的)
ws := a.newWebService()

// 将 paths 排序
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
// 获取swagger spec规范
for _, path := range paths {
// 将 Storage 转换成 Router,然后将路由注册到 webservice
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)

if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
if resourceInfo != nil {
resourceInfos = append(resourceInfos, resourceInfo)
}
}
return apiResources, resourceInfos, ws, errors
}

这里最重要的就是 registerResourceHandlers 函数了,这个方法很长,核心功能是根据 storage 构造 handler,再将 handler 和 path 构造成 go-restful 框架的 Route 对象,最后 Route 添加到 webservice。

// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// ......

// 是否是命名空间级别
var namespaceScoped bool
// ......

// storage 支持哪些 verbs 操作,用于了解每个 path 所支持的操作。
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)

// ......

// 获取指定范围的操作列表
switch {
case !namespaceScoped:
// 处理非命名空间范围的资源,如节点
// ......

// 添加 actions 到资源路径:/api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// ......
default:
// 命名空间级别的资源对象
namespaceParamName := "namespaces"
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
// ......
// 构造 action 列表
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
// ......

}

// 为 actions 创建 Routes 路由

// 配置go-restful产生的MIME类型
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
allMediaTypes := append(mediaTypes, streamMediaTypes...)
ws.Produces(allMediaTypes...)

// ...
for _, action := range actions {
// ......

// 构造 go-restful 的 RouteBuilder 对象
routes := []*restful.RouteBuilder{}

// 如果是子资源,kind应该是prent的kind
if isSubresource {
parentStorage, ok := a.group.Storage[resource]

fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer)

kind = fqParentKind.Kind
}

// 根据不同的 Verb,注册到不同的 handler 中去
switch action.Verb {
case "GET":
var handler restful.RouteFunction // go-restful的处理器
// 初始化handler
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}

//...

// 构造 route(这都属于go-restful框架的使用方法)
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)

// 将route添加到routes
addParams(route, action.Params)
routes = append(routes, route)
// ... 其他verb处理方式基本一致
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
// ......
}

// 遍历路由,加入到 WebService 中
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
// 将route加入到WebService中去
ws.Route(route)
}
}

// ......

return &apiResource, resourceInfo, nil
}

registerResourceHandlers 函数很长,但是我们可以先抛开细节,整体上了解下,可以看到就是先通过 Storage 判断支持哪些 Verbs 操作,然后生成 actions 列表,然后将每个 action 构造路由列表,最后也是将这些路由添加到 go-restful 的 WebService 中去,这里构造的路由绑定的处理器实现方式路由不同,比如 GET 方式的 handler 是通过 restfulGetResource 来实例化的,POST 方式的是通过 restfulCreateResource 来实例化的,实现方式基本差不多。

GET 方式 handler 函数 restfulGetResource 实现如下所示:

// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
}
}

restfulGetResource 函数得到的就是 restful.RouteFunction,这是 go-restful 的方式,真正去处理的是 ​​handlers.GetResource​​ 这个函数,这个函数里面调用的是 getResourceHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。

GET 请求的处理过程比较简单,通过请求的查询构造出一个 metav1.GetOptions,然后交给 Getter 接口处理,最后将查询结果转换后返回给请求者。

//vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go

// GetResource 返回一个函数,该函数处理从 rest.Storage 对象中检索单个资源的操作。
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// 初始化需要的 GetOptions
options := metav1.GetOptions{}
// 获取查询的参数
if values := req.URL.Query(); len(values) > 0 {
// ...
// 将查询的参数进行解码,编程 GetOptions 对象
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
// 然后使用 Getter 接口来处理
return r.Get(ctx, name, &options)
})
}

// getResourceHandler 是用于获取请求的 HTTP Handler函数
// 它委托给传入的 getterFunc 来执行实际的 get
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// ...

namespace, name, err := scope.Namer.Name(req)

/// ...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)

outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)

// ...
// 使用 getterFunc 来执行实际的 get 操作,
result, err := getter(ctx, name, req, trace)
// ...

// 将结果转换为用户需要的格式返回给用户
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)

}
}

POST 的处理器也是类似的,对应的逻辑在 restfulCreateResource 中:

// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}

同样真正去处理的是 handlers.CreateResource 这个函数,这个函数里面调用的是 createHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。

//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go

// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}

// CreateResource 返回将处理资源创建的函数
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

createHandler 的实现代码比较长,主要做了一下几件事:

  1. 对查询串进行解码生成 metav1.CreateOptions 。
  2. 对请求的 body 体中的数据进行解码,生成资源对象。解码的对象版本是 internal 版本,internal 版本是该资源对象所有版本字段的全集。针对不同版本的对象内部可以使用相同的代码进行处理。
  3. 对对象进行修改的准入控制,判断是否修需要修改对象。
  4. 交给 creater 接口创建资源对象。
  5. 将数据转换为期望的格式写入 response 中,调用 creater 接口返回的结果仍然是 internal 版本,编码时,会编码成用户请求的版本返回给用户。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go

// 返回一个 http handler 函数,处理对应的路由请求
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
// 标准 http handler 函数
return func(w http.ResponseWriter, req *http.Request) {
// ...

outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)

// 找到合适的 Serializer
gv := scope.Kind.GroupVersion()
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)

// 将请求解码成 CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
// ...
}
// ...
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))

defaultGVK := scope.Kind
original := r.New()

// ...

// 找到合适的解码器
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)

// 请请求体 body 进行解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)


ctx = request.WithNamespace(ctx, namespace)
// 审计、准入、请求日志记录
ae := audit.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, scope.Serializer)

userInfo, _ := request.UserFrom(ctx)

admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}

// 处理请求
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
// ...
// 执行准入控制的mutate操作,就是在创建对象的时候进行修改
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// ......
// 调用创建方法
result, err := requestFunc()
return result, err
})


code := http.StatusCreated
status, ok := result.(*metav1.Status)
if ok && 网站名称:APIServer源码分析之路由注册
本文来源:http://www.hantingmc.com/qtweb/news14/19714.html

成都网站建设公司_创新互联,为您提供网站排名品牌网站建设面包屑导航建站公司虚拟主机软件开发

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联