feat: refactor with step logic, rename maintenance

This commit is contained in:
unteem 2024-03-01 00:08:12 +01:00
parent a149dc1543
commit 3f4b6ab72c
5 changed files with 312 additions and 183 deletions

View file

@ -32,7 +32,7 @@ type MigrationSpec struct {
//+kubebuilder:validation:Required
TargetRef corev1.ObjectReference `json:"targetRef"`
//+kubebuilder:validation:Optional
Resume bool `json:"resume,omitempty"`
Rollback bool `json:"rollback,omitempty"`
//+kubebuilder:validation:Optional
DryRun bool `json:"dryRun,omitempty"`
}
@ -44,10 +44,10 @@ type MigrationStatus struct {
ImportedBuckets []string `json:"importedBuckets,omitempty"`
//+optional
ImportedPostgres []string `json:"importedPostgres,omitempty"`
//+optional
MaintenanceOn bool `json:"maintenanceOn,omitempty"`
//+optional
ProxingTraffic bool `json:"proxingTraffic,omitempty"`
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
}
//+kubebuilder:object:root=true

View file

@ -345,6 +345,14 @@ func (in *MigrationStatus) DeepCopyInto(out *MigrationStatus) {
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.StartTime != nil {
in, out := &in.StartTime, &out.StartTime
*out = (*in).DeepCopy()
}
if in.CompletionTime != nil {
in, out := &in.CompletionTime, &out.CompletionTime
*out = (*in).DeepCopy()
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MigrationStatus.

View file

@ -49,7 +49,7 @@ spec:
properties:
dryRun:
type: boolean
resume:
rollback:
type: boolean
selectors:
description: Application selectors used by wader service
@ -125,6 +125,9 @@ spec:
status:
description: MigrationStatus defines the observed state of Migration
properties:
completionTime:
format: date-time
type: string
conditions:
items:
description: "Condition contains details for one aspect of the current
@ -204,10 +207,9 @@ spec:
items:
type: string
type: array
maintenanceOn:
type: boolean
proxingTraffic:
type: boolean
startTime:
format: date-time
type: string
version:
type: string
type: object

View file

@ -24,7 +24,9 @@ import (
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
@ -53,22 +55,24 @@ type MigrationReconciler struct {
func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling")
var mgrt lshportability.Migration
patcher, result := lshr.Initialize(ctx, r, req, &mgrt)
if result != nil {
return result.Unwrap()
err := r.Client.Get(ctx, req.NamespacedName, &mgrt)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// TODO manage suspend in conditions ?
if mgrt.Spec.Suspend {
return ctrl.Result{}, nil
}
if lshr.IsFinalizing(&mgrt) {
return lshr.Finalize(ctx, r, patcher, &mgrt, func() error {
return nil
})
}
patcher := patch.NewSerialPatcher(&mgrt, r.Client)
// TODO: when lshr.Initialize sets to reconciling. May not be necessary
if conditions.IsReady(&mgrt) {
// TODO add annotation to force reconciliation
if !mgrt.Status.CompletionTime.IsZero() {
log.Info("skipping reconciliation because already completed")
return ctrl.Result{}, nil
}
@ -80,53 +84,12 @@ func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}
if mgrt.Spec.Resume || ShouldBeResumed(&mgrt) {
if mgrt.Status.ProxingTraffic {
_, err = c.ResumeTraffic(ctx, &protoportabilityv1alpha1.ResumeTrafficRequest{
Selectors: mgrt.Spec.Selectors,
})
if err != nil {
return ctrl.Result{}, err
}
mgrt.Status.ProxingTraffic = false
}
if ShouldBeResumed(&mgrt) {
mgrt.Spec.Suspend = true
mgrt.Status.ProxingTraffic = false
return ctrl.Result{}, lshr.Patch(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
return ctrl.Result{}, lshr.Complete(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
log.Info("getting app", "selectors", mgrt.Spec.Selectors)
manifestsResp, err := c.GetApp(ctx, &protoportabilityv1alpha1.GetAppRequest{Selectors: mgrt.Spec.Selectors})
if err != nil {
return ctrl.Result{}, err
}
if mgrt.Spec.DryRun {
secret := corev1.Secret{}
secret.Namespace = mgrt.Namespace
secret.Name = fmt.Sprintf("%s-dry-run.migration.portability.libre.sh", mgrt.Name)
err = lshr.CreateOrPatch(ctx, r, &secret, func() error {
if secret.Data == nil {
secret.Data = make(map[string][]byte)
}
secret.Data["app.json"] = manifestsResp.Application
for i, m := range manifestsResp.ExtraManifests {
secret.Data[fmt.Sprintf("manifest-%d.json", i)] = m
}
return nil
})
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, lshr.Complete(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
app := &unstructured.Unstructured{}
err = app.UnmarshalJSON(manifestsResp.Application)
if err != nil {
@ -135,126 +98,67 @@ func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
app.SetNamespace(mgrt.Namespace)
app.SetName(mgrt.Name)
for _, m := range manifestsResp.ExtraManifests {
obj := &unstructured.Unstructured{}
err = obj.UnmarshalJSON(m)
if err != nil {
return ctrl.Result{}, err
steps := []step{
{name: "CreateApp", reconcile: r.CreateAppStepFunc(ctx, log, c, patcher, &mgrt, app, manifestsResp.ExtraManifests)},
{name: "Maintenance", reconcile: r.MaintenanceStepFunc(ctx, log, patcher, &mgrt, app)},
{name: "ProxyTraffic", reconcile: r.ProxyTrafficStepFunc(ctx, log, c, patcher, &mgrt)},
{name: "ImportPostgres", reconcile: r.ImportPostgresStepFunc(ctx, log, c, patcher, &mgrt, app)},
{name: "StartApp", reconcile: r.StartAppStepFunc(ctx, log, c, &mgrt, app)},
{name: "RemoveMaintenance", reconcile: r.RemoveMaintenanceStepFunc(ctx, log, c, patcher, &mgrt, app)},
}
obj.SetNamespace(mgrt.Namespace)
err = r.Create(ctx, obj)
if client.IgnoreAlreadyExists(err) != nil {
if mgrt.Spec.DryRun {
steps = []step{
{name: "DryRunStep", reconcile: r.DryRunStepFunc(ctx, log, c, patcher, &mgrt)},
}
}
if ShouldRollback(&mgrt) {
// TODO de we do some cleanup ?
steps = []step{
{name: "Rollback", reconcile: r.RollbackStepFunc(ctx, log, c, patcher, &mgrt)},
}
}
if mgrt.Status.StartTime.IsZero() {
now := metav1.Now()
mgrt.Status.StartTime = &now
for _, step := range steps {
conditions.MarkFalse(&mgrt, step.conditionType(), meta.ProgressingReason, "")
}
if err := r.patch(ctx, patcher, &mgrt); err != nil {
return ctrl.Result{}, err
}
}
log.Info("getting certs", "selectors", mgrt.Spec.Selectors)
certResp, err := c.GetCert(ctx, &protoportabilityv1alpha1.GetCertRequest{Selectors: mgrt.Spec.Selectors})
if err != nil {
return ctrl.Result{}, err
for _, step := range steps {
if conditions.IsTrue(&mgrt, step.conditionType()) {
continue
}
secret := &corev1.Secret{}
lshr.SetResourceNamespacedName(app, secret, "tls")
err = lshr.CreateOrPatch(ctx, r, secret, func() error {
lshr.ApplyLabels(app, secret, nil)
secret.Type = corev1.SecretTypeTLS
secret.Data = make(map[string][]byte, 3)
secret.Data["tls.key"] = certResp.Key
secret.Data["tls.crt"] = certResp.Crt
secret.Data["ca.crt"] = certResp.Ca
return nil
})
if err != nil {
return ctrl.Result{}, err
res := step.reconcile()
if res != nil {
return res.Unwrap()
}
err = r.Get(ctx, client.ObjectKeyFromObject(app), app)
if err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
log.Info("creating", "app", app.GetName())
err = importer.CreateApplication(ctx, r, app)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
if importer.ShouldWaitForAppDeps(app) {
log.Info("waiting for app dependencies")
conditions.MarkStalled(&mgrt, lshmeta.DependenciesNotReady, "%s %s deps are not ready", app.GetKind(), app.GetName())
return ctrl.Result{RequeueAfter: 5 * time.Second}, lshr.Patch(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
if !mgrt.Status.MaintenanceOn {
log.Info("setting maintenance")
err = r.reconcileMaintenance(ctx, mgrt, app)
if err != nil {
return ctrl.Result{}, err
}
mgrt.Status.MaintenanceOn = true
return ctrl.Result{Requeue: true}, lshr.Patch(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
// TODO improve
clusterSecret := &corev1.Secret{}
err = r.Get(ctx, types.NamespacedName{Namespace: "libresh-system", Name: "cluster-settings"}, clusterSecret)
if err != nil {
return ctrl.Result{}, err
}
var clusterDomain string
if clusterSecret.Data["CLUSTER_DOMAIN"] == nil {
return ctrl.Result{}, fmt.Errorf("cluster domain is not defined in cluster-settings")
}
clusterDomain = string(clusterSecret.Data["CLUSTER_DOMAIN"])
log.Info("proxy traffic", "selectors", mgrt.Spec.Selectors, "destination", clusterDomain)
// TODO check if it is indempotent
if !mgrt.Status.ProxingTraffic {
_, err = c.ProxyTraffic(ctx, &protoportabilityv1alpha1.ProxyTrafficRequest{Selectors: mgrt.Spec.Selectors, Destination: clusterDomain})
if err != nil {
return ctrl.Result{}, err
}
mgrt.Status.ProxingTraffic = true
err = lshr.Patch(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
if err != nil {
conditions.MarkTrue(&mgrt, step.conditionType(), meta.SucceededReason, "")
if err := r.patch(ctx, patcher, &mgrt); err != nil {
return ctrl.Result{}, err
}
}
// From here if it fails, revert traffic
err = r.importPostgress(ctx, c, patcher, &mgrt, app)
if err != nil {
conditions.MarkStalled(&mgrt, "PostgresImportFailed", err.Error())
return ctrl.Result{}, lshr.Patch(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
}
err = DeleteApplicationCondition(ctx, r, app)
if err != nil {
if mgrt.Status.CompletionTime.IsZero() {
now := metav1.Now()
mgrt.Status.CompletionTime = &now
if err := r.patch(ctx, patcher, &mgrt); err != nil {
return ctrl.Result{}, err
}
maintenance := &lshlifecycle.Maintenance{}
lshr.SetResourceNamespacedName(app, maintenance)
err = r.Delete(ctx, maintenance)
if err != nil {
return ctrl.Result{}, err
}
conditions.Delete(&mgrt, meta.ReconcilingCondition)
// TODO add done in X time
log.Info("done")
return ctrl.Result{}, lshr.Complete(ctx, r, patcher, &mgrt, lshr.PatchOpts{})
return ctrl.Result{}, nil
// TODO: Tolerated downtime
}
// SetupWithManager sets up the controller with the Manager.
@ -275,23 +179,240 @@ func (r *MigrationReconciler) OwnedConditions() []string {
}
}
func ShouldBeResumed(mgrt *lshportability.Migration) bool {
func (r *MigrationReconciler) patch(ctx context.Context, patcher *patch.SerialPatcher, mgrt *lshportability.Migration) error {
conditions.MarkTrue(mgrt, meta.ReadyCondition, meta.SucceededReason, "Migrate")
conditions.SetSummary(mgrt, meta.ReadyCondition,
conditions.WithStepCounter(),
)
return lshr.Patch(ctx, r, patcher, mgrt, lshr.PatchOpts{DisableReadyCondition: true})
}
func ShouldRollback(mgrt *lshportability.Migration) bool {
switch conditions.GetReason(mgrt, meta.StalledCondition) {
case "PostgresImportFailed":
return true
}
return false
return mgrt.Spec.Rollback
}
func DeleteApplicationCondition(ctx context.Context, r lshr.Reconciler, app *unstructured.Unstructured) error {
type step struct {
name string
reconcile func() *lshr.Result
}
func (s *step) conditionType() string {
return s.name + "Step"
}
func (r *MigrationReconciler) DryRunStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration) func() *lshr.Result {
return func() *lshr.Result {
log.Info("getting app", "selectors", mgrt.Spec.Selectors)
manifestsResp, err := c.GetApp(ctx, &protoportabilityv1alpha1.GetAppRequest{Selectors: mgrt.Spec.Selectors})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
secret := corev1.Secret{}
secret.Namespace = mgrt.Namespace
secret.Name = fmt.Sprintf("%s-dry-run.migration.portability.libre.sh", mgrt.Name)
err = lshr.CreateOrPatch(ctx, r, &secret, func() error {
if secret.Data == nil {
secret.Data = make(map[string][]byte)
}
secret.Data["app.json"] = manifestsResp.Application
for i, m := range manifestsResp.ExtraManifests {
secret.Data[fmt.Sprintf("manifest-%d.json", i)] = m
}
return nil
})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
return nil
}
}
func (r *MigrationReconciler) CreateAppStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration, app *unstructured.Unstructured, extraManifests [][]byte) func() *lshr.Result {
return func() *lshr.Result {
for _, m := range extraManifests {
obj := &unstructured.Unstructured{}
err := obj.UnmarshalJSON(m)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
obj.SetNamespace(mgrt.Namespace)
err = r.Create(ctx, obj)
if client.IgnoreAlreadyExists(err) != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
}
log.Info("getting certs", "selectors", mgrt.Spec.Selectors)
certResp, err := c.GetCert(ctx, &protoportabilityv1alpha1.GetCertRequest{Selectors: mgrt.Spec.Selectors})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
secret := &corev1.Secret{}
lshr.SetResourceNamespacedName(app, secret, "tls")
err = lshr.CreateOrPatch(ctx, r, secret, func() error {
lshr.ApplyLabels(app, secret, nil)
secret.Type = corev1.SecretTypeTLS
secret.Data = make(map[string][]byte, 3)
secret.Data["tls.key"] = certResp.Key
secret.Data["tls.crt"] = certResp.Crt
secret.Data["ca.crt"] = certResp.Ca
return nil
})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
err = r.Get(ctx, client.ObjectKeyFromObject(app), app)
if err != nil {
if !apierrors.IsNotFound(err) {
return lshr.WrapResult(ctrl.Result{}, err)
}
log.Info("creating", "app", app.GetName())
err = importer.CreateApplication(ctx, r, app)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
return lshr.WrapResult(ctrl.Result{Requeue: true}, lshr.Patch(ctx, r, patcher, mgrt, lshr.PatchOpts{}))
}
if importer.ShouldWaitForAppDeps(app) {
log.Info("waiting for app dependencies")
// TODO to create app condition ?
conditions.MarkStalled(mgrt, lshmeta.DependenciesNotReady, "%s %s deps are not ready", app.GetKind(), app.GetName())
return lshr.WrapResult(ctrl.Result{RequeueAfter: 5 * time.Second}, lshr.Patch(ctx, r, patcher, mgrt, lshr.PatchOpts{}))
}
conditions.Delete(mgrt, lshmeta.DependenciesNotReady)
// TODO remove stalled condition
return nil
}
}
func (r *MigrationReconciler) MaintenanceStepFunc(ctx context.Context, log logr.Logger, patcher *patch.SerialPatcher, mgrt *lshportability.Migration, app client.Object) func() *lshr.Result {
return func() *lshr.Result {
log.Info("setting maintenance")
err := r.reconcileMaintenance(ctx, mgrt, app)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
return nil
}
}
func (r *MigrationReconciler) ProxyTrafficStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration) func() *lshr.Result {
return func() *lshr.Result {
// TODO improve
clusterSecret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{Namespace: "libresh-system", Name: "cluster-settings"}, clusterSecret)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
var clusterDomain string
if clusterSecret.Data["CLUSTER_DOMAIN"] == nil {
return lshr.WrapResult(ctrl.Result{}, fmt.Errorf("cluster domain is not defined in cluster-settings"))
}
clusterDomain = string(clusterSecret.Data["CLUSTER_DOMAIN"])
log.Info("proxy traffic", "selectors", mgrt.Spec.Selectors, "destination", clusterDomain)
_, err = c.ProxyTraffic(ctx, &protoportabilityv1alpha1.ProxyTrafficRequest{Selectors: mgrt.Spec.Selectors, Destination: clusterDomain})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
return nil
}
}
func (r *MigrationReconciler) ImportPostgresStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration, app *unstructured.Unstructured) func() *lshr.Result {
return func() *lshr.Result {
err := r.importPostgress(ctx, c, patcher, mgrt, app)
if err != nil {
// TODO step PostgresImport
conditions.MarkStalled(mgrt, "PostgresImportFailed", err.Error())
return lshr.WrapResult(ctrl.Result{}, lshr.Patch(ctx, r, patcher, mgrt, lshr.PatchOpts{}))
}
return nil
}
}
func (r *MigrationReconciler) StartAppStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, mgrt *lshportability.Migration, app *unstructured.Unstructured) func() *lshr.Result {
return func() *lshr.Result {
log.Info("starting app")
// TODO patch to only delete the condition without get
err := r.Get(ctx, client.ObjectKeyFromObject(app), app)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
patcher, err := patch.NewHelper(app, r)
if err != nil {
return err
return lshr.WrapResult(ctrl.Result{}, err)
}
conditions.Delete(conditions.UnstructuredSetter(app), lshportability.ImportInProgresStatus)
return patcher.Patch(ctx, app,
err = patcher.Patch(ctx, app,
patch.WithFieldOwner(r.Name()),
patch.WithOwnedConditions{Conditions: []string{lshportability.ImportInProgresStatus}})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
return nil
}
}
func (r *MigrationReconciler) RemoveMaintenanceStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration, app *unstructured.Unstructured) func() *lshr.Result {
return func() *lshr.Result {
err := r.Get(ctx, client.ObjectKeyFromObject(app), app)
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
log.Info("waiting for app to be ready")
if conditions.IsReady(conditions.UnstructuredGetter(app)) {
maintenance := &lshlifecycle.Maintenance{}
// TODO avoid collusion in maintenance, only one maintenance per ingressRefs ?
maintenance.Namespace = mgrt.Namespace
maintenance.Name = fmt.Sprintf("%s-migration", mgrt.Name)
log.Info("removing maintenance")
err := r.Delete(ctx, maintenance)
if !apierrors.IsNotFound(err) {
return lshr.WrapResult(ctrl.Result{}, err)
}
return nil
}
return lshr.WrapResult(ctrl.Result{Requeue: true}, nil)
}
}
func (r *MigrationReconciler) RollbackStepFunc(ctx context.Context, log logr.Logger, c protoportabilityv1alpha1.WaderServiceClient, patcher *patch.SerialPatcher, mgrt *lshportability.Migration) func() *lshr.Result {
return func() *lshr.Result {
if conditions.IsTrue(mgrt, "ProxyTraffic") {
_, err := c.ResumeTraffic(ctx, &protoportabilityv1alpha1.ResumeTrafficRequest{
Selectors: mgrt.Spec.Selectors,
})
if err != nil {
return lshr.WrapResult(ctrl.Result{}, err)
}
}
return nil
}
}

View file

@ -30,11 +30,7 @@ import (
lshportability "libre.sh/api/portability/v1alpha1"
)
func (r *MigrationReconciler) reconcileMaintenance(ctx context.Context, mgrt lshportability.Migration, app client.Object) error {
if mgrt.Status.MaintenanceOn {
return nil
}
func (r *MigrationReconciler) reconcileMaintenance(ctx context.Context, mgrt *lshportability.Migration, app client.Object) error {
ingressList := netv1.IngressList{}
err := r.List(ctx, &ingressList, client.InNamespace(mgrt.Namespace), client.MatchingLabels(lshr.GetLabelSelector(app, nil)))
if err != nil {
@ -53,8 +49,10 @@ func (r *MigrationReconciler) reconcileMaintenance(ctx context.Context, mgrt lsh
maintenance := &lshlifecycle.Maintenance{}
lshr.SetResourceNamespacedName(app, maintenance)
maintenance.Namespace = mgrt.Namespace
maintenance.Name = fmt.Sprintf("%s-migration", mgrt.Name)
return lshr.CreateOrPatch(ctx, r, maintenance, func() error {
maintenance.Spec.IngressRefs = ingressRefs
return controllerutil.SetControllerReference(app, maintenance, r.Scheme())
return controllerutil.SetControllerReference(mgrt, maintenance, r.Scheme())
})
}