最近想用kubernetes client-go实现一个监听deployments 变化的功能,在如何判断kubernetes资源的变化有了疑问,查阅文档得知有两个与kubernetes资源对象相关的属性。

  • ResourceVersion
  • Generation

kubernetes所有资源对象都有此属性,先来看下ResourceVersion

ResourceVersion

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "6"
  creationTimestamp: "2020-08-14T10:23:27Z"
  generation: 7
  labels:
    app: nginx
  name: nginx
  namespace: default
  resourceVersion: "3313340114"
  uid: 215f58cf-4be8-472a-8ea5-be6d33b01256

resourceVersion的维护其实是利用了底层存储etcdRevision机制。

apiserver存储相关的接口定义都在interfaces.go

Versioner

定义了与resourceVersion操作相关的接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be
// a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion,
// intended to be sent directly to or from the backend. A resourceVersion of
// type string is a 'safe' resourceVersion, intended for consumption by users.
type Versioner interface {
	// UpdateObject sets storage metadata into an API object. Returns an error if the object
	// cannot be updated correctly. May return nil if the requested object does not need metadata
	// from database.
	UpdateObject(obj runtime.Object, resourceVersion uint64) error
	// UpdateList sets the resource version into an API list object. Returns an error if the object
	// cannot be updated correctly. May return nil if the requested object does not need metadata from
	// database. continueValue is optional and indicates that more results are available if the client
	// passes that value to the server in a subsequent call. remainingItemCount indicates the number
	// of remaining objects if the list is partial. The remainingItemCount field is omitted during
	// serialization if it is set to nil.
	UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, remainingItemCount *int64) error
	// PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
	// return an error if the specified object cannot be updated.
	PrepareObjectForStorage(obj runtime.Object) error
	// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
	// Should return an error if the specified object does not have a persistable version.
	ObjectResourceVersion(obj runtime.Object) (uint64, error)

	// ParseResourceVersion takes a resource version argument and
	// converts it to the storage backend. For watch we should pass to helper.Watch().
	// Because resourceVersion is an opaque value, the default watch
	// behavior for non-zero watch is to watch the next value (if you pass
	// "1", you will see updates from "2" onwards).
	ParseResourceVersion(resourceVersion string) (uint64, error)
}

APIObjectVersioner

Versioner的实现api_object_versioner.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// APIObjectVersioner implements versioning and extracting etcd node information
// for objects that have an embedded ObjectMeta or ListMeta field.
type APIObjectVersioner struct{}

// UpdateObject implements Versioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return err
	}
	versionString := ""
	if resourceVersion != 0 {
		versionString = strconv.FormatUint(resourceVersion, 10)
	}
	accessor.SetResourceVersion(versionString)
	return nil
}
//...省略

根据传入的resourceVersion更新kubernetes 资源中resourceVersion,那么具体resourceVersion是怎么来的呢?回到interfaces.go文件看下storage操作相关的接口定义:

Interface

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
	// Returns Versioner associated with this interface.
	Versioner() Versioner

	// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
	// in seconds (0 means forever). If no error is returned and out is not nil, out will be
	// set to the read value from database.
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error

	// Delete removes the specified key and returns the value that existed at that spot.
	// If key didn't exist, it will return NotFound storage error.
	// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
	// current version of the object to avoid read operation from storage to get it.
	// However, the implementations have to retry in case suggestion is stale.
	Delete(
		ctx context.Context, key string, out runtime.Object, preconditions *Preconditions,
		validateDeletion ValidateObjectFunc, cachedExistingObject runtime.Object) error

	// Watch begins watching the specified key. Events are decoded into API objects,
	// and any items selected by 'p' are sent down to returned watch.Interface.
	// resourceVersion may be used to specify what version to begin watching,
	// which should be the current resourceVersion, and no longer rv+1
	// (e.g. reconnecting without missing any updates).
	// If resource version is "0", this interface will get current object at given key
	// and send it in an "ADDED" event, before watch starts.
	Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)

	// WatchList begins watching the specified key's items. Items are decoded into API
	// objects and any item selected by 'p' are sent down to returned watch.Interface.
	// resourceVersion may be used to specify what version to begin watching,
	// which should be the current resourceVersion, and no longer rv+1
	// (e.g. reconnecting without missing any updates).
	// If resource version is "0", this interface will list current objects directory defined by key
	// and send them in "ADDED" events, before watch starts.
	WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)

	// Get unmarshals json found at key into objPtr. On a not found error, will either
	// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
	// Treats empty responses and nil response nodes exactly like a not found error.
	// The returned contents may be delayed, but it is guaranteed that they will
	// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
	Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error

	// GetToList unmarshals json found at key and opaque it into *List api object
	// (an object that satisfies the runtime.IsList definition).
	// The returned contents may be delayed, but it is guaranteed that they will
	// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
	GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error

	// List unmarshalls jsons found at directory defined by key and opaque them
	// into *List api object (an object that satisfies runtime.IsList definition).
	// The returned contents may be delayed, but it is guaranteed that they will
	// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
	List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error

	// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
	// retrying the update until success if there is index conflict.
	// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
	// other writers are simultaneously updating it, so tryUpdate() needs to take into account
	// the current contents of the object when deciding how the update object should look.
	// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
	// or zero value in 'ptrToType' parameter otherwise.
	// If the object to update has the same value as previous, it won't do any update
	// but will return the object in 'ptrToType' parameter.
	// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
	// current version of the object to avoid read operation from storage to get it.
	// However, the implementations have to retry in case suggestion is stale.
	//
	// Example:
	//
	// s := /* implementation of Interface */
	// err := s.GuaranteedUpdate(
	//     "myKey", &MyType{}, true,
	//     func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
	//       // Before each invocation of the user defined function, "input" is reset to
	//       // current contents for "myKey" in database.
	//       curr := input.(*MyType)  // Guaranteed to succeed.
	//
	//       // Make the modification
	//       curr.Counter++
	//
	//       // Return the modified object - return an error to stop iterating. Return
	//       // a uint64 to alter the TTL on the object, or nil to keep it the same value.
	//       return cur, nil, nil
	//    },
	// )
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error

	// Count returns number of different entries under the key (generally being path prefix).
	Count(key string) (int64, error)
}

数据的CURD等其它,这里我们只选Get来分析下,看下它的实现:

Get

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
	key = path.Join(s.pathPrefix, key)
	startTime := time.Now()
	getResp, err := s.client.KV.Get(ctx, key)
	metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
	if err != nil {
		return err
	}
	if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
		return err
	}

	if len(getResp.Kvs) == 0 {
		if opts.IgnoreNotFound {
			return runtime.SetZeroValue(out)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]

	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

kv.ModRevision看到从ETCD读取了key的ModRevision,继续看下decode函数

decode

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
	if _, err := conversion.EnforcePtr(objPtr); err != nil {
		return fmt.Errorf("unable to convert output object to pointer: %v", err)
	}
	_, _, err := codec.Decode(value, nil, objPtr)
	if err != nil {
		return err
	}
	// being unable to set the version does not prevent the object from being extracted
	if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
		klog.Errorf("failed to update object version: %v", err)
	}
	return nil
}

调用Versioner接口更新资源对象的resourceVersion

到这里已经可出KubernetesresourceVersion是利用了底层ETCD kv版本机制。

Etcd Version

ETCD共四种version

  • Revision
  • ModRevision
  • Version
  • CreateRevision

关于他们的区别可以看下这个issue:what is different about Revision, ModRevision and Version?

the Revision is the current revision of etcd. It is incremented every time the v3 backed is modified (e.g., Put, Delete, Txn). ModRevision is the etcd revision of the last update to a key. Version is the number of times the key has been modified since it was created. Get(..., WithRev(rev)) will perform a Get as if the etcd store is still at revision rev.

字段 作用范围 说明
Version Key 单个Key的修改次数,单调递增
Revision 全局 Key在集群中的全局版本号,全局唯一
ModRevison Key Key 最后一次修改时的 Revision
CreateRevision 全局 Key 创建时的 Revision

我们使用docker快速启动一个测试etcd来验证下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
rm -rf /tmp/etcd-data.tmp && mkdir -p /tmp/etcd-data.tmp && \
  docker rmi quay.io/coreos/etcd:v3.5.1 || true && \
  docker run \
  -d \
  -p 2379:2379 \
  -p 2380:2380 \
  --mount type=bind,source=/tmp/etcd-data.tmp,destination=/etcd-data \
  --name etcd-gcr-v3.5.1 \
  quay.io/coreos/etcd:v3.5.1 \
  /usr/local/bin/etcd \
  --name s1 \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://0.0.0.0:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://0.0.0.0:2380 \
  --initial-cluster s1=http://0.0.0.0:2380 \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr 

插入、更新数据,查看相关version的变化

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
root@641fe4972263:/# etcdctl put k1 v1
OK
root@641fe4972263:/# etcdctl get k1 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 6,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azE=",
      "create_revision": 6,
      "mod_revision": 6,
      "version": 1,
      "value": "djE="
    }
  ],
  "count": 1
}
root@641fe4972263:/# etcdctl put k2 v2
OK
root@641fe4972263:/# etcdctl get k2 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 7,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azI=",
      "create_revision": 7,
      "mod_revision": 7,
      "version": 1,
      "value": "djI="
    }
  ],
  "count": 1
}
root@641fe4972263:/# etcdctl get k1 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 7,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azE=",
      "create_revision": 6,
      "mod_revision": 6,
      "version": 1,
      "value": "djE="
    }
  ],
  "count": 1
}
root@641fe4972263:/# etcdctl put k1 nv1
OK
root@641fe4972263:/# etcdctl get k1 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 8,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azE=",
      "create_revision": 6,
      "mod_revision": 8,
      "version": 2,
      "value": "bnYx"
    }
  ],
  "count": 1
}
root@641fe4972263:/# etcdctl get k2 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 8,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azI=",
      "create_revision": 7,
      "mod_revision": 7,
      "version": 1,
      "value": "djI="
    }
  ],
  "count": 1
}

删除key并查看相关version的变化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
root@641fe4972263:/# etcdctl del k1
1
root@641fe4972263:/# etcdctl get k1
root@641fe4972263:/# etcdctl get k1 -w json
{"header":{"cluster_id":18011104697467366872,"member_id":6460912315094810421,"revision":9,"raft_term":3}}

root@641fe4972263:/# etcdctl get k1 --rev=6 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 9,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azE=",
      "create_revision": 6,
      "mod_revision": 6,
      "version": 1,
      "value": "djE="
    }
  ],
  "count": 1
}
root@641fe4972263:/# etcdctl put k1 dnv1
OK
root@641fe4972263:/# etcdctl get k1 -w json|jq
{
  "header": {
    "cluster_id": 18011104697467367000,
    "member_id": 6460912315094811000,
    "revision": 10,
    "raft_term": 3
  },
  "kvs": [
    {
      "key": "azE=",
      "create_revision": 10,
      "mod_revision": 10,
      "version": 1,
      "value": "ZG52MQ=="
    }
  ],
  "count": 1
}

k8s 资源对象版本冲突处理

我们日常使用kubectl apply执行更新可能遇到过下边的错误信息:

1
2
3
4
5
6
7
[root@xnile]$ kubectl apply -f nginx-test.yaml
Error from server (Conflict): error when applying patch:
{"metadata":{"generation":7,"resourceVersion":"3313340114"},"status":{"observedGeneration":7}}
to:
Resource: "apps/v1, Resource=deployments", GroupVersionKind: "apps/v1, Kind=Deployment"
Name: "nginx-test", Namespace: "default"
for: "nginx-test.yaml": Operation cannot be fulfilled on deployments.apps "nginx-test": the object has been modified; please apply your changes to the latest version and try again

这种情况通常会出现在我们先用kubectl get deploy xxx -o > xxx.yml导出当前配置,修改后再apply应该更改的时候。

对象版本冲突错误提示信息定义在这里store.go#L230

1
2
//https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go#L230
OptimisticLockErrorMsg        = "the object has been modified; please apply your changes to the latest version and try again"

更新冲突错误提示信息定义我们来到store.go#L496

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, false, err
	}

	var (
		creatingObj runtime.Object
		creating    = false
	)

	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	storagePreconditions := &storage.Preconditions{}
	if preconditions := objInfo.Preconditions(); preconditions != nil {
		storagePreconditions.UID = preconditions.UID
		storagePreconditions.ResourceVersion = preconditions.ResourceVersion
	}

	out := e.NewFunc()
	// deleteObj is only used in case a deletion is carried out
	var deleteObj runtime.Object
	err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
		// 获取etcd中当前资源的最新版本的数据
		existingResourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(existing)
		if err != nil {
			return nil, nil, err
		}
		if existingResourceVersion == 0 {
			if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
				return nil, nil, apierrors.NewNotFound(qualifiedResource, name)
			}
		}

		// Given the existing object, get the new object
		obj, err := objInfo.UpdatedObject(ctx, existing)
		if err != nil {
			return nil, nil, err
		}

		// If AllowUnconditionalUpdate() is true and the object specified by
		// the user does not have a resource version, then we populate it with
		// the latest version. Else, we check that the version specified by
		// the user matches the version of latest storage object.
		newResourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
		if err != nil {
			return nil, nil, err
		}
		// 无条件更新,通常是yaml文件中不带resourceVersion,然后就在etcd最新版本数据的基础上进行更新
		doUnconditionalUpdate := newResourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()

		if existingResourceVersion == 0 {
			var finishCreate FinishFunc = finishNothing

			if e.BeginCreate != nil {
				fn, err := e.BeginCreate(ctx, obj, newCreateOptionsFromUpdateOptions(options))
				if err != nil {
					return nil, nil, err
				}
				finishCreate = fn
				defer func() {
					finishCreate(ctx, false)
				}()
			}

			creating = true
			creatingObj = obj
			if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
				return nil, nil, err
			}
			// at this point we have a fully formed object.  It is time to call the validators that the apiserver
			// handling chain wants to enforce.
			if createValidation != nil {
				if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
					return nil, nil, err
				}
			}
			ttl, err := e.calculateTTL(obj, 0, false)
			if err != nil {
				return nil, nil, err
			}

			// The operation has succeeded.  Call the finish function if there is one,
			// and then make sure the defer doesn't call it again.
			fn := finishCreate
			finishCreate = finishNothing
			fn(ctx, true)

			return obj, &ttl, nil
		}

		creating = false
		creatingObj = nil
		if doUnconditionalUpdate {
			// Update the object's resource version to match the latest
			// storage object's resource version.
			err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
			if err != nil {
				return nil, nil, err
			}
		} else {
			// Check if the object's resource version matches the latest
			// resource version.
			if newResourceVersion == 0 {
				// TODO: The Invalid error should have a field for Resource.
				// After that field is added, we should fill the Resource and
				// leave the Kind field empty. See the discussion in #18526.
				qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
				fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), newResourceVersion, "must be specified for an update")}
				return nil, nil, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)
			}
			// 数据已经发生变化,提示版本冲突
			if newResourceVersion != existingResourceVersion {
				return nil, nil, apierrors.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
			}
		}

		var finishUpdate FinishFunc = finishNothing

		if e.BeginUpdate != nil {
			fn, err := e.BeginUpdate(ctx, obj, existing, options)
			if err != nil {
				return nil, nil, err
			}
			finishUpdate = fn
			defer func() {
				finishUpdate(ctx, false)
			}()
		}

		if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
			return nil, nil, err
		}
		// at this point we have a fully formed object.  It is time to call the validators that the apiserver
		// handling chain wants to enforce.
		if updateValidation != nil {
			if err := updateValidation(ctx, obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
				return nil, nil, err
			}
		}
		// Check the default delete-during-update conditions, and store-specific conditions if provided
		if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
			(e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
			deleteObj = obj
			return nil, nil, errEmptiedFinalizers
		}
		ttl, err := e.calculateTTL(obj, res.TTL, true)
		if err != nil {
			return nil, nil, err
		}

		// The operation has succeeded.  Call the finish function if there is one,
		// and then make sure the defer doesn't call it again.
		fn := finishUpdate
		finishUpdate = finishNothing
		fn(ctx, true)

		if int64(ttl) != res.TTL {
			return obj, &ttl, nil
		}
		return obj, nil, nil
	}, dryrun.IsDryRun(options.DryRun), nil)

	if err != nil {
		// delete the object
		if err == errEmptiedFinalizers {
			return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions, newDeleteOptionsFromUpdateOptions(options))
		}
		if creating {
			err = storeerr.InterpretCreateError(err, qualifiedResource, name)
			err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, creatingObj)
		} else {
			err = storeerr.InterpretUpdateError(err, qualifiedResource, name)
		}
		return nil, false, err
	}

	if creating {
		if e.AfterCreate != nil {
			e.AfterCreate(out, newCreateOptionsFromUpdateOptions(options))
		}
	} else {
		if e.AfterUpdate != nil {
			e.AfterUpdate(out, options)
		}
	}
	if e.Decorator != nil {
		e.Decorator(out)
	}
	return out, creating, nil
}

根据更新资源时是否带有resourceVersion分两种情况:

  • 未带resourceVersion:无条件更新,获得etcd中最新的数据然后再此基础上更新
  • 带有resourceVersion:和etcdmodRevision对比,不一样就提示版本冲突,说明数据已发生修改,当前要修改的版本已不是最新数据。

Generation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (deploymentStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
	deployment := obj.(*apps.Deployment)
	deployment.Status = apps.DeploymentStatus{}
	deployment.Generation = 1

	pod.DropDisabledTemplateFields(&deployment.Spec.Template, nil)
}

func (deploymentStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
	newDeployment := obj.(*apps.Deployment)
	oldDeployment := old.(*apps.Deployment)
	newDeployment.Status = oldDeployment.Status

	pod.DropDisabledTemplateFields(&newDeployment.Spec.Template, &oldDeployment.Spec.Template)

	// Spec updates bump the generation so that we can distinguish between
	// scaling events and template changes, annotation updates bump the generation
	// because annotations are copied from deployments to their replica sets.
	if !apiequality.Semantic.DeepEqual(newDeployment.Spec, oldDeployment.Spec) ||
		!apiequality.Semantic.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) {
		newDeployment.Generation = oldDeployment.Generation + 1
	}
}

Generation初始值为1,随Spec内容的改变而自增。

总结

  • Kubernetes 资源版本控制采用乐观锁,apiserver在写入etcd时作冲突检测。
  • ResourceVersion 基于底层etcd的revision机制,资源对象每次update时都会改变,且集群范围内唯一。
  • Generation初始值为1,随Spec内容的改变而自增。

参考

https://www.cnblogs.com/tencent-cloud-native/p/14893209.html

http://www.jcxioo.com/2021/06/14/01_Kubernetes/etcd1%20/

http://www.jcxioo.com/2021/07/22/01_Kubernetes/etcd%20mvcc/

https://blog.betacat.io/post/mvcc-implementation-in-etcd/

https://etcd.io/docs/v3.2/learning/api/

https://stackoverflow.com/questions/47100389/what-is-the-difference-between-a-resourceversion-and-a-generation/47101418