grim/tagpull

Update the annotation tag
draft
2020-07-05, Gary Kramlich
33f8b28b8dc3

package main

import (
    "fmt"
    "os"
    "os/user"
    "path/filepath"
    "strings"
    "time"

    log "github.com/sirupsen/logrus"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)
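
// Operator periodically checks Deployments whose pod template carries the
// Annotation key and cycles pods whose running image digest no longer
// matches the digest the registry reports for the image's tag.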
type Operator struct {
    tickrate  time.Duration
    namespace string
    cache     map[string]time.Time
    clientset *kubernetes.Clientset
}
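
// NewOperator creates an Operator that checks the given namespace every
// tickrate, building a Kubernetes clientset as part of construction.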
func NewOperator(tickrate time.Duration, namespace string) (*Operator, error) {
    o := &Operator{
        tickrate:  tickrate,
        namespace: namespace,
        cache:     map[string]time.Time{},
    }

    if err := o.getClientSet(); err != nil {
        return nil, err
    }

    return o, nil
}
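
// getClientSet builds the Kubernetes clientset, trying the in-cluster
// config first, then the file named by $KUBECONFIG, and finally
// ~/.kube/config in the current user's home directory.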
func (o *Operator) getClientSet() error {
    config, err := rest.InClusterConfig()
    if err != nil {
        if kubeconfig, found := os.LookupEnv("KUBECONFIG"); found {
            config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
        } else {
            user, uErr := user.Current()
            if uErr != nil {
                return uErr
            }

            path := filepath.Join(user.HomeDir, ".kube", "config")
            config, err = clientcmd.BuildConfigFromFlags("", path)
        }
    }

    if err != nil {
        return err
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return err
    }

    o.clientset = clientset

    return nil
}
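
// Run loops forever, listing the Deployments in the operator's namespace
// every tickrate and checking any whose pod template has the Annotation key.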
func (o *Operator) Run() {
    for {
        log.Info("checking deployments")

        ts := time.Now().UTC()

        deployments, err := o.clientset.AppsV1().Deployments(o.namespace).List(metav1.ListOptions{})
        if err != nil {
            log.Warningf("failed to get deployments: %s", err.Error())
        } else {
            for _, deployment := range deployments.Items {
                // before checking the deployment, make sure we have the
                // annotation.
                if rawInterval, found := deployment.Spec.Template.Annotations[Annotation]; found {
                    o.checkDeployment(deployment, ts, rawInterval)
                }
            }
        }

        time.Sleep(o.tickrate)
    }
}
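
// checkDeployment parses the deployment's interval annotation and, if at
// least that much time has passed since the last check, inspects the pods
// selected by the deployment's pod template labels.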
func (o *Operator) checkDeployment(deployment appsv1.Deployment, ts time.Time, rawInterval string) error {
    name := deployment.Namespace + "/" + deployment.Name

    interval, err := time.ParseDuration(rawInterval)
    if err != nil {
        return fmt.Errorf("deployment %s has an invalid duration of %q", name, rawInterval)
    }

    lastCheck, found := o.cache[name]
    if !found {
        log.Infof("found new deployment %s with interval %s", name, interval)
        lastCheck = time.Time{}
    }

    if ts.Sub(lastCheck) < interval {
        log.Debugf(
            "%s will be checked in %s",
            name,
            time.Until(lastCheck.Add(interval)).Round(time.Second),
        )

        return nil
    }

    log.Debugf("checking deployment %s", name)

    // create a cache for the images in this deployment
    images := map[string]string{}

    o.clientset.AppsV1().Deployments(deployment.Namespace).Update(&deployment)

    listOpts := metav1.ListOptions{
        LabelSelector: createLabelString(deployment.Spec.Template.Labels),
    }

    pods, err := o.clientset.CoreV1().Pods(deployment.Namespace).List(listOpts)
    if err != nil {
        return err
    }

    for _, pod := range pods.Items {
        if err := o.checkPod(pod, images); err != nil {
            log.Warnf("%v", err)
        }
    }

    o.cache[name] = ts

    return nil
}
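
// checkPod checks every container in the pod against the registry, warning
// when a container's ImagePullPolicy is not Always, and deletes the pod so
// its Deployment recreates it when an image digest has changed.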
func (o *Operator) checkPod(pod corev1.Pod, images map[string]string) error {
    for _, container := range pod.Spec.Containers {
        log.Debugf("checking pod %s/%s", pod.ObjectMeta.Namespace, pod.Name)

        if container.ImagePullPolicy != corev1.PullAlways {
            log.Warnf(
                "%s/%s ImagePullPolicy is %s not %s",
                pod.ObjectMeta.Namespace,
                pod.Name,
                container.ImagePullPolicy,
                corev1.PullAlways,
            )
        }

        // since the statuses are not a map, we have to iterate them to find
        // the containers we're interested in.
        for _, status := range pod.Status.ContainerStatuses {
            if status.Name != container.Name {
                continue
            }

            if !status.Ready {
                log.Infof("%s/%s:%s is not ready, skipping", pod.ObjectMeta.Namespace, pod.Name, container.Name)
                break
            }

            cycle, err := o.checkContainer(container, status.ImageID, images)
            if err != nil {
                log.Warnf(
                    "%s/%s:%s failed check: %v",
                    pod.ObjectMeta.Namespace,
                    pod.Name,
                    container.Name,
                    err,
                )

                continue
            }

            if cycle {
                err := o.clientset.CoreV1().Pods(pod.ObjectMeta.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
                if err != nil {
                    log.Warnf("failed to cycle pod %s: %v", pod.Name, err)
                } else {
                    log.Infof("cycled pod %s", pod.Name)
                }
            } else {
                log.Debugf("pod %s/%s is up to date", pod.ObjectMeta.Namespace, pod.Name)
            }
        }
    }

    return nil
}
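
// checkContainer compares the digest of the image the container is running
// against the digest the registry currently reports for the image's tag,
// returning true when they differ and the pod should be cycled. Results are
// memoized in the digests map so each image is only looked up once per
// deployment check.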
func (o *Operator) checkContainer(container corev1.Container, imageID string, digests map[string]string) (bool, error) {
    // now that we know that the container has an ImagePullPolicy of
    // Always, we can check its digest against the tag's digest.

    // look up the digest for the local version in k8s
    if strings.HasPrefix(imageID, "docker-pullable://") {
        parts := strings.SplitN(imageID, "@", 2)
        if len(parts) > 1 {
            imageID = parts[1]
        } else {
            imageID = parts[0]
        }
    }

    localDigest, found := digests[imageID]
    if !found {
        digest, err := registryGetDigest(container.Image, imageID)
        if err != nil {
            return false, err
        }

        digests[imageID] = digest
        localDigest = digest
    }

    // now figure out the digest for what's on the registry
    remoteDigest, found := digests[container.Image]
    if !found {
        repo, tag := parseRepositoryAndTag(container.Image)

        digest, err := registryGetDigest(repo, tag)
        if err != nil {
            return false, err
        }

        digests[container.Image] = digest
        remoteDigest = digest
    }

    if localDigest != remoteDigest {
        log.Debugf(
            "image %q is out of date: %s != %s",
            container.Image,
            localDigest,
            remoteDigest,
        )

        log.Infof("image digests differ, cycling pod")

        return true, nil
    }

    return false, nil
}
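
// Example (hypothetical): the actual entry point, the Annotation constant,
// and the registry helpers (registryGetDigest, parseRepositoryAndTag,
// createLabelString) live in other files of this package and are not shown
// here. Wiring the operator up could look roughly like this:
//
//     func main() {
//         op, err := NewOperator(30*time.Second, metav1.NamespaceAll)
//         if err != nil {
//             log.Fatalf("failed to create operator: %v", err)
//         }
//         op.Run()
//     }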