本文共 10424 字,大约阅读时间需要 34 分钟。
以下是 k8s 的整体架构,在 master 节点上主要是 kube-apiserver(整合了 kube-aggregator),还有 kube-scheduler,以及 kube-controller-manager,包括后端存储 etcd。
其中 kube-apiserver 是一个比较关键的部分,而且前期写得坑很多,导致这一部分虽然看起来是一个 API server 其实代码很复杂,特别冗余,而且目前对 kube-apiserver 还要做拆分,能够支持插入第三方的 apiserver,也就是又一个 aggregated apiserver 的 feature,也是和 kube-apiserver 和里面包的一层 genericserver 揉合在一起了,感觉一个大的系统 API server 越写越挫是一个通病,还好现在 k8s 迷途知返正在调整。
Kube-apiserver 可以是认为在 generic server 上封装的一层官方默认的 apiserver,有第三方需要的情况下,自己也可以在 generic server 上封装一层加入到集成模式中,这里主要介绍 kube-apiserver 的结构。
kube-apiserver 是一个 restful 服务,请求直接通过 HTTP 请求发送,例如创建一个 ubuntu 的 pod,用以下的 pod.yaml 文件。
apiVersion: v1kind: Podmetadata: name: ubuntu1 labels: name: ubuntu1spec: containers: - name: ubuntu1 image: ubuntu command: ["sleep", "1d"]
执行命令 kubectl create -f ./pod.yaml -v=8,可以看到对应的 POST 请求如下。
Request Body: { "apiVersion":"v1","kind":"Pod","metadata":{ "labels":{ "name":"ubuntu1"},"name":"ubuntu1","namespace":"default"},"spec":{ "containers":[{ "command":["sleep","1d"],"image":"ubuntu","name":"ubuntu1"}],"schedulerName":"default-scheduler"}}curl -k -v -XPOST -H "Content-Type: application/json" -H "Accept: application/json" -H "User-Agent: kubectl/v1.7.5 (linux/amd64) kubernetes/17d7182" https://localhost:6443/api/v1/namespaces/default/podsPOST https://localhost:6443/api/v1/namespaces/default/pods 201 Created in 6 milliseconds Response Headers: Content-Type: application/json Content-Length: 1208 Date: Wed, 18 Oct 2017 15:04:17 GMTResponse Body: { "kind":"Pod","apiVersion":"v1","metadata":{ "name":"ubuntu1","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/ubuntu1","uid":"9c9af581-b415-11e7-8033-024d1ba659e8","resourceVersion":"486154","creationTimestamp":"2017-10-18T15:04:17Z","labels":{ "name":"ubuntu1"}},"spec":{ "volumes":[{ "name":"default-token-p0980","secret":{ "secretName":"default-token-p0980","defaultMode":420}}],"containers":[{ "name":"ubuntu1","image":"ubuntu","command":["sleep","1d"],"resources":{},"volumeMounts":[{ "name":"default-token-p0980","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{},"schedulerName":"default-scheduler","tolerations":[{ "key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{ "key":"node.alpha.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}]},"status":{ "phase":"Pending","qosClass":"BestEffort"}}
从 url path 里面可以看到几个划分,path 的分类大概有下面这几种。
路径上整体分成 group, version, resource, 作为核心 API group 的 core(包括 pod, node 之类的 resource),不带 group,直接接在 /api/ 后面,其他的 api group 则接在 /apis 后面。以 pod 为例,pod 对应的数据类型如下,这个数据结构和 POST 请求中的结构的参数是一致的。
如果是 job 的话则是在,pkg/apis/batch/v2alpha1/types.go,和 API 路径是对应的。例子当中 kubectl 加上 level 大于 8 的 log 就会打印请求和相应的 body,可以看到 request body 和上面的数据结构是一致的。这个请求会发送到 apiserver 进行处理并且返回存储之后的 pod。
父结构,主要的配置内容,其中有一个结构 RESTOptionsGetter genericregistry.RESTOptionsGetter 是和 API 初始化相关的,这个接口的实现是在 k8s.io/apiserver/pkg/server/options/etcd.go 中的 storageFactoryRestOptionsFactory 实现的,对应的实现函数是
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { storageConfig, err := f.StorageFactory.NewConfig(resource) if err != nil { return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) } ret := generic.RESTOptions{ StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, EnableGarbageCollection: f.Options.EnableGarbageCollection, ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), } if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) if err != nil { return generic.RESTOptions{}, err } cacheSize, ok := sizes[resource] if !ok { cacheSize = f.Options.DefaultWatchCacheSize } ret.Decorator = genericregistry.StorageWithCacher(cacheSize) } return ret, nil }
APIGroupInfo 主要定义了一个 API 组的相关信息,观察一下 APIGroupInfo 是如何初始化的。
在 k8s.io/pkg/master/master.go 当中,每个 Resource 都要提供自己的 Provider,比如说 storagerest 就在 k8s.io/kubernetes/pkg/registry/storage/rest/storage_storage.go 定义了 NewRESTStorage 方法。而默认的 resource 的 legacy provider 单独处理。
if c.ExtraConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.ExtraConfig.StorageFactory, ProxyTransport: c.ExtraConfig.ProxyTransport, KubeletClientConfig: c.ExtraConfig.KubeletClientConfig, EventTTL: c.ExtraConfig.EventTTL, ServiceIPRange: c.ExtraConfig.ServiceIPRange, ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, } m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider) }
然后通过调用 k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider 的 NewLegacyRESTStorage 来初始化基础对象的 apigroup info,比如初始化 podStorage,serviceStorage 和 nodeStorage 等等。legacy ApiGrouInfo 的 Scheme, ParamaterCodec, NegotiatedSerializer 都是用 “k8s.io/kubernetes/pkg/api” 包下的全局变量初始化的。
Scheme: api.Scheme, ParameterCodec: api.ParameterCodec, NegotiatedSerializer: api.Codecs,
然后合并成一个 restStorage 存入 apiGroupInfo 中。
restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "pods/attach": podStorage.Attach, "pods/status": podStorage.Status, "pods/log": podStorage.Log, "pods/exec": podStorage.Exec, "pods/portforward": podStorage.PortForward, "pods/proxy": podStorage.Proxy, "pods/binding": podStorage.Binding, "bindings": podStorage.Binding, ...
举个例子 podStorage 就是用的 genericregistry.Store,这是一个通用的 etc 辅助结构,把 etcd 抽象成存储结构。
// REST implements a RESTStorage for podstype REST struct { *genericregistry.Store proxyTransport http.RoundTripper }
pkg/api.Codecs 是全局默认的 codec 来自下面这段代码。
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory { serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory) return newCodecFactory(scheme, serializers) }
默认具体定义了这几种 serilizer。
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType { jsonSerializer := json.NewSerializer(mf, scheme, scheme, false) jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true) yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme) ...
而且标准库的 json 有很严重的性能问题,换用了 json-iter 但是有很多标准库不兼容的问题,性能提升了大概 20% 但是没办法和进主线,我尝试在上面工作的了一段时间,改了两个问题还是有错,由于时间关系,暂时放弃了这个工作,相关的 issue 在
首先通过 ./staging/src/k8s.io/apiserver/pkg/server/config.go 下的 DefaultBuildHandlerChain 构建 filters。
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer) handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer) if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) { handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc) } else { handler = genericapifilters.WithLegacyAudit(handler, c.RequestContextMapper, c.LegacyAuditWriter) } failedHandler := genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth) if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) { failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker) } handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler) return handler}
genericfilters.WithPanicRecovery 在 handler 的最外层对出现的 panic 恢复,并且打印每次请求的 log,所以你想观察 API 请求的情况可以 grep wrap.go 就能看到。
apirequest.WithRequestContext 给 request 绑定一个 Context
跟路 url 提取后续请求需要的 group, version, namespace, verb, resource 等信息。
限制 API 调用时间,超时处理提前终止 write。
允许跨域访问。
在 k8s.io/apiserver/pkg/endpoints/filters/authentication.go 下。WithAuthentication 插入鉴权信息,例如证书鉴权,token 鉴权等,并且从鉴权信息当中获取 user 信息(可能是 service account 也可能是外部用户)user 身份是由 里面的几种方式确认的
检查是否有权限进行对应资源的操作。一种是 RBAC 一种是 Node。具体这两种方式可以看这个,RBAC 主要是针对服务的,而 Node 模式主要是针对 kubelet 的。
让用户伪装成其他用户,比如 admin 可以用普通用户的身份创建资源。
通过 genericapiserver 的 InstallLegacyAPIGroup 就注册到路由当中。具体的做法就是根据 version, resource, sub resource, verb 等信息构造路由,然后用 go-restful 注册处理函数。比如说 GET
route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...). Returns(http.StatusOK, "OK", producedObject). Writes(producedObject)
handler 里面做的内容就是序列化,然后根据具体的要求(GET DELETE 等)到 etcd 中操作,当然本身还有一层缓存,这取决于 API 的 options 是希望更新还是直接读缓存(缓存会比 etcd 旧一些),比如对于 kubelet 会不断查询 node 信息,但是 kubelet 本身并不需要最新的信息,这个时候就会从缓存中读取。
开启代理 kubectl proxy,就可以通过 localhost 直接访问 kube-apiserver HTTP 服务。然后执行 go tool pprof http://localhost:8001/debug/pprof/profile 可以获得 profile 结果,下图红色的部分就是调用耗时最多的部分。
除此之外,kube-apiserver 本身也暴露了很多 prometheus 的 metrics 但是往上现在没有现成的模板,只能根据自己的需求来在 prometheus 当作做 query。可以在 k8s.io/apiserver/pkg/endpoints/metrics/metrics.go 里面看到。
之前也说过,超时间调用时会打 log 的,在代码中保存了一些 trace 日志,可以通过 grep Trace来过滤。Trace[%d] 这样开头, %d 是一个 id 可以看到具体的 trace 信息。
本文转自kubernetes中文社区-
转载地址:http://roazx.baihongyu.com/