DeviceManager作为k8s资源类型的扩展方式,在需要为容器配置新的资源类型的时候就需要通过device plugin的方式将device添加到集群里。这篇文章主要是介绍在k8s端用户如何添加device,device添加到集群后如何管理的。
创建DeviceManager
DeviceManager属于containerManager中的一部分,它的初始化也在containerManager初始化的时候。
1
2
3
4
5
6
|
// Choose the device manager implementation: NewManagerImpl when device
// plugins are enabled, NewManagerStub otherwise.
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
// The device manager is also registered with the topology manager as a hint provider.
// NOTE(review): err is not inspected before this call within the excerpt;
// presumably it is checked right after the if/else — confirm in the full source.
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
}
|
先NewManagerImpl
初始化deviceManager实体,并添加到topologyManager。numaNodeInfo
是通过读取宿主机上的cpu NUMA架构来获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// newManagerImpl builds a ManagerImpl for the device plugin socket at
// socketPath.
//
// socketPath must be a non-empty absolute path. It is split into a directory
// and a file name; the directory is also handed to the checkpoint manager.
// The values produced by ranging over numaNodeInfo are recorded as the
// manager's NUMA nodes, and topologyAffinityStore is stored unchanged.
//
// An error is returned when socketPath is rejected or when the checkpoint
// manager cannot be created.
func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
	}

	var nodeIDs []int
	for id := range numaNodeInfo {
		nodeIDs = append(nodeIDs, id)
	}

	sockDir, sockName := filepath.Split(socketPath)
	m := &ManagerImpl{
		endpoints: make(map[string]endpointInfo),

		socketname: sockName,
		socketdir:  sockDir,

		allDevices:       make(map[string]map[string]pluginapi.Device),
		healthyDevices:   make(map[string]sets.String),
		unhealthyDevices: make(map[string]sets.String),
		allocatedDevices: make(map[string]sets.String),
		podDevices:       make(podDevices),

		numaNodes:             nodeIDs,
		topologyAffinityStore: topologyAffinityStore,
		devicesToReuse:        make(PodReusableDevices),

		// Start() installs the real implementations of these two; until
		// then they are harmless no-ops.
		activePods:   func() []*v1.Pod { return []*v1.Pod{} },
		sourcesReady: &sourcesReadyStub{},
	}
	// Device updates reported by endpoints are routed through this callback.
	m.callback = m.genericDeviceUpdateCallback

	cpm, err := checkpointmanager.NewCheckpointManager(sockDir)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
	}
	m.checkpointManager = cpm

	return m, nil
}
|
device和kubelet通过固定的/var/lib/kubelet/device-plugins/kubelet.sock
通信,初始化manager callback函数,当device有更新操作都将通过这个callback完成设备的更新操作。
checkPointManager
是用来完成设备分配信息的持久化,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// impl is the CheckpointManager implementation returned by NewCheckpointManager.
type impl struct {
// path is the checkpoint directory the manager was created with.
path string
// store is the file-backed store created for that directory.
store utilstore.Store
// mutex presumably serializes access to store; its users are not visible
// in this excerpt — TODO confirm.
mutex sync.Mutex
}
// NewCheckpointManager creates a CheckpointManager rooted at checkpointDir.
// It is backed by a file store built on utilfs.DefaultFs; an error from
// creating that store is returned to the caller unchanged.
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
	fileStore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
	if err != nil {
		return nil, err
	}

	manager := &impl{
		path:  checkpointDir,
		store: fileStore,
	}
	return manager, nil
}
|
再详细看一下callback的具体实现,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// genericDeviceUpdateCallback replaces everything the manager knows about the
// devices of resourceName with the given device list.
//
// Each device is recorded in allDevices and, depending on whether its Health
// equals pluginapi.Healthy, in either healthyDevices or unhealthyDevices.
// Previous entries for resourceName are discarded. After the in-memory state
// is updated a checkpoint is written; a failure to write it is only logged.
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	healthy, unhealthy := sets.NewString(), sets.NewString()
	byID := make(map[string]pluginapi.Device)
	for _, d := range devices {
		byID[d.ID] = d

		bucket := unhealthy
		if d.Health == pluginapi.Healthy {
			bucket = healthy
		}
		bucket.Insert(d.ID)
	}
	m.healthyDevices[resourceName] = healthy
	m.unhealthyDevices[resourceName] = unhealthy
	m.allDevices[resourceName] = byID
	m.mutex.Unlock()

	// The lock is released before checkpointing.
	// NOTE(review): presumably writeCheckpoint takes m.mutex itself — confirm.
	err := m.writeCheckpoint()
	if err != nil {
		klog.Errorf("writing checkpoint encountered %v", err)
	}
}
|
这里会根据device的health
状态将device添加到healthyDevices
或unhealthyDevices
。然后将device信息和使用了devices的pod信息记录在/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint
文件中。
启动DeviceManager
在containerManager启动过程中会将deviceManager也启动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// Start brings up the device plugin registration service.
//
// It installs activePods and sourcesReady on the manager, reloads state from
// the checkpoint (a read failure is logged and startup continues), makes sure
// the socket directory exists, applies the SELinux label when SELinux is
// enabled, clears the socket directory, and then serves the gRPC
// registration server on the manager's unix socket in a background goroutine
// tracked by m.wg.
//
// An error is returned only when the socket directory cannot be created or
// the socket cannot be listened on.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	klog.V(2).Infof("Starting Device Plugin manager")

	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Restore allocatedDevices information from disk; best effort only.
	if err := m.readCheckpoint(); err != nil {
		klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
	}

	if err := os.MkdirAll(m.socketdir, 0750); err != nil {
		return err
	}
	if selinux.SELinuxEnabled() {
		if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
			klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
		}
	}

	// Wipe stale sockets in m.socketdir. Device plugins can watch for this
	// and treat it as the cue to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
	}

	socketPath := filepath.Join(m.socketdir, m.socketname)
	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		klog.Errorf(errListenSocket+" %v", err)
		return err
	}

	m.wg.Add(1)
	m.server = grpc.NewServer()
	pluginapi.RegisterRegistrationServer(m.server, m)
	go func() {
		defer m.wg.Done()
		m.server.Serve(listener)
	}()

	klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
	return nil
}
|
readCheckpoint
从kubelet_internal_checkpoint文件读取数据,恢复相关数据,包括了podDevices
, allocatedDevices
,并将healthyDevices
和unhealthyDevices
初始化为空,最后配置endpoints,将endpoint的stopTime设置为当前时间,然后等待device plugin进行re-register。
removeContents
删除/var/lib/kubelet/device-plugins
目录下所有的socket文件,从而触发re-register操作。
- 启动grpc服务
Device注册
启动gRPC服务之后,device plugin通过Register向kubelet注册device信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// Register handles a registration request sent by a device plugin.
//
// The request is rejected with an error when r.Version is not listed in
// pluginapi.SupportedVersions, or when r.ResourceName is not an extended
// resource name. Otherwise the endpoint is added in a separate goroutine and
// Register returns right away with a nil error. A non-nil *pluginapi.Empty is
// returned in every case.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()

	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
		// errorString already embeds plugin-supplied text, so it must be
		// passed as an argument and never reused as the format string.
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		klog.Infof("Bad registration request from device plugin: %s", errorString)
		// Same reasoning as above: a '%' in the resource name must not be
		// interpreted as a formatting verb.
		return &pluginapi.Empty{}, fmt.Errorf("%s", errorString)
	}

	// TODO: for now, always accepts newest device plugin. Later may consider to
	// add some policies here, e.g., verify whether an old device plugin with the
	// same resource name is still alive to determine whether we want to accept
	// the new registration.
	go m.addEndpoint(r)

	return &pluginapi.Empty{}, nil
}
|
- 检查版本信息是否支持;
- 检查资源名称是否为扩展类型,不能以
kubernetes.io/
为前缀,不能以requests.
为前缀,并且名字是符合规范的;
addEndpoint
注册endpoint
1
2
3
4
5
6
7
8
9
10
11
|
// addEndpoint creates an endpoint for the plugin described by r, registers it
// under r.ResourceName and starts running it in its own goroutine.
//
// The plugin socket is looked up as r.Endpoint inside the manager's socket
// directory. If the endpoint cannot be created the failure is logged and
// nothing is registered.
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	pluginSocket := filepath.Join(m.socketdir, r.Endpoint)
	ep, err := newEndpointImpl(pluginSocket, r.ResourceName, m.callback)
	if err != nil {
		klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}

	m.registerEndpoint(r.ResourceName, r.Options, ep)
	go func() {
		m.runEndpoint(r.ResourceName, ep)
	}()
}
|
初始化endpoint结构,尝试连接endpoint的socket,如果能正常连接则正常进行注册。将endpointInfo加入到m.endpoints结构。最后执行runEndpoint
。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// runEndpoint runs e, stops it once run returns, and then, under the manager
// lock, marks resourceName unhealthy if e is still the endpoint registered
// for that resource. The "became unhealthy" message is logged in either case.
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
	e.run()
	e.stop()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Only touch the resource when no other endpoint has taken over
	// resourceName in the meantime.
	current, registered := m.endpoints[resourceName]
	if registered && current.e == e {
		m.markResourceUnhealthy(resourceName)
	}

	klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
|
- e.run中将通过
ListAndWatch
gRPC调用获取device plugin的设备更新情况;
- 将device通过callback即前面的
genericDeviceUpdateCallback
函数实现更新device的health状态。
- 如果前面run退出了for循环则会执行stop断开socket链接;
- 将该资源的healthyDevices置为空,并把之前healthyDevices中的device全部加入unhealthyDevices结构。
到这里就完成了所有的注册流程。
Allocate分配资源
为pod的container分配device
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Allocate allocates device resources for one container of pod.
//
// m.devicesToReuse is first narrowed to a single entry for this pod: the
// entry is created if missing and entries of all other pods are deleted.
// Resources are then allocated for the container. For an init container
// (matched by name against pod.Spec.InitContainers) the result is recorded via
// addContainerAllocatedResources; for any other container
// removeContainerAllocatedResources is applied instead. Both calls are handed
// the pod's devicesToReuse entry.
//
// The error from allocateContainerResources, if any, is returned unchanged.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	podUID := string(pod.UID)

	if _, found := m.devicesToReuse[podUID]; !found {
		m.devicesToReuse[podUID] = make(map[string]sets.String)
	}
	// Drop reuse entries that belong to any pod other than the current one.
	for uid := range m.devicesToReuse {
		if uid != podUID {
			delete(m.devicesToReuse, uid)
		}
	}

	// The caller is known to walk init containers before app containers, so
	// init containers get their resources first. Should the caller ever change
	// those semantics, this logic will need to be amended.
	isInitContainer := false
	for i := range pod.Spec.InitContainers {
		if pod.Spec.InitContainers[i].Name == container.Name {
			isInitContainer = true
			break
		}
	}

	if err := m.allocateContainerResources(pod, container, m.devicesToReuse[podUID]); err != nil {
		return err
	}

	if isInitContainer {
		m.podDevices.addContainerAllocatedResources(podUID, container.Name, m.devicesToReuse[podUID])
	} else {
		m.podDevices.removeContainerAllocatedResources(podUID, container.Name, m.devicesToReuse[podUID])
	}
	return nil
}
|
- 从devicesToReuse中查看是否有该pod的reuse信息,如果devicesToReuse信息不属于pod则把devicesToReuse信息删除;
- 判断container是否为initContainer,如果是则通过
allocateContainerResources
为container分配所有的device资源,针对扩展资源只能是request=limit;
- UpdateAllocatedDevices,获取activePods,把podDevices中不是active的pod分配信息从podDevices取出并从allocatedDevices删除;
- 执行
devicesToAllocate
- 先获取这个pod的container的这个resource资源的device信息,如果已经分配了部分device,还需要分配的资源量就需要减去已经分配的部分;
- 为container分配device,默认情况下是通过
available.UnsortedList()[:needed]
取可用的前几个device,如果有topologyAffinityStore的配置,则需要按照topology原则分配device;
- 将上面分配的device加入到allocatedDevices;
- 根据devicesToAllocate分配的devices调用endpoint的
allocate
gRPC接口分配对应的设备,然后将返回结果和设备分配信息添加到podDevices;
writeCheckpoint
,将分配资源写入到kubelet_internal_checkpoint
文件;
- 如果是InitContainer还需要把分配出去的device添加到
devicesToReuse
;
- 如果不是InitContainer则直接调用
allocateContainerResources
进行device分配,分配后不会将device信息添加到devicesToReuse,而是调用removeContainerAllocatedResources把该container已分配的device从devicesToReuse中移除
;
资源上报
kubelet会定期将资源信息上报给apiserver,关于deviceManager部分会调用GetCapacity
函数获取device信息。在GetCapacity
中,
- 先查看
healthyDevices
资源量,把资源量添加到capacity
和allocatable
;
- 查看
unhealthyDevices
资源量,更新capacity
总量,最后返回数据;
- 如果device已经删除时间超过5分钟,则需要将资源从endpoint, healthyDevices中删除相关的device信息,最后会将信息更新到
kubelet_internal_checkpoint
。
总结
启动containerManager时也会启动deviceManager,dm通过kubelet.sock启动gRPC服务,device plugin通过Register向kubelet注册device信息,kubelet通过ListAndWatch监听device更新,并将device信息添加到unhealthy和healthy结构,最后dm会通过Allocate对申请的device资源的container分配device资源。