Skip to content

Commit

Permalink
Merge pull request #447 from fasaxc/k8s-sync-hc
Browse files Browse the repository at this point in the history
Generate events for host config when nodes change.
  • Loading branch information
fasaxc authored Jun 10, 2017
2 parents 7324160 + a1a9791 commit ca60d99
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions lib/backend/k8s/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
// Other watcher vars.
var nsChan, poChan, npChan, snpChan, gcChan, poolChan, noChan <-chan watch.Event
var event watch.Event
var kvp *model.KVPair
var opts metav1.ListOptions

log.Info("Starting Kubernetes API read loop")
Expand Down Expand Up @@ -489,7 +488,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it.
if kvp = syn.parsePodEvent(event); kvp != nil {
if kvp := syn.parsePodEvent(event); kvp != nil {
// Only send the update if we care about it. We filter
// out a number of events that aren't useful for us.
latestVersions.podVersion = kvp.Revision.(string)
Expand All @@ -505,7 +504,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseNetworkPolicyEvent(event)
kvp := syn.parseNetworkPolicyEvent(event)
latestVersions.networkPolicyVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_NP)
case event = <-snpChan:
Expand All @@ -520,7 +519,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseSystemNetworkPolicyEvent(event)
kvp := syn.parseSystemNetworkPolicyEvent(event)
latestVersions.systemNetworkPolicyVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_SNP)
case event = <-gcChan:
Expand All @@ -535,7 +534,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseGlobalConfigEvent(event)
kvp := syn.parseGlobalConfigEvent(event)
latestVersions.globalConfigVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_GC)
case event = <-poolChan:
Expand All @@ -550,23 +549,30 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseIPPoolEvent(event)
kvp := syn.parseIPPoolEvent(event)
latestVersions.poolVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_IP)
case event = <-noChan:
log.Debugf("Incoming Node watch event. Type=%s", event.Type)
if syn.eventNeedsResync(event) {
syn.needsResync[KEY_NO] = true
syn.needsResync[KEY_HC] = true
continue
} else if syn.eventRestartsWatch(event, KEY_NO) {
syn.needsResync[KEY_NO] = true
syn.needsResync[KEY_HC] = true
syn.closeWatcher(KEY_NO)
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseNodeEvent(event)
latestVersions.nodeVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_NO)
kvpHostIP, kvpIPIPAddr := syn.parseNodeEvent(event)
log.WithFields(log.Fields{
"kvpHostIP": kvpHostIP,
"kvpIPIPAddr": kvpIPIPAddr,
}).Debug("Got node KVs.")
latestVersions.nodeVersion = kvpHostIP.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvpHostIP}, KEY_NO)
syn.sendUpdates([]model.KVPair{*kvpIPIPAddr}, KEY_HC)
}
}
}
Expand Down Expand Up @@ -893,7 +899,7 @@ func (syn *kubeSyncer) parseNamespaceEvent(e watch.Event) []model.KVPair {
return []model.KVPair{*rules, *tags, *labels}
}

func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair {
func (syn *kubeSyncer) parseNodeEvent(e watch.Event) (*model.KVPair, *model.KVPair) {
node, ok := e.Object.(*k8sapi.Node)
if !ok {
log.Panicf("Invalid node event. Type: %s, Object: %+v", e.Type, e.Object)
Expand All @@ -910,11 +916,28 @@ func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair {
Revision: kvp.Revision,
}

kvpIPIPAddr, err := getTunIp(node)
if err != nil || kvpIPIPAddr == nil {
// If we failed to parse, err will be non-nil. If it's missing, kvpIPIPAddr will be nil.
// Either way, generate a delete.
log.WithError(err).WithField("node", node.Name).Info(
"Node has no (or invalid) pod CIDR. (Normal for a new node.)")
kvpIPIPAddr = &model.KVPair{
Key: model.HostConfigKey{
Hostname: node.Name,
Name: "IpInIpTunnelAddr",
},
Value: nil,
}
}
kvpIPIPAddr.Revision = kvp.Revision

if e.Type == watch.Deleted {
kvp.Value = nil
kvpHostIp.Value = nil
kvpIPIPAddr.Value = nil
}

return kvpHostIp
return kvpHostIp, kvpIPIPAddr
}

// parsePodEvent returns a KVPair for the given event. If the event isn't
Expand Down

0 comments on commit ca60d99

Please sign in to comment.