DeviceManager是k8s扩展资源类型的一种方式:当需要为容器配置新的资源类型时,需要通过device plugin将device添加到集群中。这篇文章主要介绍在k8s端如何添加device,以及device添加到集群后是如何被管理的。

创建DeviceManager

DeviceManager属于containerManager中的一部分,它的初始化也在containerManager初始化的时候。

1
2
3
4
5
6
  // Pick the device manager implementation: the real one when device plugins
  // are enabled, otherwise a stub.
  if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
		// The device manager is also registered with the topology manager as a
		// hint provider.
		// NOTE(review): err from NewManagerImpl is not checked before this call
		// within this excerpt — presumably it is checked right after the
		// if/else; confirm against the full function.
		cm.topologyManager.AddHintProvider(cm.deviceManager)
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}

NewManagerImpl初始化deviceManager实体,并将其作为hint provider添加到topologyManager。numaNodeInfo是通过读取宿主机上的cpu NUMA架构获取的。NewManagerImpl内部调用下面的newManagerImpl完成实际的初始化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// newManagerImpl builds a ManagerImpl whose registration socket lives at
// socketPath.
//
// socketPath must be a non-empty absolute path, otherwise an error is
// returned. The keys of numaNodeInfo are recorded as the manager's NUMA node
// IDs and topologyAffinityStore is stored as given. A checkpoint manager
// rooted at the socket's directory is created; if that fails the error is
// returned and no manager is produced.
func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if len(socketPath) == 0 || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
	}

	// Collect the NUMA node IDs (the map keys) into a slice.
	var nodeIDs []int
	for id := range numaNodeInfo {
		nodeIDs = append(nodeIDs, id)
	}

	socketDir, socketName := filepath.Split(socketPath)

	m := &ManagerImpl{
		endpoints: make(map[string]endpointInfo),

		socketname:            socketName,
		socketdir:             socketDir,
		allDevices:            make(map[string]map[string]pluginapi.Device),
		healthyDevices:        make(map[string]sets.String),
		unhealthyDevices:      make(map[string]sets.String),
		allocatedDevices:      make(map[string]sets.String),
		podDevices:            make(podDevices),
		numaNodes:             nodeIDs,
		topologyAffinityStore: topologyAffinityStore,
		devicesToReuse:        make(PodReusableDevices),

		// The following structures are populated with real implementations in
		// manager.Start(). Before that, initialize them to perform no-op
		// operations.
		activePods:   func() []*v1.Pod { return []*v1.Pod{} },
		sourcesReady: &sourcesReadyStub{},
	}
	// Device updates reported by endpoints are routed through this callback.
	m.callback = m.genericDeviceUpdateCallback

	// The checkpoint manager uses the same directory as the socket.
	cpm, err := checkpointmanager.NewCheckpointManager(socketDir)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
	}
	m.checkpointManager = cpm

	return m, nil
}

device和kubelet通过固定的/var/lib/kubelet/device-plugins/kubelet.sock通信,初始化manager callback函数,当device有更新操作都将通过这个callback完成设备的更新操作。
checkpointManager用来完成设备分配信息的持久化,其定义和初始化如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// impl is the CheckpointManager implementation constructed by
// NewCheckpointManager.
type impl struct {
	// path is the checkpoint directory passed to NewCheckpointManager.
	path  string
	// store is the file-backed store created over that directory.
	store utilstore.Store
	// mutex presumably serializes checkpoint operations; its use is not
	// visible in this excerpt — TODO confirm.
	mutex sync.Mutex
}
// NewCheckpointManager returns a CheckpointManager backed by a file store
// created over checkpointDir using the default filesystem. If the file store
// cannot be created, its error is returned unchanged.
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
	fileStore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
	if err != nil {
		return nil, err
	}

	manager := &impl{
		path:  checkpointDir,
		store: fileStore,
	}
	return manager, nil
}

再详细看一下callback的具体实现,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// genericDeviceUpdateCallback replaces everything the manager knows about
// resourceName with the given device list: all devices are indexed by ID, and
// each ID is placed in either the healthy or the unhealthy set depending on
// its Health field. The new state is then written to the checkpoint; a
// checkpoint failure is logged but not returned.
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	healthy := sets.NewString()
	unhealthy := sets.NewString()
	byID := make(map[string]pluginapi.Device)
	m.healthyDevices[resourceName] = healthy
	m.unhealthyDevices[resourceName] = unhealthy
	m.allDevices[resourceName] = byID
	for _, d := range devices {
		byID[d.ID] = d
		// Any health value other than Healthy counts as unhealthy.
		bucket := unhealthy
		if d.Health == pluginapi.Healthy {
			bucket = healthy
		}
		bucket.Insert(d.ID)
	}
	m.mutex.Unlock()

	// The checkpoint is written after the lock has been released.
	err := m.writeCheckpoint()
	if err != nil {
		klog.Errorf("writing checkpoint encountered %v", err)
	}
}

这里会根据device的health状态将device添加到healthyDevices或unhealthyDevices。然后将device信息和使用了devices的pod信息记录在/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint文件中。

启动DeviceManager

在containerManager启动过程中会将deviceManager也启动。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Start brings up the device manager's registration service.
//
// It installs the supplied activePods and sourcesReady in place of the no-op
// stubs, restores state from the checkpoint (a read failure is only logged),
// creates the socket directory with mode 0750, applies the kubelet plugins
// SELinux label to it when SELinux is enabled (best effort), cleans stale
// contents out of the socket directory (best effort), and finally listens on
// the unix socket and serves the Registration gRPC service in a background
// goroutine tracked by m.wg.
//
// An error is returned only if the socket directory cannot be created or the
// socket cannot be listened on.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	klog.V(2).Infof("Starting Device Plugin manager")

	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Loads in allocatedDevices information from disk.
	err := m.readCheckpoint()
	if err != nil {
		klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
	}

	socketPath := filepath.Join(m.socketdir, m.socketname)
	if err = os.MkdirAll(m.socketdir, 0750); err != nil {
		return err
	}
	if selinux.SELinuxEnabled() {
		if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
			klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
		}
	}

	// Removes all stale sockets in m.socketdir. Device plugins can monitor
	// this and use it as a signal to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
	}

	s, err := net.Listen("unix", socketPath)
	if err != nil {
		klog.Errorf(errListenSocket+" %v", err)
		return err
	}

	m.wg.Add(1)
	m.server = grpc.NewServer([]grpc.ServerOption{}...)

	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		// Serve blocks until the server stops. Surface a failure instead of
		// silently dropping it, otherwise a dead registration server is
		// invisible in the logs.
		if err := m.server.Serve(s); err != nil {
			klog.Errorf("Device plugin registration server stopped serving on %q: %v", socketPath, err)
		}
	}()

	klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

	return nil
}
  • readCheckpoint从kubelet_internal_checkpoint文件读取数据并恢复相关状态,包括podDevices和allocatedDevices,并将healthyDevices和unhealthyDevices初始化为空;最后配置endpoints,将endpoint的stopTime设置为当前时间,然后等待device plugin进行re-register。
  • removeContents删除/var/lib/kubelet/device-plugins目录下所有的socket文件,从而触发re-register操作。
  • 启动grpc服务

Device注册

启动grpc服务之后,device plugin通过Register向kubelet注册device信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Register registers a device plugin.
//
// The request is rejected with an error when its API version is not one of
// pluginapi.SupportedVersions, or when its resource name is not an extended
// resource name. On success the endpoint is added asynchronously, so a nil
// error only means the request was accepted, not that the plugin has been
// dialed. An empty response is returned in every case.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
		// errorString embeds plugin-supplied text, so it must be passed as an
		// argument and never used as the format string itself.
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		klog.Infof("Bad registration request from device plugin: %s", errorString)
		// Same as above: a '%' in the resource name must not be interpreted
		// as a formatting verb.
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	// TODO: for now, always accepts newest device plugin. Later may consider to
	// add some policies here, e.g., verify whether an old device plugin with the
	// same resource name is still alive to determine whether we want to accept
	// the new registration.
	go m.addEndpoint(r)

	return &pluginapi.Empty{}, nil
}
  • 检查版本信息是否支持;
  • 检查资源名称是否为扩展资源类型:不能属于kubernetes.io/域,不能以requests.为前缀,并且名字要符合命名规范;
  • addEndpoint注册endpoint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// addEndpoint creates an endpoint for the plugin socket named in the request
// (resolved relative to the manager's socket directory), registers it under
// the request's resource name, and runs it in its own goroutine. If the
// endpoint cannot be created the failure is logged and nothing is registered.
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	pluginSocket := filepath.Join(m.socketdir, r.Endpoint)
	ep, err := newEndpointImpl(pluginSocket, r.ResourceName, m.callback)
	if err != nil {
		klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}

	m.registerEndpoint(r.ResourceName, r.Options, ep)
	go m.runEndpoint(r.ResourceName, ep)
}

初始化endpoint结构,尝试连接endpoint的socket,如果能正常连接则继续进行注册,将endpointInfo加入到m.endpoints结构,最后执行runEndpoint。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// runEndpoint runs the endpoint until run() returns, then stops it. Afterwards,
// under the manager lock, the resource is marked unhealthy only if e is still
// the endpoint registered for resourceName. The "became unhealthy" message is
// logged in either case.
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
	e.run()
	e.stop()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Skip the update when a different endpoint has taken over this resource.
	registered, found := m.endpoints[resourceName]
	stillCurrent := found && registered.e == e
	if stillCurrent {
		m.markResourceUnhealthy(resourceName)
	}

	klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
  • e.run中将通过ListAndWatch gRPC调用获取device plugin的设备更新情况;
    • 将device通过callback即前面的genericDeviceUpdateCallback函数实现更新device的health状态。
  • 如果前面run退出了for循环则会执行stop断开socket链接;
  • 将该资源的healthyDevices置为空,并把之前healthyDevices中的设备添加到unhealthyDevices结构。到这里就完成了所有的注册流程。

Allocate分配资源

为pod的container分配device

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Allocate allocates device resources for one container of a pod.
//
// The manager keeps a reusable-device entry for the current pod only: one is
// created if missing, and entries for every other pod are dropped. Resources
// are then allocated for the container. For an init container the allocated
// devices are added to the pod's reusable set; for any other container they
// are removed from it. An allocation error is returned as-is and leaves the
// reusable set untouched by this function.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	uid := string(pod.UID)

	if _, found := m.devicesToReuse[uid]; !found {
		m.devicesToReuse[uid] = make(map[string]sets.String)
	}
	// If pod entries to m.devicesToReuse other than the current pod exist, delete them.
	for otherUID := range m.devicesToReuse {
		if otherUID != uid {
			delete(m.devicesToReuse, otherUID)
		}
	}

	// Allocate resources for init containers first as we know the caller always loops
	// through init containers before looping through app containers. Should the caller
	// ever change those semantics, this logic will need to be amended.
	isInitContainer := false
	for i := range pod.Spec.InitContainers {
		if pod.Spec.InitContainers[i].Name == container.Name {
			isInitContainer = true
			break
		}
	}

	if err := m.allocateContainerResources(pod, container, m.devicesToReuse[uid]); err != nil {
		return err
	}

	if isInitContainer {
		m.podDevices.addContainerAllocatedResources(uid, container.Name, m.devicesToReuse[uid])
	} else {
		m.podDevices.removeContainerAllocatedResources(uid, container.Name, m.devicesToReuse[uid])
	}
	return nil
}
  • 从devicesToReuse中查看是否有该pod的reuse信息,如果devicesToReuse信息不属于pod则把devicesToReuse信息删除;
  • 判断container是否为initContainer,如果是则通过allocateContainerResources为container分配所有的device资源,针对扩展资源只能是request=limit;
    • UpdateAllocatedDevices,获取activePods,把podDevices中不是active的pod分配信息从podDevices取出并从allocatedDevices删除;
    • 执行devicesToAllocate
      • 先获取这个pod的container的这个resource资源的device信息,如果已经分配的部分device,还需要分配的资源量就需要减去已经分配的部分;
      • 为container分配device,默认情况下通过available.UnsortedList()[:needed]取可用device中的前needed个,如果有topologyAffinityStore的配置,则需要按照topology原则分配device;
      • 将上面分配的device加入到allocatedDevices;
    • 根据devicesToAllocate分配的devices调用endpoint的Allocate gRPC接口分配对应的设备,然后将返回结果和设备分配信息添加到podDevices;
    • writeCheckpoint,将分配资源写入到kubelet_internal_checkpoint文件;
  • 如果是InitContainer还需要把分配出去的device添加到devicesToReuse
  • 如果不是InitContainer则直接调用allocateContainerResources进行device分配,只不过不需要将device信息添加到devicesToReuse

资源上报

kubelet会定期将资源信息上报给apiserver,关于deviceManager部分会调用GetCapacity函数获取device信息。在GetCapacity中,

  • 先查看healthyDevices资源量,把资源量添加到capacity和allocatable;
  • 查看unhealthDevices资源量,更新capacity总量,最后返回数据;
  • 如果device已经删除时间超过5分钟,则需要将资源从endpoint, healthyDevices中删除相关的device信息,最后会将信息更新到kubelet_internal_checkpoint

总结

启动containerManager时也会启动deviceManager,dm通过kubelet.sock启动gRPC服务,device plugin通过Register向kubelet注册device信息,kubelet通过ListAndWatch监听device更新,并将device信息添加到unhealthy和healthy结构,最后dm会通过Allocate对申请的device资源的container分配device资源。