作者:阳明 2023-03-17 17:51:30
云计算
云原生 先通过 GenericConfig 创建一个名为 Apiextensions-Apiserver 的 GenericServer,GenericServer 提供了一个通用的 Http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。
创新互联建站专注为客户提供全方位的互联网综合服务,包含不限于网站建设、做网站、河北网络推广、微信小程序开发、河北网络营销、河北企业策划、河北品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;创新互联建站为所有大学生创业者提供河北建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com
前面我们对 Kube APIServer 的入口点和 go-restful 有一个基础了解后,我们就可以开始来了解下 APIExtensionServer 是如何实例化的了。
APIExtensionServer 的创建流程大致包含以下几个步骤:
三种类型的 Server 底层都需要依赖 GeneriAPIServer,第二步创建的 CustomResourceDefinitions 就是当前类型的 Server 对象,用于后续进行路由注册。APIGroupInfo 是用于每个版本、每个资源类型对应的存储对象。最后调用 InstallAPIGroup
进行路由注册,把每一个资源的版本,类型映射到一个 URI 地址中。代码如下所示:
// cmd/kube-apiserver/app/apiextensions.go
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
// 真正的代码位于:/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New 从指定的配置返回 CustomResourceDefinitions 的新实例
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// APIExtensionsServer 依赖 GenericAPIServer
// 通过 GenericConfig 创建一个名为 apiextensions-apiserver 的 genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// 实例化 CustomResourceDefinitions 对象
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
// 实例化 APIGroupInfo 对象,APIGroup 指该 server 需要暴露的 API
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 如果开启了 v1 版本,将资源版本、资源、资源存储存放在APIGroupInfo的map中
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}
// 注册API
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 初始化 crd clientset 和 informers,用于初始化控制器
crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}
versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
// 初始化控制器
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
// 申请handler处理器
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
// ......
)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
// 初始化其他控制器
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)
// 初始化openapi控制器
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
var openapiv3Controller *openapiv3controller.Controller
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
}
// 将 informer 以及 controller 添加到 PostStartHook 中
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
// 注册hook函数,启动前面实例化的各种controller
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
// 启动各种控制器
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}
return nil
})
// ....
return s, nil
}
先通过 GenericConfig 创建一个名为 apiextensions-apiserver 的 genericServer,genericServer 提供了一个通用的 http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。无论是 APIServer 还是 APIExtensionsServer 都依赖于 genericServer,实现方式如下所示:
// vendor/k8s.io/apiserver/pkg/server/config.go
// New 创建了一个新的服务器,该服务器将处理链与传递的服务器逻辑地结合在一起。
// name被用来区分日志记录
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// ...
handlerChainBuilder := func(handler http.Handler) http.Handler {
// BuildHandlerChainFunc 允许你通过装饰 apiHandler 来构建自定义处理程序链
// 目前默认的处理链函数为 DefaultBuildHandlerChain:里面包含了大量默认的处理方式
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
// ......
}
// ......
// 安装一些额外的路由,比如索引全部接口、metrics接口等
installAPI(s, c.Config)
return s, nil
}
// vendor/k8s.io/apiserver/pkg/server/handler.go
// HandlerChainBuilderFn 被用来包装正在使用提供的处理器链 GoRestfulContainer 处理器
// 它通常用于应用过滤,如身份验证和授权
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler
// 该函数就是用来按照 go-restful 的模式初始化 Container
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
然后实例化 CRD 和 APIGroupInfo,其中的 APIGroupInfo 对象用于描述资源组信息,一个资源对应一个 APIGroupInfo 对象,每个资源对应一个资源存储对象:
// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
return APIGroupInfo{
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
// 这个map用于存储资源、资源存储对象的映射关系
// 格式:资源版本/资源/资源存储对象
// 资源存储对象 RESTStorage,负责资源的CRUD
// 后续将 RESTStorage 转换为 http 的 handler 函数
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
Scheme: scheme,
ParameterCodec: parameterCodec,
NegotiatedSerializer: codecs,
}
}
然后需要注册 APIGroupInfo,通过 s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo) 来实现的,将 APIGroupInfo 中的资源对象注册到APIExtensionServerHandler 函数。其过程是:
详细的代码如下所示:
// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
// 在API中暴露指定的 APIGroup
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
return s.InstallAPIGroups(apiGroupInfo)
}
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
// ...
// 获取 OpenAPI 模型
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
// 遍历所有的资源信息,一次安装资源版本处理器
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
// ...
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}
// installAPIResources 用于安装 RESTStorage 以支持每个 api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
// ...
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
// 核心就是调用 InstallREST,参数为go-restful的container对象
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
// ...
resourceInfos = append(resourceInfos, r...)
}
// ...
return nil
}
// InstallREST 将 REST handlers (storage, watch, proxy and redirect) 注册到 restful 容器中。
// 预期提供的路径 root 前缀将服务于所有操作,root 不能以斜线结尾。
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 比如从 InstallAPI 调用链下来这里的 g.Root 为 /apis,这样就可以确定 handler 的前缀为 /apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
// 实例化 APIInstaller 对象
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 调用 Install 函数:注册 api,返回 go-restful 的 WebService 对象
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 将 WebService 添加到 Container 中,这需要了解 go-restful 框架的知识
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
上面整个过程都是为了注册 API 来做的准备,核心是在 installer.Install() 函数中,该函数就是将 API 资源添加处理器的。
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// 使用api installer 的前缀和版本创建一个新的 restful webservice 对象
// 这部分都属于 go-restful 框架的用法
func (a *APIInstaller) newWebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(a.prefix)
// a.prefix 包含 "prefix/group/version"
ws.Doc("API at " + a.prefix)
ws.Consumes("*/*")
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
ws.Produces(append(mediaTypes, streamMediaTypes...)...)
ws.ApiVersion(a.group.GroupVersion.String())
return ws
}
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
// 新建一个 WebService 对象(go-restful框架中的)
ws := a.newWebService()
// 将 paths 排序
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
// 获取swagger spec规范
for _, path := range paths {
// 将 Storage 转换成 Router,然后将路由注册到 webservice
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
if resourceInfo != nil {
resourceInfos = append(resourceInfos, resourceInfo)
}
}
return apiResources, resourceInfos, ws, errors
}
这里最重要的就是 registerResourceHandlers 函数了,这个方法很长,核心功能是根据 storage 构造 handler,再将 handler 和 path 构造成 go-restful 框架的 Route 对象,最后 Route 添加到 webservice。
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// ......
// 是否是命名空间级别
var namespaceScoped bool
// ......
// storage 支持哪些 verbs 操作,用于了解每个 path 所支持的操作。
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
// ......
// 获取指定范围的操作列表
switch {
case !namespaceScoped:
// 处理非命名空间范围的资源,如节点
// ......
// 添加 actions 到资源路径:/api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// ......
default:
// 命名空间级别的资源对象
namespaceParamName := "namespaces"
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
// ......
// 构造 action 列表
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
// ......
}
// 为 actions 创建 Routes 路由
// 配置go-restful产生的MIME类型
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
allMediaTypes := append(mediaTypes, streamMediaTypes...)
ws.Produces(allMediaTypes...)
// ...
for _, action := range actions {
// ......
// 构造 go-restful 的 RouteBuilder 对象
routes := []*restful.RouteBuilder{}
// 如果是子资源,kind应该是prent的kind
if isSubresource {
parentStorage, ok := a.group.Storage[resource]
fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer)
kind = fqParentKind.Kind
}
// 根据不同的 Verb,注册到不同的 handler 中去
switch action.Verb {
case "GET":
var handler restful.RouteFunction // go-restful的处理器
// 初始化handler
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
//...
// 构造 route(这都属于go-restful框架的使用方法)
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
// 将route添加到routes
addParams(route, action.Params)
routes = append(routes, route)
// ... 其他verb处理方式基本一致
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
// ......
}
// 遍历路由,加入到 WebService 中
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
// 将route加入到WebService中去
ws.Route(route)
}
}
// ......
return &apiResource, resourceInfo, nil
}
registerResourceHandlers 函数很长,但是我们可以先抛开细节,整体上了解下,可以看到就是先通过 Storage 判断支持哪些 Verbs 操作,然后生成 actions 列表,然后将每个 action 构造路由列表,最后也是将这些路由添加到 go-restful 的 WebService 中去,这里构造的路由绑定的处理器实现方式路由不同,比如 GET 方式的 handler 是通过 restfulGetResource 来实例化的,POST 方式的是通过 restfulCreateResource 来实例化的,实现方式基本差不多。
GET 方式 handler 函数 restfulGetResource 实现如下所示:
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
}
}
restfulGetResource 函数得到的就是 restful.RouteFunction,这是 go-restful 的方式,真正去处理的是 handlers.GetResource
这个函数,这个函数里面调用的是 getResourceHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。
GET 请求的处理过程比较简单,通过请求的查询构造出一个 metav1.GetOptions,然后交给 Getter 接口处理,最后将查询结果转换后返回给请求者。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go
// GetResource 返回一个函数,该函数处理从 rest.Storage 对象中检索单个资源的操作。
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// 初始化需要的 GetOptions
options := metav1.GetOptions{}
// 获取查询的参数
if values := req.URL.Query(); len(values) > 0 {
// ...
// 将查询的参数进行解码,编程 GetOptions 对象
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
// 然后使用 Getter 接口来处理
return r.Get(ctx, name, &options)
})
}
// getResourceHandler 是用于获取请求的 HTTP Handler函数
// 它委托给传入的 getterFunc 来执行实际的 get
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// ...
namespace, name, err := scope.Namer.Name(req)
/// ...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
// ...
// 使用 getterFunc 来执行实际的 get 操作,
result, err := getter(ctx, name, req, trace)
// ...
// 将结果转换为用户需要的格式返回给用户
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}
POST 的处理器也是类似的,对应的逻辑在 restfulCreateResource 中:
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
同样真正去处理的是 handlers.CreateResource 这个函数,这个函数里面调用的是 createHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
// CreateResource 返回将处理资源创建的函数
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
createHandler 的实现代码比较长,主要做了一下几件事:
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// 返回一个 http handler 函数,处理对应的路由请求
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
// 标准 http handler 函数
return func(w http.ResponseWriter, req *http.Request) {
// ...
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
// 找到合适的 Serializer
gv := scope.Kind.GroupVersion()
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
// 将请求解码成 CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
// ...
}
// ...
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
defaultGVK := scope.Kind
original := r.New()
// ...
// 找到合适的解码器
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
// 请请求体 body 进行解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
ctx = request.WithNamespace(ctx, namespace)
// 审计、准入、请求日志记录
ae := audit.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, scope.Serializer)
userInfo, _ := request.UserFrom(ctx)
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// 处理请求
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
// ...
// 执行准入控制的mutate操作,就是在创建对象的时候进行修改
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// ......
// 调用创建方法
result, err := requestFunc()
return result, err
})
code := http.StatusCreated
status, ok := result.(*metav1.Status)
if ok && 网站名称:APIServer源码分析之路由注册
本文来源:http://www.hantingmc.com/qtweb/news14/19714.html成都网站建设公司_创新互联,为您提供网站排名、品牌网站建设、面包屑导航、建站公司、虚拟主机、软件开发
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联