nfs
GitBook address of this article: .html
To develop a Dynamic Provisioner, you need to use the helper library.
1. Dynamic Provisioner
1.1. Provisioner Interface
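Developing a Dynamic Provisioner requires implementing the Provisioner interface, which has two methods:
- Provision: creates the storage resource and returns a PV object.
- Delete: removes the corresponding storage resource, but does not delete the PV object itself.
The source of the Provisioner interface is as follows: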
// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
	// Provision creates a volume i.e. the storage asset and returns a PV object
	// for the volume
	Provision(VolumeOptions) (*v1.PersistentVolume, error)
	// Delete removes the storage asset that was created by Provision backing the
	// given PV. Does not delete the PV object itself.
	//
	// May return IgnoredError to indicate that the call has been ignored and no
	// action taken.
	Delete(*v1.PersistentVolume) error
}
1.2. VolumeOptions
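The Provision method of the Provisioner interface takes a VolumeOptions object as its argument. VolumeOptions carries the information needed to create the PV object, such as the PV's reclaim policy, the PV name, the PVC the PV is created for, and the parameters of the PVC's StorageClass.
The source of VolumeOptions is as follows: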
// VolumeOptions contains option information about a volume
// .4/pkg/volume/plugins.go
type VolumeOptions struct {
	// Reclamation policy for a persistent volume
	PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
	// volume name.
	PVName string
	// PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
	MountOptions []string
	// PVC is reference to the claim that lead to provisioning of a new PV.
	// Provisioners *must* create a PV that would be matched by this PVC,
	// i.e. with required capacity, accessMode, labels matching PVC.Selector and
	// so on.
	PVC *v1.PersistentVolumeClaim
	// Volume provisioning parameters from StorageClass
	Parameters map[string]string
	// Node selected by the scheduler for the volume.
	SelectedNode *v1.Node
	// Topology constraint parameter from StorageClass
	AllowedTopologies []v1.TopologySelectorTerm
}
1.3. ProvisionController
ProvisionController is a controller that provisions PVs for PVCs. It contains all the logic that drives the Provision and Delete methods of the Provisioner interface.
1.4. Steps to develop a provisioner
- Write a provisioner that implements the Provisioner interface (the Provision and Delete methods).
- Build a ProvisionController from that provisioner.
- Call the Run method of the ProvisionController.
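The three steps above can be sketched with self-contained stand-in types. This is only an illustration under stated assumptions, not the helper library's API: PersistentVolume, VolumeOptions, Provisioner, memProvisioner and toyController are simplified, hypothetical versions, and Run processes a fixed list of claim names instead of watching the API server.

```go
package main

import (
	"errors"
	"fmt"
)

// PersistentVolume and VolumeOptions are simplified stand-ins for the
// Kubernetes and helper-library types; only the fields used here are kept.
type PersistentVolume struct {
	Name string
	Path string
}

type VolumeOptions struct {
	PVName string
}

// Provisioner mirrors the two-method shape of the real interface.
type Provisioner interface {
	Provision(VolumeOptions) (*PersistentVolume, error)
	Delete(*PersistentVolume) error
}

// Step 1: a provisioner that implements both methods.
// memProvisioner is hypothetical; it tracks "storage assets" in a map.
type memProvisioner struct {
	assets map[string]bool
}

func (p *memProvisioner) Provision(o VolumeOptions) (*PersistentVolume, error) {
	if o.PVName == "" {
		return nil, errors.New("PVName must not be empty")
	}
	p.assets[o.PVName] = true
	return &PersistentVolume{Name: o.PVName, Path: "/export/" + o.PVName}, nil
}

func (p *memProvisioner) Delete(pv *PersistentVolume) error {
	if !p.assets[pv.Name] {
		return fmt.Errorf("no storage asset for %s", pv.Name)
	}
	delete(p.assets, pv.Name)
	return nil
}

// Step 2: a toy controller built from the provisioner.
type toyController struct {
	provisioner Provisioner
	volumes     map[string]*PersistentVolume // stands in for PV objects in the API server
}

func newToyController(p Provisioner) *toyController {
	return &toyController{provisioner: p, volumes: map[string]*PersistentVolume{}}
}

// Step 3: Run provisions one volume per claim name and records the result.
func (c *toyController) Run(claims []string) error {
	for _, name := range claims {
		pv, err := c.provisioner.Provision(VolumeOptions{PVName: name})
		if err != nil {
			return err
		}
		c.volumes[pv.Name] = pv
	}
	return nil
}

func main() {
	ctrl := newToyController(&memProvisioner{assets: map[string]bool{}})
	if err := ctrl.Run([]string{"pvc-1", "pvc-2"}); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	fmt.Println(len(ctrl.volumes), "volumes provisioned")
}
```

The split matters: the provisioner only touches the storage backend, while the controller owns the bookkeeping of volume objects.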
2. NFS Client Provisioner
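nfs-client-provisioner is an automatic provisioner that uses NFS as its storage backend and automatically creates the PV for a PVC. It does not provide NFS storage itself; an external NFS service must already exist.
- Provisioned PVs are stored on the NFS server under the name ${namespace}-${pvcName}-${pvName}.
- Reclaimed PVs are stored on the NFS server under the name archived-${namespace}-${pvcName}-${pvName}.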
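As a small illustration of the two naming formats, the helpers below build both directory names. The function names provisionedDirName and archivedDirName, and the sample namespace, claim and PV names, are hypothetical and not part of nfs-client-provisioner.

```go
package main

import (
	"fmt"
	"strings"
)

// provisionedDirName builds the directory name used for a live PV:
// ${namespace}-${pvcName}-${pvName}.
func provisionedDirName(namespace, pvcName, pvName string) string {
	return strings.Join([]string{namespace, pvcName, pvName}, "-")
}

// archivedDirName builds the name a directory gets when its PV is reclaimed.
func archivedDirName(dirName string) string {
	return "archived-" + dirName
}

func main() {
	dir := provisionedDirName("default", "test-claim", "pvc-0001")
	fmt.Println(dir)                  // default-test-claim-pvc-0001
	fmt.Println(archivedDirName(dir)) // archived-default-test-claim-pvc-0001
}
```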
The rest of this article walks through the nfs-client-provisioner source to show the whole process of developing a custom provisioner. Most of its code is in the file provisioner.go.
nfs-client-provisioner source address:
2.1. The main function
2.1.1. Reading environment variables
The source is as follows:
func main() {
	flag.Parse()
	flag.Set("logtostderr", "true")

	server := os.Getenv("NFS_SERVER")
	if server == "" {
		glog.Fatal("NFS_SERVER not set")
	}
	path := os.Getenv("NFS_PATH")
	if path == "" {
		glog.Fatal("NFS_PATH not set")
	}
	provisionerName := os.Getenv(provisionerNameKey)
	if provisionerName == "" {
		glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
	}
	...
}
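main first reads the three environment variables NFS_SERVER, NFS_PATH and PROVISIONER_NAME, so all three must be set when deploying nfs-client-provisioner.
- NFS_SERVER: the IP address of the NFS server.
- NFS_PATH: the shared directory exported by the NFS server.
- PROVISIONER_NAME: the provisioner name; it must match the provisioner field of the StorageClass object.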
For example, the YAML of the StorageClass object looks like this:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
2.1.2. Obtaining the clientset object
The source is as follows:
// Create an InClusterConfig and use it to create a client for the controller
// to use to communicate with Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
	glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create client: %v", err)
}
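The in-cluster Kubernetes configuration is read and used to create a clientset object, which calls the Kubernetes API, mainly to create and delete objects such as PVs and PVCs.
2.1.3. Constructing the nfsProvisioner object
The source is as follows: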
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
	glog.Fatalf("Error getting server version: %v", err)
}

clientNFSProvisioner := &nfsProvisioner{
	client: clientset,
	server: server,
	path:   path,
}
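The nfsProvisioner object is built from clientset, server and path. The Kubernetes version is fetched as well, because out-of-tree provisioners are officially supported only from Kubernetes 1.5 onward.
The nfsProvisioner type is defined as follows:
type nfsProvisioner struct {
	client kubernetes.Interface
	server string
	path   string
}

var _ controller.Provisioner = &nfsProvisioner{}
nfsProvisioner is a custom provisioner that implements the Provisioner interface. Besides the two NFS-related fields server and path, it holds client, which is used to call the Kubernetes API.
var _ controller.Provisioner = &nfsProvisioner{}
The declaration above is a compile-time check that nfsProvisioner implements the Provisioner interface.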
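The blank-identifier assignment is a general Go idiom. A minimal sketch with a hypothetical Greeter interface and english type (neither appears in the provisioner source) shows how it works:

```go
package main

import "fmt"

// Greeter is a hypothetical interface used only for this illustration.
type Greeter interface {
	Greet() string
}

type english struct{}

func (*english) Greet() string { return "hello" }

// Compile-time assertion: the build fails on this line if *english ever
// stops satisfying Greeter. The blank identifier means the value itself
// is never used.
var _ Greeter = &english{}

func main() {
	var g Greeter = &english{}
	fmt.Println(g.Greet()) // hello
}
```

If the Greet method were removed or its signature changed, the compiler would report the mismatch at the var line instead of at some distant call site.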
2.1.4. Building and running the ProvisionController
The source is as follows:
// Start the provision controller which will dynamically provision efs NFS
// PVs
pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
pc.Run(wait.NeverStop)
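A ProvisionController is built from the nfsProvisioner and its Run method is called. ProvisionController implements the PV- and PVC-related logic, and Run runs as a long-lived process.
2.2. The Provision and Delete methods
2.2.1. The Provision method
For the source of nfsProvisioner's Provision method, see: .go#L56
Provision creates the storage resource and returns a PV object. Its argument is a VolumeOptions, which specifies the attributes of the PV object.
(1) Build the PV and PVC names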
func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
	if options.PVC.Spec.Selector != nil {
		return nil, fmt.Errorf("claim Selector is not supported")
	}
	glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)

	pvcNamespace := options.PVC.Namespace
	pvcName := options.PVC.Name

	pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-")

	fullPath := filepath.Join(mountPath, pvName)
	glog.V(4).Infof("creating path %s", fullPath)
	if err := os.MkdirAll(fullPath, 0777); err != nil {
		return nil, errors.New("unable to create directory to provision new pv: " + err.Error())
	}
	os.Chmod(fullPath, 0777)

	path := filepath.Join(p.path, pvName)
	...
}
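From the VolumeOptions argument, the method builds a directory name out of the PVC namespace, the PVC name and the PV name, creates that directory under mountPath, and computes the NFS path.
(2) Construct the PV object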
pv := &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: options.PVName,
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
		AccessModes:                   options.PVC.Spec.AccessModes,
		MountOptions:                  options.MountOptions,
		Capacity: v1.ResourceList{
			v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
		},
		PersistentVolumeSource: v1.PersistentVolumeSource{
			NFS: &v1.NFSVolumeSource{
				Server:   p.server,
				Path:     path,
				ReadOnly: false,
			},
		},
	},
}
return pv, nil
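As shown, Provision prepares the backing directory and builds the PV object from the VolumeOptions argument; it does not itself create or delete the PV object in Kubernetes.
Provisioners of different types mainly differ in the type and parameters of PersistentVolumeSource. For example, nfs-provisioner uses NFS as its PersistentVolumeSource and must supply the NFS parameters Server, Path and so on.
2.2.2. The Delete method
For the source of nfsProvisioner's Delete method, see: .go#L99
(1) Get pvName, path and related parameters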
func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error {
	path := volume.Spec.PersistentVolumeSource.NFS.Path
	pvName := filepath.Base(path)
	oldPath := filepath.Join(mountPath, pvName)
	if _, err := os.Stat(oldPath); os.IsNotExist(err) {
		glog.Warningf("path %s does not exist, deletion skipped", oldPath)
		return nil
	}
	...
}
oldPath is derived from path and pvName; it is the directory on the NFS server where the pod's persistent data was stored.
(2) Read the archiveOnDelete parameter and delete the data
// Get the storage class for this volume.
storageClass, err := p.getClassForVolume(volume)
if err != nil {
	return err
}
// Determine if the "archiveOnDelete" parameter exists.
// If it exists and has a falsey value, delete the directory.
// Otherwise, archive it.
archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
if exists {
	archiveBool, err := strconv.ParseBool(archiveOnDelete)
	if err != nil {
		return err
	}
	if !archiveBool {
		return os.RemoveAll(oldPath)
	}
}
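If the StorageClass object sets archiveOnDelete to false, all data under oldPath, that is, the pod's persisted data, is deleted.
archiveOnDelete literally means "archive on delete": false means do not archive (the data is deleted), true means archive (the directory is renamed).
(3) Rename the old data path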
archivePath := filepath.Join(mountPath, "archived-"+pvName)
glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
return os.Rename(oldPath, archivePath)
If the StorageClass object does not set archiveOnDelete, or sets it to true, the data is archived on deletion: oldPath is renamed by adding the prefix archived- to its directory name.
3. ProvisionController
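3.1. The ProvisionController struct
For the source, see: .go#L82
ProvisionController is a controller that provisions PVs for PVCs. It contains all the logic that drives the Provision and Delete methods of the Provisioner interface.
3.1.1. Constructor arguments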
// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
	client kubernetes.Interface

	// The name of the provisioner for which this controller dynamically
	// provisions volumes. The value of annDynamicallyProvisioned and
	// annStorageProvisioner to set & watch for, respectively
	provisionerName string

	// The provisioner the controller will use to provision and delete volumes.
	// Presumably this implementer of Provisioner carries its own
	// volume-specific options and such that it needs in order to provision
	// volumes.
	provisioner Provisioner

	// Kubernetes cluster server version:
	// * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
	// provisioning is not officially supported, though it works
	// * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
	// officially supported
	// * 1.6: storage classes enter GA
	kubeVersion *utilversion.Version
	...
}
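The fields client, provisionerName, provisioner and kubeVersion are supplied as arguments to NewProvisionController.
- client: the clientset used to call the Kubernetes API.
- provisionerName: the provisioner name; it must match the provisioner field of the StorageClass object.
- provisioner: the concrete provisioner implementation, nfsProvisioner in this article.
- kubeVersion: the Kubernetes version.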
3.1.2. Controller and Informer
type ProvisionController struct {
	...
	claimInformer   cache.SharedInformer
	claims          cache.Store
	claimController cache.Controller

	volumeInformer   cache.SharedInformer
	volumes          cache.Store
	volumeController cache.Controller

	classInformer   cache.SharedInformer
	classes         cache.Store
	classController cache.Controller
	...
}
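The ProvisionController struct holds a Controller, an Informer and a Store for each of the three object types PV, PVC and StorageClass; they are used to operate on these objects.
- Controller: the generic control framework
- Informer: the event notifier
- Store: the generic object storage interface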
3.1.3. workqueue
type ProvisionController struct {
	...
	claimQueue  workqueue.RateLimitingInterface
	volumeQueue workqueue.RateLimitingInterface
	...
}
claimQueue and volumeQueue are the work queues for PVCs and PVs respectively.
3.1.4. Other fields
// Identity of this controller, generated at creation time and not persisted
// across restarts. Useful only for debugging, for seeing the source of
// events. controller.provisioner may have its own, different notion of
// identity which may/may not persist across restarts
id string
component string
eventRecorder record.EventRecorder

resyncPeriod time.Duration

exponentialBackOffOnError bool
threadiness               int

createProvisionedPVRetryCount int
createProvisionedPVInterval   time.Duration

failedProvisionThreshold, failedDeleteThreshold int

// The port for metrics server to serve on.
metricsPort int32
// The IP address for metrics server to serve on.
metricsAddress string
// The path of metrics endpoint path.
metricsPath string

// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration

hasRun     bool
hasRunLock *sync.Mutex
3.2. The NewProvisionController function
Source address: .go#L383
NewProvisionController constructs a ProvisionController.
3.2.1. Initializing default values
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
	client kubernetes.Interface,
	provisionerName string,
	provisioner Provisioner,
	kubeVersion string,
	options ...func(*ProvisionController) error,
) *ProvisionController {
	...
	controller := &ProvisionController{
		client:                        client,
		provisionerName:               provisionerName,
		provisioner:                   provisioner,
		kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
		id:                            id,
		component:                     component,
		eventRecorder:                 eventRecorder,
		resyncPeriod:                  DefaultResyncPeriod,
		exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
		threadiness:                   DefaultThreadiness,
		createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
		createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
		failedProvisionThreshold:      DefaultFailedProvisionThreshold,
		failedDeleteThreshold:         DefaultFailedDeleteThreshold,
		leaseDuration:                 DefaultLeaseDuration,
		renewDeadline:                 DefaultRenewDeadline,
		retryPeriod:                   DefaultRetryPeriod,
		metricsPort:                   DefaultMetricsPort,
		metricsAddress:                DefaultMetricsAddress,
		metricsPath:                   DefaultMetricsPath,
		hasRun:                        false,
		hasRunLock:                    &sync.Mutex{},
	}
	...
}
3.2.2. Initializing the work queues
ratelimiter := workqueue.NewMaxOfRateLimiter(
	workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
	&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
if !controller.exponentialBackOffOnError {
	ratelimiter = workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")
3.2.3. ListWatch
// PVC
claimSource := &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
	},
}
// PV
volumeSource := &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().PersistentVolumes().List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().PersistentVolumes().Watch(options)
	},
}
// StorageClass
classSource = &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.StorageV1().StorageClasses().List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.StorageV1().StorageClasses().Watch(options)
	},
}
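list-watch is the core mechanism Kubernetes uses to observe object changes. A ListWatch contains two functions, ListFunc and WatchFunc, neither of which may be nil. The code above builds a ListWatch struct for each of PVC, PV and StorageClass. The mechanism is implemented in the cache package of client-go; see: .io/client-go/tools/cache.
The relevant ListWatch code is shown below.
See: .io/client-go/tools/cache/listwatch.go#L34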
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}
3.2.4. ResourceEventHandlerFuncs
// PVC
claimHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
	DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
}
// PV
volumeHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
	DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
}
// StorageClass
classHandler := cache.ResourceEventHandlerFuncs{
	// We don't need an actual event handler for StorageClasses,
	// but we must pass a non-nil one to cache.NewInformer()
	AddFunc:    nil,
	UpdateFunc: nil,
	DeleteFunc: nil,
}
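ResourceEventHandlerFuncs is a set of resource event handler functions used to deliver notifications when Kubernetes resource objects are added, updated or deleted. The struct implements the ResourceEventHandler interface. The code lives in the cache package of client-go.
The relevant ResourceEventHandlerFuncs code is: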
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}
3.2.5. Constructing the Store and Controller
(1) PVC
if controller.claimInformer != nil {
	controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
	controller.claims, controller.claimController =
		controller.claimInformer.GetStore(),
		controller.claimInformer.GetController()
} else {
	controller.claims, controller.claimController =
		cache.NewInformer(
			claimSource,
			&v1.PersistentVolumeClaim{},
			controller.resyncPeriod,
			claimHandler,
		)
}
(2) PV
if controller.volumeInformer != nil {
	controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
	controller.volumes, controller.volumeController =
		controller.volumeInformer.GetStore(),
		controller.volumeInformer.GetController()
} else {
	controller.volumes, controller.volumeController =
		cache.NewInformer(
			volumeSource,
			&v1.PersistentVolume{},
			controller.resyncPeriod,
			volumeHandler,
		)
}
(3) StorageClass
if controller.classInformer != nil {
	// no resource event handler needed for StorageClasses
	controller.classes, controller.classController =
		controller.classInformer.GetStore(),
		controller.classInformer.GetController()
} else {
	controller.classes, controller.classController = cache.NewInformer(
		classSource,
		versionedClassType,
		controller.resyncPeriod,
		classHandler,
	)
}
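These are built with cache.NewInformer, which takes the ListWatch struct, the ResourceEventHandlerFuncs and related arguments, and returns a Store and a Controller.
Once all the parts above are constructed, the function returns a concrete ProvisionController object.
3.3. The ProvisionController.Run method
The Run method of ProvisionController runs as a long-lived process and starts the other controllers from inside.
3.3.1. Prometheus metrics collection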
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
	run := func(stopCh <-chan struct{}) {
		...
		if ctrl.metricsPort > 0 {
			prometheus.MustRegister([]prometheus.Collector{
				metrics.PersistentVolumeClaimProvisionTotal,
				metrics.PersistentVolumeClaimProvisionFailedTotal,
				metrics.PersistentVolumeClaimProvisionDurationSeconds,
				metrics.PersistentVolumeDeleteTotal,
				metrics.PersistentVolumeDeleteFailedTotal,
				metrics.PersistentVolumeDeleteDurationSeconds,
			}...)
			http.Handle(ctrl.metricsPath, promhttp.Handler())
			address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
			glog.Infof("Starting metrics server at %s\n", address)
			go wait.Forever(func() {
				err := http.ListenAndServe(address, nil)
				if err != nil {
					glog.Errorf("Failed to listen on %s: %v", address, err)
				}
			}, 5*time.Second)
		}
		...
}
3.3.2. Controller.Run
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
	go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
	go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
	go ctrl.classController.Run(stopCh)
}
This starts the informers, unless shared informers were passed in and are run elsewhere.
3.3.3. Worker
for i := 0; i < ctrl.threadiness; i++ {
	go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
	go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
}
runClaimWorker and runVolumeWorker are the workers for PVCs and PVs; their bodies are processNextClaimWorkItem and processNextVolumeWorkItem respectively.
The execution flow is as follows.
PVC call chain:
runClaimWorker→processNextClaimWorkItem→syncClaimHandler→syncClaim→provisionClaimOperation
PV call chain:
runVolumeWorker→processNextVolumeWorkItem→syncVolumeHandler→syncVolume→deleteVolumeOperation
The functions that finally run are therefore provisionClaimOperation and deleteVolumeOperation.
3.4. Operation
3.4.1. provisionClaimOperation
(1) provisionClaimOperation takes a PVC, derives the PV name from it and checks whether that PV already exists; if it does, the remaining steps are skipped.
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns error, which indicates whether provisioning should be retried
// (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
	// Most code here is identical to that found in controller.go of kube's PV controller...
	claimClass := helper.GetPersistentVolumeClaimClass(claim)
	operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
	glog.Infof(logOperation(operation, "started"))

	// A previous doProvisionClaim may just have finished while we were waiting for
	// the locks. Check that PV (with deterministic name) hasn't been provisioned
	// yet.
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
		return nil
	}
	...
}
(2) Get the Provisioner and ReclaimPolicy parameters from the StorageClass object. If provisionerName does not match the provisioner field of the StorageClass, an error is logged and the operation returns without retrying.
provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
if err != nil {
	glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
	return nil
}
if provisioner != ctrl.provisionerName {
	// class.Provisioner has either changed since shouldProvision() or
	// annDynamicallyProvisioned contains different provisioner than
	// class.Provisioner.
	glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
	return nil
}
// Check if this provisioner can provision this claim.
if err = ctrl.canProvision(claim); err != nil {
	ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
	glog.Errorf(logOperation(operation, "failed to provision volume: %v", err))
	return nil
}

reclaimPolicy := v1.PersistentVolumeReclaimDelete
if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
	reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
	if err != nil {
		return err
	}
}
(3) Call the concrete provisioner.Provision method to build the PV object; in this article the provisioner is nfs-provisioner.
options := VolumeOptions{
	PersistentVolumeReclaimPolicy: reclaimPolicy,
	PVName:                        pvName,
	PVC:                           claim,
	MountOptions:                  mountOptions,
	Parameters:                    parameters,
	SelectedNode:                  selectedNode,
	AllowedTopologies:             allowedTopologies,
}

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

volume, err = ctrl.provisioner.Provision(options)
if err != nil {
	if ierr, ok := err.(*IgnoredError); ok {
		// Provision ignored, do nothing and hope another provisioner will provision it.
		glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
		return nil
	}
	err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
	ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
	return err
}
(4) Create the PV object in Kubernetes.
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
	glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))
	if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
		// Save succeeded.
		if err != nil {
			glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
			err = nil
		} else {
			glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
		}
		break
	}
	// Save failed, try again after a while.
	glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
	time.Sleep(ctrl.createProvisionedPVInterval)
}
(5) If creating the PV fails, clean up the storage resource.
if err != nil {
	// Save failed. Now we have a storage asset outside of Kubernetes,
	// but we don't have appropriate PV object for it.
	// Emit some event here and try to delete the storage asset several
	// times.
	...
	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
		if err = ctrl.provisioner.Delete(volume); err == nil {
			// Delete succeeded
			glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
			break
		}
		// Delete failed, try again after a while.
		glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
		time.Sleep(ctrl.createProvisionedPVInterval)
	}
	if err != nil {
		// Delete failed several times. There is an orphaned volume and there
		// is nothing we can do about it.
		strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
		glog.Error(logOperation(operation, strerr))
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
	}
}
If creation succeeds, a success message is logged and nil is returned.
3.4.2. deleteVolumeOperation
(1) deleteVolumeOperation takes a PV; it first fetches the current PV object and checks whether it still needs to be deleted.
// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
	...
	// This method may have been waiting for a volume lock for some time.
	// Our check does not have to be as sophisticated as PV controller's, we can
	// trust that the PV controller has set the PV to Released/Failed and it's
	// ours to delete
	newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
	if err != nil {
		return nil
	}
	if !ctrl.shouldDelete(newVolume) {
		glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
		return nil
	}
	...
}
(2) Call the Delete method of the concrete provisioner; for nfs-provisioner, this is nfs-provisioner's Delete method.
err = ctrl.provisioner.Delete(volume)
if err != nil {
	if ierr, ok := err.(*IgnoredError); ok {
		// Delete ignored, do nothing and hope another provisioner will delete it.
		glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
		return nil
	}
	// Delete failed, emit an event.
	glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
	ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
	return err
}
(3) Delete the PV object in Kubernetes.
// Delete the volume
if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
	// Oops, could not delete the volume and therefore the controller will
	// try to delete the volume again on next update.
	glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
	return err
}
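4. Summary
- The Provisioner interface has two methods, Provision and Delete, which a custom provisioner must implement. They handle only the work specific to the storage type and do not create or delete PV or PVC objects in Kubernetes.
- Provision mainly builds the PV object. Provisioners of different types mainly differ in the type and parameters of PersistentVolumeSource; for example, nfs-provisioner uses NFS as its PersistentVolumeSource and must supply the NFS parameters Server, Path and so on.
- Delete archives (backs up) or deletes the data for the storage type in question.
- The StorageClass object must be created separately; it names the provisioner that carries out the logic.
- provisionClaimOperation and deleteVolumeOperation create and delete the PV objects in Kubernetes, and call the concrete provisioner's Provision and Delete methods to handle the stored data.
References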
- .go
- .go