grim/tagpull
Clone
Summary
Browse
Changes
Graph
Add a readme
draft
default
tip
2020-12-29, Gary Kramlich
3ecd14cfca54
Add a readme
package
main
import
(
"fmt"
"os"
"os/user"
"path/filepath"
"strings"
"time"
log
"github.com/sirupsen/logrus"
appsv1
"k8s.io/api/apps/v1"
corev1
"k8s.io/api/core/v1"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type
Operator
struct
{
tickrate
time
.
Duration
namespace
string
cache
map
[
string
]
time
.
Time
clientset
*
kubernetes
.
Clientset
}
func
NewOperator
(
tickrate
time
.
Duration
,
namespace
string
)
(
*
Operator
,
error
)
{
o
:=
&
Operator
{
tickrate
:
tickrate
,
namespace
:
namespace
,
cache
:
map
[
string
]
time
.
Time
{},
}
if
err
:=
o
.
getClientSet
();
err
!=
nil
{
return
nil
,
err
}
return
o
,
nil
}
func
(
o
*
Operator
)
getClientSet
()
error
{
config
,
err
:=
rest
.
InClusterConfig
()
if
err
!=
nil
{
if
kubeconfig
,
found
:=
os
.
LookupEnv
(
"KUBECONFIG"
);
found
{
config
,
err
=
clientcmd
.
BuildConfigFromFlags
(
""
,
kubeconfig
)
}
else
{
user
,
uErr
:=
user
.
Current
()
if
uErr
!=
nil
{
return
uErr
}
path
:=
filepath
.
Join
(
user
.
HomeDir
,
".kube"
,
"config"
)
config
,
err
=
clientcmd
.
BuildConfigFromFlags
(
""
,
path
)
}
}
if
err
!=
nil
{
return
err
}
clientset
,
err
:=
kubernetes
.
NewForConfig
(
config
)
if
err
!=
nil
{
return
err
}
o
.
clientset
=
clientset
return
nil
}
func
(
o
*
Operator
)
Run
()
{
for
{
log
.
Info
(
"checking deployments"
)
ts
:=
time
.
Now
().
UTC
()
deployments
,
err
:=
o
.
clientset
.
AppsV1
().
Deployments
(
o
.
namespace
).
List
(
metav1
.
ListOptions
{})
if
err
!=
nil
{
log
.
Warningf
(
"failed to get deployments: %s"
,
err
.
Error
())
}
else
{
for
_
,
deployment
:=
range
deployments
.
Items
{
// before checking the deployment, make sure we have the
// annotation.
if
rawInterval
,
found
:=
deployment
.
Annotations
[
Annotation
];
found
{
o
.
checkDeployment
(
deployment
,
ts
,
rawInterval
)
}
}
}
time
.
Sleep
(
o
.
tickrate
)
}
}
func
(
o
*
Operator
)
checkDeployment
(
deployment
appsv1
.
Deployment
,
ts
time
.
Time
,
rawInterval
string
)
error
{
name
:=
deployment
.
Namespace
+
"/"
+
deployment
.
Name
interval
,
err
:=
time
.
ParseDuration
(
rawInterval
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"deployment %s has an invalid duration of %q"
,
name
,
rawInterval
)
}
lastCheck
,
found
:=
o
.
cache
[
name
]
if
!
found
{
log
.
Infof
(
"found new deployment %s with interval %s"
,
name
,
interval
)
lastCheck
=
time
.
Time
{}
}
if
ts
.
Sub
(
lastCheck
)
<
interval
{
log
.
Debugf
(
"%s will be checked in %s"
,
name
,
time
.
Until
(
lastCheck
.
Add
(
interval
)).
Round
(
time
.
Second
),
)
return
nil
}
log
.
Debugf
(
"checking deployment %s"
,
name
)
// create a cache for the images in this deployment
images
:=
map
[
string
]
string
{}
o
.
clientset
.
AppsV1
().
Deployments
(
deployment
.
Namespace
).
Update
(
&
deployment
)
listOpts
:=
metav1
.
ListOptions
{
LabelSelector
:
createLabelString
(
deployment
.
Spec
.
Template
.
Labels
),
}
pods
,
err
:=
o
.
clientset
.
CoreV1
().
Pods
(
deployment
.
Namespace
).
List
(
listOpts
)
if
err
!=
nil
{
return
err
}
for
_
,
pod
:=
range
pods
.
Items
{
if
err
:=
o
.
checkPod
(
pod
,
images
);
err
!=
nil
{
log
.
Warnf
(
"%v"
,
err
)
}
}
o
.
cache
[
name
]
=
ts
return
nil
}
func
(
o
*
Operator
)
checkPod
(
pod
corev1
.
Pod
,
images
map
[
string
]
string
)
error
{
for
_
,
container
:=
range
pod
.
Spec
.
Containers
{
log
.
Debugf
(
"checking pod %s/%s"
,
pod
.
ObjectMeta
.
Namespace
,
pod
.
Name
)
if
container
.
ImagePullPolicy
!=
corev1
.
PullAlways
{
log
.
Warnf
(
"%s/%s ImagePullPolicy is %s not %s"
,
pod
.
ObjectMeta
.
Namespace
,
pod
.
Name
,
container
.
ImagePullPolicy
,
corev1
.
PullAlways
,
)
}
// since the statuses is not a map, we have to iterate it to find
// the containers we're interested in.
for
_
,
status
:=
range
pod
.
Status
.
ContainerStatuses
{
if
status
.
Name
!=
container
.
Name
{
continue
}
if
!
status
.
Ready
{
log
.
Infof
(
"%s/%s:%s is not ready, skipping"
,
pod
.
ObjectMeta
.
Namespace
,
pod
.
Name
,
container
.
Name
)
break
}
cycle
,
err
:=
o
.
checkContainer
(
container
,
status
.
ImageID
,
images
)
if
err
!=
nil
{
log
.
Warnf
(
"%s/%s:%s failed check: %v"
,
pod
.
ObjectMeta
.
Namespace
,
pod
.
Name
,
container
.
Name
,
err
,
)
continue
}
if
cycle
{
err
:=
o
.
clientset
.
CoreV1
().
Pods
(
pod
.
ObjectMeta
.
Namespace
).
Delete
(
pod
.
Name
,
&
metav1
.
DeleteOptions
{})
if
err
!=
nil
{
log
.
Warnf
(
"failed to cycle pod %s: %v"
,
pod
.
Name
,
err
)
}
else
{
log
.
Infof
(
"cycled pod %s"
,
pod
.
Name
)
}
}
else
{
log
.
Debugf
(
"pod %s/%s is up to date"
,
pod
.
ObjectMeta
.
Namespace
,
pod
.
Name
)
}
}
}
return
nil
}
func
(
o
*
Operator
)
checkContainer
(
container
corev1
.
Container
,
imageID
string
,
digests
map
[
string
]
string
)
(
bool
,
error
)
{
// now that we know that the container has a ImagePullPolicy of
// Always, we can check it's digest against the tags digest.
// look up the digest for the local version in k8s
if
strings
.
HasPrefix
(
imageID
,
"docker-pullable://"
)
{
parts
:=
strings
.
SplitN
(
imageID
,
"@"
,
2
)
if
len
(
parts
)
>
1
{
imageID
=
parts
[
1
]
}
else
{
imageID
=
parts
[
0
]
}
}
localDigest
,
found
:=
digests
[
imageID
]
if
!
found
{
digest
,
err
:=
registryGetDigest
(
container
.
Image
,
imageID
)
if
err
!=
nil
{
return
false
,
err
}
digests
[
imageID
]
=
digest
localDigest
=
digest
}
// now figure out the digest for what's on the registry
remoteDigest
,
found
:=
digests
[
container
.
Image
]
if
!
found
{
repo
,
tag
:=
parseRepositoryAndTag
(
container
.
Image
)
digest
,
err
:=
registryGetDigest
(
repo
,
tag
)
if
err
!=
nil
{
return
false
,
err
}
digests
[
container
.
Image
]
=
digest
remoteDigest
=
digest
}
if
localDigest
!=
remoteDigest
{
log
.
Debugf
(
"image %q is out of date: %s != %s"
,
container
.
Image
,
localDigest
,
remoteDigest
,
)
log
.
Infof
(
"image digests differ, cycling pod"
)
return
true
,
nil
}
return
false
,
nil
}