
Commit 8de5b8d

Merge pull request #355 from robbrockbank/1.1.1-merge
Commits required for v1.1.1
2 parents 83090c9 + cf000a9 commit 8de5b8d

19 files changed, with 383 additions and 75 deletions.

lib/api/ippool.go

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ type IPIPConfiguration struct {
     // addresses within this pool. A mode of "cross-subnet" will only use IPIP
     // tunneling when the destination node is on a different subnet to the
     // originating node. The default value (if not specified) is "always".
-    Mode ipip.Mode `json:"mode,omitempty"`
+    Mode ipip.Mode `json:"mode,omitempty" validate:"ipipmode"`
 }
 
 // NewIPPool creates a new (zeroed) Pool struct with the TypeMetadata initialised to the current
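
The new `validate:"ipipmode"` struct tag only takes effect once a validation function is registered for that tag name, which libcalico-go does in its own validator package. As a rough, hedged illustration only (not the project's actual registration code, and using a plain string field rather than the real ipip.Mode type), a custom tag like this could be wired up with github.com/go-playground/validator/v10 along these lines; the accepted values mirror the comment in the diff ("always", "cross-subnet", or empty for the default):

```go
package main

import (
	"fmt"

	"github.com/go-playground/validator/v10"
)

// IPIPConfiguration is a stand-in for the real API struct; the real field is
// of type ipip.Mode rather than string.
type IPIPConfiguration struct {
	Mode string `json:"mode,omitempty" validate:"ipipmode"`
}

func main() {
	v := validator.New()

	// Register a validation function for the "ipipmode" tag. An empty value
	// means "not specified", which the API treats as the default ("always").
	v.RegisterValidation("ipipmode", func(fl validator.FieldLevel) bool {
		switch fl.Field().String() {
		case "", "always", "cross-subnet":
			return true
		default:
			return false
		}
	})

	fmt.Println(v.Struct(IPIPConfiguration{Mode: "cross-subnet"})) // <nil>
	fmt.Println(v.Struct(IPIPConfiguration{Mode: "sometimes"}))    // validation error for tag "ipipmode"
}
```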

lib/api/unversioned/types.go

Lines changed: 2 additions & 2 deletions

@@ -30,8 +30,8 @@ var (
 //
 // All resource and resource lists embed a TypeMetadata as an anonymous field.
 type TypeMetadata struct {
-    Kind       string `json:"kind" validate:"required"`
-    APIVersion string `json:"apiVersion" validate:"required"`
+    Kind       string `json:"kind"`
+    APIVersion string `json:"apiVersion"`
 }
 
 func (md TypeMetadata) GetTypeMetadata() TypeMetadata {

lib/backend/etcd/syncer.go

Lines changed: 9 additions & 1 deletion

@@ -1,4 +1,4 @@
-// Copyright (c) 2016 Tigera, Inc. All rights reserved.
+// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -245,6 +245,7 @@ func sendSnapshotNode(node *client.Node, snapshotUpdates chan<- interface{}, res
 // so that the merge goroutine can trigger a new resync via snapshot.
 func (syn *etcdSyncer) watchEtcd(watcherUpdateC chan<- interface{}) {
     log.Info("etcd watch thread started.")
+    var timeOfLastError time.Time
     // Each trip around the outer loop establishes the current etcd index
     // of the cluster, triggers a new snapshot read from that index (via
     // message to the merge goroutine) and starts watching from that index.
@@ -278,6 +279,13 @@ func (syn *etcdSyncer) watchEtcd(watcherUpdateC chan<- interface{}) {
         // Wait for the next event from the watcher.
         resp, err := watcher.Next(context.Background())
         if err != nil {
+            // Prevent a tight loop if etcd is repeatedly failing.
+            if time.Since(timeOfLastError) < 1*time.Second {
+                log.Warning("May be tight looping, throttling retries.")
+                time.Sleep(1 * time.Second)
+            }
+            timeOfLastError = time.Now()
+
             if !retryableWatcherError(err) {
                 // Break out of the loop to trigger a new resync.
                 log.WithError(err).Warning("Lost sync with etcd, restarting watcher.")
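
The intent of the new timeOfLastError bookkeeping is simple back-off: if the watcher fails twice within a second, sleep before retrying so a persistently broken etcd connection cannot spin the goroutine. A minimal, self-contained sketch of the same pattern (illustrative names only, not the libcalico-go API):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithThrottle calls next repeatedly; if two consecutive errors arrive
// within a second of each other, it sleeps before retrying to avoid a tight
// error loop.
func retryWithThrottle(next func() error, attempts int) {
	var timeOfLastError time.Time
	for i := 0; i < attempts; i++ {
		if err := next(); err != nil {
			if time.Since(timeOfLastError) < time.Second {
				fmt.Println("May be tight looping, throttling retries.")
				time.Sleep(time.Second)
			}
			timeOfLastError = time.Now()
		}
	}
}

func main() {
	// Simulate a watcher that always fails; every retry after the first is
	// throttled to roughly one per second.
	retryWithThrottle(func() error { return errors.New("transient etcd error") }, 3)
}
```

The throttle only engages when errors arrive back to back, so a single transient failure after a long healthy period is retried immediately.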

lib/backend/k8s/conversion.go

Lines changed: 6 additions & 0 deletions

@@ -219,6 +219,12 @@ func (c converter) podToWorkloadEndpoint(pod *kapiv1.Pod) (*model.KVPair, error)
         ipNets = []cnet.IPNet{*ipNet}
     }
 
+    // Check to see if the Pod has a Node.
+    if len(pod.Spec.NodeName) == 0 {
+        log.Warnf("%s - NodeName is empty. Spec: %+v Status: %+v", pod.Name, pod.Spec, pod.Status)
+        return nil, fmt.Errorf("Pod '%s' is missing a NodeName", pod.Name)
+    }
+
     // Generate the interface name and MAC based on workload. This must match
     // the host-side veth configured by the CNI plugin.
     interfaceName := VethNameForWorkload(workload)

lib/backend/k8s/conversion_test.go

Lines changed: 17 additions & 0 deletions

@@ -191,6 +191,23 @@ var _ = Describe("Test Pod conversion", func() {
         Expect(wep.Value.(*model.WorkloadEndpoint).State).To(Equal("active"))
         Expect(wep.Value.(*model.WorkloadEndpoint).Labels).To(Equal(map[string]string{"calico/k8s_ns": "default"}))
     })
+
+    It("should not Parse a Pod with no NodeName", func() {
+        pod := k8sapi.Pod{
+            ObjectMeta: k8sapi.ObjectMeta{
+                Name:      "podA",
+                Namespace: "default",
+            },
+            Spec: k8sapi.PodSpec{},
+            Status: k8sapi.PodStatus{
+                PodIP: "192.168.0.1",
+            },
+        }
+
+        _, err := c.podToWorkloadEndpoint(&pod)
+        Expect(err).To(HaveOccurred())
+    })
+
 })
 
 var _ = Describe("Test NetworkPolicy conversion", func() {

lib/backend/k8s/k8s.go

Lines changed: 15 additions & 8 deletions

@@ -113,7 +113,7 @@ func NewKubeClient(kc *KubeConfig) (*KubeClient, error) {
 
     tprClient, err := buildTPRClient(config)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("Failed to build TPR client: %s", err)
     }
     kubeClient := &KubeClient{
         clientSet: cs,
@@ -131,15 +131,15 @@ func (c *KubeClient) EnsureInitialized() error {
     log.Info("Ensuring ThirdPartyResources exist")
     err := c.ensureThirdPartyResources()
     if err != nil {
-        return err
+        return fmt.Errorf("Failed to ensure ThirdPartyResources exist: %s", err)
     }
     log.Info("ThirdPartyResources exist")
 
     // Ensure ClusterType is set.
     log.Info("Ensuring ClusterType is set")
     err = c.waitForClusterType()
     if err != nil {
-        return err
+        return fmt.Errorf("Failed to ensure ClusterType is set: %s", err)
     }
     log.Info("ClusterType is set")
     return nil
@@ -221,6 +221,7 @@ func (c *KubeClient) ensureClusterType() (bool, error) {
         }
         value = existingValue
     }
+    log.WithField("value", value).Debug("Setting ClusterType")
     _, err = c.Apply(&model.KVPair{
         Key:   k,
         Value: value,
@@ -276,6 +277,7 @@ func (c *KubeClient) Syncer(callbacks api.SyncerCallbacks) api.Syncer {
 
 // Create an entry in the datastore. This errors if the entry already exists.
 func (c *KubeClient) Create(d *model.KVPair) (*model.KVPair, error) {
+    log.Debugf("Performing 'Create' for %+v", d)
     switch d.Key.(type) {
     case model.GlobalConfigKey:
         return c.createGlobalConfig(d)
@@ -293,21 +295,23 @@ func (c *KubeClient) Create(d *model.KVPair) (*model.KVPair, error) {
 // Update an existing entry in the datastore. This errors if the entry does
 // not exist.
 func (c *KubeClient) Update(d *model.KVPair) (*model.KVPair, error) {
+    log.Debugf("Performing 'Update' for %+v", d)
     switch d.Key.(type) {
     case model.GlobalConfigKey:
         return c.updateGlobalConfig(d)
     case model.IPPoolKey:
         return c.ipPoolClient.Update(d)
     default:
         // If the resource isn't supported, then this is a no-op.
-        log.Infof("'Update' for %+v is no-op", d.Key)
+        log.Debugf("'Update' for %+v is no-op", d.Key)
         return d, nil
     }
 }
 
 // Set an existing entry in the datastore. This ignores whether an entry already
 // exists.
 func (c *KubeClient) Apply(d *model.KVPair) (*model.KVPair, error) {
+    log.Debugf("Performing 'Apply' for %+v", d)
     switch d.Key.(type) {
     case model.WorkloadEndpointKey:
         return c.applyWorkloadEndpoint(d)
@@ -316,20 +320,21 @@ func (c *KubeClient) Apply(d *model.KVPair) (*model.KVPair, error) {
     case model.IPPoolKey:
         return c.ipPoolClient.Apply(d)
     default:
-        log.Infof("'Apply' for %s is no-op", d.Key)
+        log.Debugf("'Apply' for %s is no-op", d.Key)
         return d, nil
     }
 }
 
 // Delete an entry in the datastore. This is a no-op when using the k8s backend.
 func (c *KubeClient) Delete(d *model.KVPair) error {
+    log.Debugf("Performing 'Delete' for %+v", d)
     switch d.Key.(type) {
     case model.GlobalConfigKey:
         return c.deleteGlobalConfig(d)
     case model.IPPoolKey:
         return c.ipPoolClient.Delete(d)
     default:
-        log.Warn("Attempt to 'Delete' using kubernetes backend is not supported.")
+        log.Warn("Attempt to 'Delete' using kubernetes datastore driver is not supported.")
         return nil
     }
 }
@@ -414,11 +419,11 @@ func (c *KubeClient) listProfiles(l model.ProfileListOptions) ([]*model.KVPair,
 // getProfile gets the Profile from the k8s API based on existing Namespaces.
 func (c *KubeClient) getProfile(k model.ProfileKey) (*model.KVPair, error) {
     if k.Name == "" {
-        return nil, goerrors.New("Missing profile name")
+        return nil, fmt.Errorf("Profile key missing name: %+v", k)
     }
     namespaceName, err := c.converter.parseProfileName(k.Name)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("Failed to parse Profile name: %s", err)
     }
     namespace, err := c.clientSet.Namespaces().Get(namespaceName, metav1.GetOptions{})
     if err != nil {
@@ -430,6 +435,8 @@ func (c *KubeClient) getProfile(k model.ProfileKey) (*model.KVPair, error) {
 
 // applyWorkloadEndpoint patches the existing Pod to include an IP address, if
 // one has been set on the workload endpoint.
+// TODO: This is only required as a workaround for an upstream k8s issue. Once fixed,
+// this should be a no-op. See https://github.com/kubernetes/kubernetes/issues/39113
 func (c *KubeClient) applyWorkloadEndpoint(k *model.KVPair) (*model.KVPair, error) {
     ips := k.Value.(*model.WorkloadEndpoint).IPv4Nets
     if len(ips) > 0 {

lib/backend/k8s/resources/ippools.go

Lines changed: 20 additions & 6 deletions

@@ -114,18 +114,32 @@ func (c *client) Get(key model.Key) (*model.KVPair, error) {
 
 func (c *client) List(list model.ListInterface) ([]*model.KVPair, error) {
     kvps := []*model.KVPair{}
-    tprs := thirdparty.IpPoolList{}
     l := list.(model.IPPoolListOptions)
 
-    // Build the request.
-    req := c.tprClient.Get().Resource("ippools").Namespace("kube-system")
+    // If the CIDR is specified, k8s will return a single resource
+    // rather than a list, so handle this case separately, using our
+    // Get method to return the single result.
     if l.CIDR.IP != nil {
-        k := model.IPPoolKey{CIDR: l.CIDR}
-        req.Name(tprName(k))
+        log.Info("Performing IP pool List with name")
+        if kvp, err := c.Get(model.IPPoolKey{CIDR: l.CIDR}); err == nil {
+            kvps = append(kvps, kvp)
+        } else {
+            if !kerrors.IsNotFound(err) {
+                return nil, K8sErrorToCalico(err, l)
+            }
+        }
+        return kvps, nil
     }
 
+    // Since are not performing an exact Get, Kubernetes will return a list
+    // of resources.
+    tprs := thirdparty.IpPoolList{}
+
     // Perform the request.
-    err := req.Do().Into(&tprs)
+    err := c.tprClient.Get().
+        Resource("ippools").
+        Namespace("kube-system").
+        Do().Into(&tprs)
     if err != nil {
         // Don't return errors for "not found". This just
         // means there are no IPPools, and we should return
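
The reworked List above encodes one design choice worth calling out: a list qualified by an exact CIDR is dispatched to Get (Kubernetes returns a single named resource in that case), and a not-found result becomes an empty list rather than an error. A rough, self-contained sketch of that dispatch logic, with hypothetical names that are not part of the libcalico-go API:

```go
package main

import (
	"errors"
	"fmt"
)

// Pool and errNotFound are stand-ins used only for this sketch.
type Pool struct{ CIDR string }

var errNotFound = errors.New("not found")

// listPools treats a list qualified by an exact CIDR as a Get, and maps a
// not-found result to an empty list rather than an error.
func listPools(get func(string) (*Pool, error), listAll func() ([]*Pool, error), cidr string) ([]*Pool, error) {
	if cidr == "" {
		return listAll()
	}
	p, err := get(cidr)
	if errors.Is(err, errNotFound) {
		return []*Pool{}, nil
	} else if err != nil {
		return nil, err
	}
	return []*Pool{p}, nil
}

func main() {
	get := func(cidr string) (*Pool, error) {
		if cidr == "192.168.0.0/16" {
			return &Pool{CIDR: cidr}, nil
		}
		return nil, errNotFound
	}
	listAll := func() ([]*Pool, error) {
		return []*Pool{{CIDR: "192.168.0.0/16"}, {CIDR: "10.0.0.0/8"}}, nil
	}

	one, _ := listPools(get, listAll, "192.168.0.0/16")
	none, _ := listPools(get, listAll, "172.16.0.0/12")
	all, _ := listPools(get, listAll, "")
	fmt.Println(len(one), len(none), len(all)) // 1 0 2
}
```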

lib/backend/k8s/syncer.go

Lines changed: 28 additions & 10 deletions

@@ -142,7 +142,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
         // If we need to resync, do so.
         if needsResync {
             // Set status to ResyncInProgress.
-            log.Warnf("Resync required - latest versions: %+v", latestVersions)
+            log.Debugf("Resync required - latest versions: %+v", latestVersions)
             syn.callbacks.OnStatusUpdated(api.ResyncInProgress)
 
             // Get snapshot from datastore.
@@ -155,7 +155,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             // Send the snapshot through.
             syn.sendUpdates(snap)
 
-            log.Warnf("Snapshot complete - start watch from %+v", latestVersions)
+            log.Debugf("Snapshot complete - start watch from %+v", latestVersions)
             syn.callbacks.OnStatusUpdated(api.InSync)
 
             // Create the Kubernetes API watchers.
@@ -226,6 +226,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 
         // Don't start watches if we're in oneshot mode.
         if syn.OneShot {
+            log.Info("OneShot mode, do not start watches")
             return
         }
 
@@ -235,7 +236,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             log.Debugf("Incoming Namespace watch event. Type=%s", event.Type)
             if needsResync = syn.eventTriggersResync(event); needsResync {
                 // We need to resync. Break out into the sync loop.
-                log.Warn("Event triggered resync: %+v", event)
+                log.Warnf("Event triggered resync: %+v", event)
                 continue
             }
 
@@ -248,7 +249,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             log.Debugf("Incoming Pod watch event. Type=%s", event.Type)
             if needsResync = syn.eventTriggersResync(event); needsResync {
                 // We need to resync. Break out into the sync loop.
-                log.Warn("Event triggered resync: %+v", event)
+                log.Warnf("Event triggered resync: %+v", event)
                 continue
             }
 
@@ -263,7 +264,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             log.Debugf("Incoming NetworkPolicy watch event. Type=%s", event.Type)
             if needsResync = syn.eventTriggersResync(event); needsResync {
                 // We need to resync. Break out into the sync loop.
-                log.Warn("Event triggered resync: %+v", event)
+                log.Warnf("Event triggered resync: %+v", event)
                 continue
             }
 
@@ -275,7 +276,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             log.Debugf("Incoming GlobalConfig watch event. Type=%s", event.Type)
             if needsResync = syn.eventTriggersResync(event); needsResync {
                 // We need to resync. Break out into the sync loop.
-                log.Warn("Event triggered resync: %+v", event)
+                log.Warnf("Event triggered resync: %+v", event)
                 continue
             }
 
@@ -287,7 +288,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
             log.Debugf("Incoming IPPool watch event. Type=%s", event.Type)
             if needsResync = syn.eventTriggersResync(event); needsResync {
                 // We need to resync. Break out into the sync loop.
-                log.Warn("Event triggered resync: %+v", event)
+                log.Warnf("Event triggered resync: %+v", event)
                 continue
             }
 
@@ -339,6 +340,8 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[string]bool, resou
             time.Sleep(1 * time.Second)
             continue
         }
+        log.Info("Received Namespace List() response")
+
         versions.namespaceVersion = nsList.ListMeta.ResourceVersion
         for _, ns := range nsList.Items {
             // The Syncer API expects a profile to be broken into its underlying
@@ -378,6 +381,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[string]bool, resou
             time.Sleep(1 * time.Second)
             continue
         }
+        log.Info("Received NetworkPolicy List() response")
 
         versions.networkPolicyVersion = npList.ListMeta.ResourceVersion
         for _, np := range npList.Items {
@@ -394,9 +398,17 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[string]bool, resou
             time.Sleep(1 * time.Second)
             continue
         }
+        log.Info("Received Pod List() response")
 
         versions.podVersion = poList.ListMeta.ResourceVersion
         for _, po := range poList.Items {
+            // Ignore any updates for host networked pods.
+            if syn.kc.converter.isHostNetworked(&po) {
+                log.Debugf("Skipping host networked pod %s/%s", po.ObjectMeta.Namespace, po.ObjectMeta.Name)
+                continue
+            }
+
+            // Convert to a workload endpoint.
             wep, _ := syn.kc.converter.podToWorkloadEndpoint(&po)
             if wep != nil {
                 snap = append(snap, *wep)
@@ -405,25 +417,29 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[string]bool, resou
         }
 
         // Sync GlobalConfig.
+        log.Info("Syncing GlobalConfig")
         confList, err := syn.kc.listGlobalConfig(model.GlobalConfigListOptions{})
         if err != nil {
             log.Warnf("Error querying GlobalConfig during snapshot, retrying: %s", err)
             time.Sleep(1 * time.Second)
             continue
         }
+        log.Info("Received GlobalConfig List() response")
 
         for _, c := range confList {
             snap = append(snap, *c)
             keys[c.Key.String()] = true
         }
 
         // Sync IP Pools.
+        log.Info("Syncing IP Pools")
         poolList, err := syn.kc.List(model.IPPoolListOptions{})
         if err != nil {
             log.Warnf("Error querying IP Pools during snapshot, retrying: %s", err)
             time.Sleep(1 * time.Second)
             continue
         }
+        log.Info("Received IP Pools List() response")
 
         for _, p := range poolList {
             snap = append(snap, *p)
@@ -506,10 +522,12 @@ func (syn *kubeSyncer) parsePodEvent(e watch.Event) *model.KVPair {
         return nil
     }
 
-    // Convert the received Namespace into a KVPair.
+    // Convert the received Pod into a KVPair.
     kvp, err := syn.kc.converter.podToWorkloadEndpoint(pod)
     if err != nil {
-        log.Panicf("%s", err)
+        // If we fail to parse, then ignore this update and emit a log.
+        log.WithField("error", err).Error("Failed to parse Pod event")
+        return nil
    }
 
     // We behave differently based on the event type.
@@ -519,7 +537,7 @@ func (syn *kubeSyncer) parsePodEvent(e watch.Event) *model.KVPair {
         log.Debugf("Delete for pod %s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
         kvp.Value = nil
 
-        // Remove it from the cache, if it is there.
+        // Remove it from the label cache, if it is there.
         workload := kvp.Key.(model.WorkloadEndpointKey).WorkloadID
         delete(syn.labelCache, workload)
     default:
