istio原理分析:pilot服务发现

istio原理分析:pilot服务发现

简要分析istio两大核心组件之一pilot-discovery部分的原理。

源码分析基于istio 1.1.4

概要说明:

  • k8sServiceController使用List/Watch获取和维护service、node、pod和其他的资源对象,将k8s的数据结构转换为istio服务发现的数据结构;
  • DiscoveryService基于Controller上获取的服务发现数据,提供HTTP/gRPC协议的服务供Envoy使用。
  • 当k8sServiceController监听到k8s中service、endpoint等变化时,通知DiscoveryServer。DiscoveryServer通过长链接将变更后的配置信息通过rpc协议push到envoy中。

一句话总结:

通过controller -> queue -> handler -> envoy数据传递,将k8s apiserver的变化传递到pilot的服务发现代码,最终传递envoy服务中。

关键代码目录:

  • /pilot/pkg/serviceregistry/ 使用适配各种平台的controller
  • /pilot/pkg/serviceregistry/kube 适配k8s平台。
  • /pilot/pkg/proxy/envoy/discovery.go 实现envoy xds服务

服务发现的基础接口,所有适配istio的平台必须实现下面几个接口:

 /pilot/pkg/module/service.go:
type ServiceDiscovery interface {
    //获取系统中所有的服务
    Services() ([]*Service, error)
    //仅用于测试,获取指定域名的服务
    GetService(hostname Hostname) (*Service, error)
    //根据服务名、端口和label获取服务
    InstancesByPort(hostname Hostname, servicePort int, labels LabelsCollection) ([]*ServiceInstance, error)
    // 返回和proxy共存的的服务 (如k8s中同一个pod内)
    // 对于单独的proxy返回空
    GetProxyServiceInstances(*Proxy) ([]*ServiceInstance, error)
    GetProxyWorkloadLabels(*Proxy) (LabelsCollection, error)
    // 返回指定ipv4地址的管理端口。这些管理端口用于健康检查等等。
    ManagementPorts(addr string) PortList
    // 用于健康检查
    WorkloadHealthCheckInfo(addr string) ProbeList
    // Deprecated
    GetIstioServiceAccounts(hostname Hostname, ports []int) []string
}

pilot-discovery 关键启动代码如下

/pilot/cmd/pilot-discovery/main.go:
// Create the server for the discovery service.
 discoveryServer, err := bootstrap.NewServer(serverArgs)
 if err != nil {
     return fmt.Errorf("failed to create discovery service: %v", err)
 }
 // Start the server
 _, err = discoveryServer.Start(stop)
 if err != nil {
     return fmt.Errorf("failed to start discovery service: %v", err)
 }

NewServer函数实现如下:

初始化监听配置变化的controller、平台服务发现的controller

 /pilot/pkg/bootstrap/server.go:
/ NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    ...
    //初始处理istio规则的controller(在k8s平台下实际上监听istio CRD对象的变化)
    if err := s.initConfigController(&args); err != nil {
        return nil, err
    }
    //初始化发现平台服务变化的controller (在k8s平台下实际上监听service pods nodes的变化)
    if err := s.initServiceControllers(&args); err != nil {
        return nil, err
    }
    // 使用上面两个controller的信息,实现envoy data plan
    if err := s.initDiscoveryService(&args); err != nil {
        return nil, err
    }
    ...
    return s, nil
}
  • 在k8s平台下,上面的initConfigController()、initServiceControllers(),
    调用/pilot/pkg/serviceregistry/kube/controller.go中的NewController(),创建k8s的controller;
  • initDiscoveryService()调用/pilot/pkg/proxy/envoy/discovery.go的NewDiscoveryService(),创建服务于envoy的ds服务。

下面看一下创建k8s Controller的代码:

/pilot/pkg/serviceregistry/kube/controller.go:
//典型k8s controller代码,编写k8s controller都按这个套路
// NewController creates a new Kubernetes controller
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    ...
    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
    svcInformer := sharedInformers.Core().V1().Services().Informer()
//监听Service
    out.services = out.createCacheHandler(svcInformer, "Services")
    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
//监听Endpoint
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
//监听Node
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
    podInformer := sharedInformers.Core().V1().Pods().Informer()
//监听Pod
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
    return out
}

监听事件实际处理函数:

 //   createCacheHanlder 注册指定时间的处理函数
// 用于监听Service、Endpoint、Node和Pod的变化,istio CRD, Ingress变化。
//  为了提高系统的稳定性,istio将监听的变化信息先写入内部队列,然后后续,读取队列的信息,进行进一步处理。
//  本函数实现了监控发现信息变化、将写入队列的操作。
func (c *Controller) createCacheHandler(informer cache.SharedIndexInformer, otype string) cacheHandler {
    handler := &ChainHandler{funcs: []Handler{c.notify}}
    informer.AddEventHandler(
        //处理资源添加、更新、删除的事件
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                //将事件放入pilot自己处理队列,进行稍后处理和流控;queue的处理流程将会调用handler.Apply函数
                c.queue.Push(Task{handler: handler.Apply, obj: obj, event: model.EventAdd})
            },
            UpdateFunc: func(old, cur interface{}) {
                //省略部分代码
                }
            },
            DeleteFunc: func(obj interface{}) {    
//省略部分代码  
            },
        })
    return cacheHandler{informer: informer, handler: handler}
}

/pilot/pkg/serviceregistry/kube/queue.go 内部队列,用于排队处理前面的各种资源变化。

使用如下函数向队列添加事件的处理handler

 /pilot/pkg/serviceregistry/kube/controller.go  
// AppendInstanceHandler implements a service catalog operation
func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
    if c.endpoints.handler == nil {
        return nil
    }
    //调用ChainHandler.Append函数,在关心的事件中增加处理函数
    c.endpoints.handler.Append(func(obj interface{}, event model.Event) error {
     ...

给envoy 实现数据平面的初始化代码如下:

/pilot/pkg/proxy/envoy/discovery.go:
/ NewDiscoveryService creates an Envoy discovery service on a given port
func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
    environment *model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
    //...
    // Flush cached discovery responses whenever services, service
    // instances, or routing configuration changes.
     //out.clearCache()会触发配置推送下发的动作
    serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
   // 注册队列消息的处理hanler,
    // 通过这里可以感知到上面Controller写入的队列消息变化
    if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
        return nil, err
    }
    instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
// 注册队列消息的处理hanler
    if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
        return nil, err
    }
    //...
kuaikuai

老程序员一名