为了保证服务能够正常提供服务,k8s提供了容器健康检查和就需检查功能,目前支持TCP、Http、和命令三种方式, 这里将不会分析所有的流程,只会看健康检查相关的流程。 在kubelet的在syncLoopIteration函数中将会根据pod的操作类型进行相关的处理。这里涉及到的操作主要有AddRestoreRemove。 在Add和Restore中,通过HandlePodAdditions操作,Remove通过HandlePodRemoves实现。接下来将详细分析HandlePodAdditions实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.

			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    //这里会对pod进行分发处理,进行pod创建的真正流程,这里不会详细分析
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    //这个是这里的重点
		kl.probeManager.AddPod(pod)
	}
}

这里重点看kl.probeManager.AddPod操作。关注manager结构,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type manager struct {
	// Map of active workers for probes
	workers map[probeKey]*worker
	// Lock for accessing & mutating workers
	workerLock sync.RWMutex

	// The statusManager cache provides pod IP and container IDs for probing.
	statusManager status.Manager

	// readinessManager manages the results of readiness probes
	readinessManager results.Manager

	// livenessManager manages the results of liveness probes
	livenessManager results.Manager

	// startupManager manages the results of startup probes
	startupManager results.Manager

	// prober executes the probe actions.
	prober *prober
}

managerNewMainKubelet中进行实例化,

1
2
3
4
5
6
7
klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.startupManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)

其中readinessManager,livenessManager,startupManager都是用来存储检查结果的。

1
2
3
4
5
6
7
// NewManager creates and returns an empty results manager.
func NewManager() Manager {
  return &manager{
      cache:   make(map[kubecontainer.ContainerID]Result),
      updates: make(chan Update, 20),
  }
}

statusManager是容器状态管理控制器,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
	return &manager{
		kubeClient:        kubeClient,
		podManager:        podManager,
		podStatuses:       make(map[types.UID]versionedPodStatus),
		podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
		apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
		podDeletionSafety: podDeletionSafety,
	}
}

prober结构则是通过,kubelet中的runtimecontainerRefManagerRecoder实现的。 probeManager的启动是在start。这个到后面再分析,先看probe实现。

1
2
3
4
5
6
7
//kubernetes/pkg/kubelet/prober/probe_manager.go
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}

接着分析kl.probeManager.AddPod,在这里会分别为pod中的每个container都创建worker,然后再worker中进行状态检测。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (m *manager) AddPod(pod *v1.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name

		if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
			key.probeType = startup
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Startup probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, startup, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.ReadinessProbe != nil {
			key.probeType = readiness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Readiness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, readiness, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.LivenessProbe != nil {
			key.probeType = liveness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Liveness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, liveness, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}

在Add中主要包括这几个步骤。1. 轮询pod中的每个容器;2. 根据每个容器的startupProbeReadinessProbeLivenessProbe三种检查的配置情况创建对应的worker;3. 执行检测w.run()

看其中一种检查类型的配置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
			key.probeType = startup
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Startup probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, startup, pod, c)
			m.workers[key] = w
			go w.run()
		}

针对Startup类型的检测会多一个features.StartupProbe的开关,其他两种则没有。先根据检测类型,存储到workers的key。如果这个pod的这个container已经存在,则会直接返回,否则会为这个container创建一个worker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type worker struct {
	// Channel for stopping the probe.
	stopCh chan struct{}

	// The pod containing this probe (read-only)
	pod *v1.Pod

	// The container to probe (read-only)
	container v1.Container

	// Describes the probe configuration (read-only)
	spec *v1.Probe

	// The type of the worker.
	probeType probeType

	// The probe value during the initial delay.
	initialValue results.Result

	// Where to store this workers results.
	resultsManager results.Manager
	probeManager   *manager

	// The last known container ID for this worker.
	containerID kubecontainer.ContainerID
	// The last probe result for this worker.
	lastResult results.Result
	// How many times in a row the probe has returned the same result.
	resultRun int

	// If set, skip probing.
	onHold bool

	// proberResultsMetricLabels holds the labels attached to this worker
	// for the ProberResults metric by result.
	proberResultsSuccessfulMetricLabels metrics.Labels
	proberResultsFailedMetricLabels     metrics.Labels
	proberResultsUnknownMetricLabels    metrics.Labels
}

然后再执行到 w.run(),doProbe(),在doProbe()中主要完成了以下步骤。

  1. 获取pod的status;
  2. 如果pod已经是v1.PodFailed或者v1.PodSuccesseeded两种状态,则停止检测;
  3. 从pod status中获取对应的容器状态,然后根据容器状态做相应的处理,如果容器id和获取的不一样,容器状态不是running,再做相关处理,然后返回,如果是startup类型的检测,并且容器已经启动则返回不再检测;
  4. 执行检测w.probeManager.prober.probe
  5. 根据执行结果设置对应的resultsManager。 这里具体分析一下检测过程。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
	var probeSpec *v1.Probe
	switch probeType {
	case readiness:
		probeSpec = container.ReadinessProbe
	case liveness:
		probeSpec = container.LivenessProbe
	case startup:
		probeSpec = container.StartupProbe
	default:
		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
	}

	ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
	if probeSpec == nil {
		klog.Warningf("%s probe for %s is nil", probeType, ctrName)
		return results.Success, nil
	}

	result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
	if err != nil || (result != probe.Success && result != probe.Warning) {
		// Probe failed in one way or another.
		if err != nil {
			klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
			pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
		} else { // result != probe.Success
			klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
			pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %v", probeType, output)
		}
		return results.Failure, err
	}
	if result == probe.Warning {
		pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %v", probeType, output)
		klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
	} else {
		klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
	}
	return results.Success, nil
}

在上面将通过pb.runProbeWithRetries再进行检测,之后执行runProbe

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	timeout := time.Duration(p.TimeoutSeconds) * time.Second
	if p.Exec != nil {
		klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
		command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
		return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
	}
	if p.HTTPGet != nil {
		scheme := strings.ToLower(string(p.HTTPGet.Scheme))
		host := p.HTTPGet.Host
		if host == "" {
			host = status.PodIP
		}
		port, err := extractPort(p.HTTPGet.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
		path := p.HTTPGet.Path
		klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
		url := formatURL(scheme, host, port, path)
		headers := buildHeader(p.HTTPGet.HTTPHeaders)
		klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
		switch probeType {
		case liveness:
			return pb.livenessHTTP.Probe(url, headers, timeout)
		case startup:
			return pb.startupHTTP.Probe(url, headers, timeout)
		default:
			return pb.readinessHTTP.Probe(url, headers, timeout)
		}
	}
	if p.TCPSocket != nil {
		port, err := extractPort(p.TCPSocket.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
		host := p.TCPSocket.Host
		if host == "" {
			host = status.PodIP
		}
		klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
		return pb.tcp.Probe(host, port, timeout)
	}
	klog.Warningf("Failed to find probe builder for container: %v", container)
	return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

runProbe中将根据检测类型进行具体的检测命令检测,httpGet,TCP检测。根据检测结果在doProbe函数中会记录相邻几次检测的结果。 这里会先从缓存中获取之前的记录,如果结果不一样会触发更新。然后在将结果组合成Update{id,result,pod.UID}的方式放入到resultsManager的update channel。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
	m.Lock()
	defer m.Unlock()
	prev, exists := m.cache[id]
	if !exists || prev != result {
		m.cache[id] = result
		return true
	}
	return false
}

从channel中消费结果则在前面刚开始提到的probe_manager的start()。

1
2
3
4
5
6
7
//kubernetes/pkg/kubelet/prober/probe_manager.go
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}

我们分析其中一个Readiness的update操作。

1
2
3
4
5
6
func (m *manager) updateReadiness() {
	update := <-m.readinessManager.Updates()

	ready := update.Result == results.Success
	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()

	pod, ok := m.podManager.GetPodByUID(podUID)
	if !ok {
		klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
		return
	}

	oldStatus, found := m.podStatuses[pod.UID]
	if !found {
		klog.Warningf("Container readiness changed before pod has synced: %q - %q",
			format.Pod(pod), containerID.String())
		return
	}

	// Find the container to update.
	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
	if !ok {
		klog.Warningf("Container readiness changed for unknown container: %q - %q",
			format.Pod(pod), containerID.String())
		return
	}

	if containerStatus.Ready == ready {
		klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
			format.Pod(pod), containerID.String())
		return
	}

	// Make sure we're not updating the cached version.
	status := *oldStatus.status.DeepCopy()
	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
	containerStatus.Ready = ready

	// updateConditionFunc updates the corresponding type of condition
	updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
		conditionIndex := -1
		for i, condition := range status.Conditions {
			if condition.Type == conditionType {
				conditionIndex = i
				break
			}
		}
		if conditionIndex != -1 {
			status.Conditions[conditionIndex] = condition
		} else {
			klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
			status.Conditions = append(status.Conditions, condition)
		}
	}
	updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
	updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
	m.updateStatusInternal(pod, status, false)
}

接下来分析状态更新部分。通过GeneratePodReadyConditionGenerateContainersReadyCondition两个函数计算pod和container的ReadyCondition状态,然后通过m.updateStatusInternal更新状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
  klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
    pod.UID, newStatus.version, newStatus.status)
  return true
default:
  // Let the periodic syncBatch handle the update if the channel is full.
  // We can't block, since we hold the mutex lock.
  klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
    format.Pod(pod), status)
  return false
}

在这里会将更新发送到statusManagerpodStatusChannel中,然后kubelet的statusManager在启动的时候会一直处理这里面的事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		klog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	klog.Info("Starting to sync pod status with apiserver")
	//lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
					syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				klog.V(5).Infof("Status Manager: syncing batch")
				// remove any entries in the status channel since the batch will handle them
				for i := len(m.podStatusChannel); i > 0; i-- {
					<-m.podStatusChannel
				}
				m.syncBatch()
			}
		}
	}, 0)
}

这里是statusManager处理状态更新事件。通过调用syncPod从etcd中获取现在的pod,然后通过statusutil.PatchPodStatus来patch新的状态。