istio原理分析:pilot服务发现
简要分析istio两大核心组件之一pilot-discovery部分的原理。
源码分析基于istio 1.1.4

概要说明:
- k8sServiceController使用List/Watch获取和维护service、node、pod和其他的资源对象,将k8s的数据结构转换为istio服务发现的数据结构;
- DiscoveryService基于Controller上获取的服务发现数据,提供HTTP/gRPC协议的服务供Envoy使用。
- 当k8sServiceController监听到k8s中service、endpoint等变化时,通知DiscoveryServer。DiscoveryServer通过长链接将变更后的配置信息通过rpc协议push到envoy中。
一句话总结:
通过controller -> queue -> handler -> envoy数据传递,将k8s apiserver的变化传递到pilot的服务发现代码,最终传递envoy服务中。
关键代码目录:
- /pilot/pkg/serviceregistry/ 使用适配各种平台的controller
- /pilot/pkg/serviceregistry/kube 适配k8s平台。
- /pilot/pkg/proxy/envoy/discovery.go 实现envoy xds服务
服务发现的基础接口,所有适配istio的平台必须实现下面几个接口:
/pilot/pkg/module/service.go:
type ServiceDiscovery interface {
//获取系统中所有的服务
Services() ([]*Service, error)
//仅用于测试,获取指定域名的服务
GetService(hostname Hostname) (*Service, error)
//根据服务名、端口和label获取服务
InstancesByPort(hostname Hostname, servicePort int, labels LabelsCollection) ([]*ServiceInstance, error)
// 返回和proxy共存的的服务 (如k8s中同一个pod内)
// 对于单独的proxy返回空
GetProxyServiceInstances(*Proxy) ([]*ServiceInstance, error)
GetProxyWorkloadLabels(*Proxy) (LabelsCollection, error)
// 返回指定ipv4地址的管理端口。这些管理端口用于健康检查等等。
ManagementPorts(addr string) PortList
// 用于健康检查
WorkloadHealthCheckInfo(addr string) ProbeList
// Deprecated
GetIstioServiceAccounts(hostname Hostname, ports []int) []string
}
pilot-discovery 关键启动代码如下
/pilot/cmd/pilot-discovery/main.go:
// Create the server for the discovery service.
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {
return fmt.Errorf("failed to create discovery service: %v", err)
}
// Start the server
_, err = discoveryServer.Start(stop)
if err != nil {
return fmt.Errorf("failed to start discovery service: %v", err)
}
NewServer函数实现如下:
初始化监听配置变化的controller、平台服务发现的controller
/pilot/pkg/bootstrap/server.go:
/ NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
...
//初始处理istio规则的controller(在k8s平台下实际上监听istio CRD对象的变化)
if err := s.initConfigController(&args); err != nil {
return nil, err
}
//初始化发现平台服务变化的controller (在k8s平台下实际上监听service pods nodes的变化)
if err := s.initServiceControllers(&args); err != nil {
return nil, err
}
// 使用上面两个controller的信息,实现envoy data plan
if err := s.initDiscoveryService(&args); err != nil {
return nil, err
}
...
return s, nil
}
- 在k8s平台下,上面的initConfigController()、initServiceControllers(),
调用/pilot/pkg/serviceregistry/kube/controller.go中的NewController(),创建k8s的controller; - initDiscoveryService()调用/pilot/pkg/proxy/envoy/discovery.go的NewDiscoveryService(),创建服务于envoy的ds服务。
下面看一下创建k8s Controller的代码:
/pilot/pkg/serviceregistry/kube/controller.go:
//典型k8s controller代码,编写k8s controller都按这个套路
// NewController creates a new Kubernetes controller
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
...
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
svcInformer := sharedInformers.Core().V1().Services().Informer()
//监听Service
out.services = out.createCacheHandler(svcInformer, "Services")
epInformer := sharedInformers.Core().V1().Endpoints().Informer()
//监听Endpoint
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
//监听Node
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
podInformer := sharedInformers.Core().V1().Pods().Informer()
//监听Pod
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
监听事件实际处理函数:
// createCacheHanlder 注册指定时间的处理函数
// 用于监听Service、Endpoint、Node和Pod的变化,istio CRD, Ingress变化。
// 为了提高系统的稳定性,istio将监听的变化信息先写入内部队列,然后后续,读取队列的信息,进行进一步处理。
// 本函数实现了监控发现信息变化、将写入队列的操作。
func (c *Controller) createCacheHandler(informer cache.SharedIndexInformer, otype string) cacheHandler {
handler := &ChainHandler{funcs: []Handler{c.notify}}
informer.AddEventHandler(
//处理资源添加、更新、删除的事件
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
//将事件放入pilot自己处理队列,进行稍后处理和流控;queue的处理流程将会调用handler.Apply函数
c.queue.Push(Task{handler: handler.Apply, obj: obj, event: model.EventAdd})
},
UpdateFunc: func(old, cur interface{}) {
//省略部分代码
}
},
DeleteFunc: func(obj interface{}) {
//省略部分代码
},
})
return cacheHandler{informer: informer, handler: handler}
}
/pilot/pkg/serviceregistry/kube/queue.go 内部队列,用于排队处理前面的各种资源变化。
使用如下函数向队列添加事件的处理handler
/pilot/pkg/serviceregistry/kube/controller.go
// AppendInstanceHandler implements a service catalog operation
func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
if c.endpoints.handler == nil {
return nil
}
//调用ChainHandler.Append函数,在关心的事件中增加处理函数
c.endpoints.handler.Append(func(obj interface{}, event model.Event) error {
...
给envoy 实现数据平面的初始化代码如下:
/pilot/pkg/proxy/envoy/discovery.go:
/ NewDiscoveryService creates an Envoy discovery service on a given port
func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
environment *model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
//...
// Flush cached discovery responses whenever services, service
// instances, or routing configuration changes.
//out.clearCache()会触发配置推送下发的动作
serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
// 注册队列消息的处理hanler,
// 通过这里可以感知到上面Controller写入的队列消息变化
if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
return nil, err
}
instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
// 注册队列消息的处理hanler
if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
return nil, err
}
//...
发表评论