diff --git a/README.md b/README.md index 4889d30cb..5e905d1d2 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,8 @@ There are two ways of interacting with AliECS: * [Sampling reservoir](/docs/metrics.md#sampling-reservoir) * [OCC API debugging with grpcc](/docs/using_grpcc_occ.md#occ-api-debugging-with-grpcc) * [Running tasks inside docker](/docs/running_docker.md#running-a-task-inside-a-docker-container) + * Kubernetes + * [ECS bridge to Kubernetes](/docs/kubernetes_ecs.md) * Resources * T. Mrnjavac et. al, [AliECS: A New Experiment Control System for the ALICE Experiment](https://doi.org/10.1051/epjconf/202429502027), CHEP23 diff --git a/common/controlmode/controlmode.go b/common/controlmode/controlmode.go index 6d6dc8fdb..5cab19729 100644 --- a/common/controlmode/controlmode.go +++ b/common/controlmode/controlmode.go @@ -39,6 +39,8 @@ const ( FAIRMQ BASIC HOOK + KUBECTL_DIRECT + KUBECTL_FAIRMQ ) func (cm ControlMode) String() string { @@ -51,6 +53,10 @@ func (cm ControlMode) String() string { return "basic" case HOOK: return "hook" + case KUBECTL_DIRECT: + return "kubectl_direct" + case KUBECTL_FAIRMQ: + return "kubectl_fairmq" } return "direct" } @@ -71,6 +77,10 @@ func (cm *ControlMode) UnmarshalText(b []byte) error { *cm = BASIC case "hook": *cm = HOOK + case "kubectl_direct": + *cm = KUBECTL_DIRECT + case "kubectl_fairmq": + *cm = KUBECTL_FAIRMQ default: *cm = DIRECT } diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 5e2d60fe4..737cad73e 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -1432,7 +1432,8 @@ func makeTaskForMesosResources( cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%d", "OCC_CONTROL_PORT", controlPort)) } - if cmd.ControlMode == controlmode.FAIRMQ { + if cmd.ControlMode == controlmode.FAIRMQ || + cmd.ControlMode == controlmode.KUBECTL_FAIRMQ { cmd.Arguments = append(cmd.Arguments, "--control-port", strconv.FormatUint(controlPort, 10)) } diff --git a/core/task/task.go b/core/task/task.go index 
ba3c459e5..bee19a1b5 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -286,7 +286,9 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { if class.Control.Mode == controlmode.BASIC || class.Control.Mode == controlmode.HOOK || class.Control.Mode == controlmode.DIRECT || - class.Control.Mode == controlmode.FAIRMQ { + class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { var varStack map[string]string // First we get the full varStack from the parent role, and @@ -393,7 +395,8 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { } } - if class.Control.Mode == controlmode.FAIRMQ { + if class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { // FIXME read this from configuration // if the task class doesn't provide an id, we generate one ourselves if !utils.StringSliceContains(cmd.Arguments, "--id") { @@ -635,7 +638,9 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand // For FAIRMQ tasks, we append FairMQ channel configuration if class.Control.Mode == controlmode.FAIRMQ || - class.Control.Mode == controlmode.DIRECT { + class.Control.Mode == controlmode.DIRECT || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { for _, inbCh := range channel.MergeInbound(parent.CollectInboundChannels(), class.Bind) { // We get the FairMQ-formatted propertyMap from the inbound channel spec var chanProps controlcommands.PropertyMap diff --git a/core/task/taskclass/class.go b/core/task/taskclass/class.go index 902741c6c..4a4e6329d 100644 --- a/core/task/taskclass/class.go +++ b/core/task/taskclass/class.go @@ -123,7 +123,6 @@ func (c *Class) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { } } return - } func (c *Class) MarshalYAML() (interface{}, error) { @@ -154,13 +153,17 @@ func (c *Class) MarshalYAML() 
(interface{}, error) { Command: c.Command, } - if c.Control.Mode == controlmode.FAIRMQ { - aux.Control.Mode = "fairmq" - } else if c.Control.Mode == controlmode.BASIC { - aux.Control.Mode = "basic" - } else { - aux.Control.Mode = "direct" - } + // if c.Control.Mode == controlmode.FAIRMQ { + // aux.Control.Mode = "fairmq" + // } else if c.Control.Mode == controlmode.BASIC { + // aux.Control.Mode = "basic" + // } else if c.Control.Mode == controlmode.KUBECTL { + // aux.Control.Mode = "kubectl" + // } else { + // aux.Control.Mode = "direct" + // } + + aux.Control.Mode = c.Control.Mode.String() return aux, nil } diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md new file mode 100644 index 000000000..ce698bdcf --- /dev/null +++ b/docs/kubernetes_ecs.md @@ -0,0 +1,70 @@ +# ECS with Kubernetes + +> ⚠️ **Warning** +> All Kubernetes work is at the prototype stage. + +## Kubernetes Cluster + +While prototyping we used many Kubernetes clusters, namely [`kind`](https://kind.sigs.k8s.io/), [`minikube`](https://minikube.sigs.k8s.io/docs/) and [`k3s`](https://k3s.io/) +in both local and remote cluster deployment. We used OpenStack for remote deployment. +Follow the guides of the individual distributions in order to create the desired cluster setup. +For now we chose `k3s` for most of the activities performed because it is a lightweight +and easily installed distribution which is also [`CNCF`](https://www.cncf.io/training/certification/) certified. + +All settings of `k3s` were used as default except one: locked-in-memory size. Use `ulimit -l` to learn +what the limit is for the current user and `LimitMEMLOCK` inside the k3s systemd service config +to set it to the correct value. Right now the `flp` user has unlimited size (`LimitMEMLOCK=infinity`). +This config is necessary because even if you are running Pods with the privileged security context +under user `flp`, Kubernetes still sets limits according to its internal settings and doesn't +respect Linux settings. 
+ +Another setup we expect at this moment to be present at the target nodes +is the ability to run Pods with privileged permissions and also under user `flp`. +This means that the machine has to have the `flp` user set up the same way as +if you had done the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). + +## Running tasks (`KubectlTask`) + +ECS is set up to run tasks through Mesos on all required bare-metal hosts with active +task management (see [`ControllableTask`](/executor/executable/controllabletask.go)) +and OCC gRPC communication. When running a docker task through ECS we could easily +wrap the command to be run into the docker container with proper settings +([see](/docs/running_docker.md)). This is however not possible for Kubernetes +workloads as the Pods are "hidden" inside the cluster. So we plan +to deploy our own Task Controller which will connect to and guide +the OCC state machine of the required tasks. Thus we need to create a custom +POC way to communicate with the Kubernetes cluster from the Mesos executor. + +The reason why we don't call the Kubernetes cluster directly from the ECS core +is that ECS does a lot of heavy lifting while deploying workloads, +monitoring workloads and generating a lot of configuration which +is not trivial to replicate manually. However, if we create some class +that would be able to deploy one task into Kubernetes and monitor its +state, we could replicate the `ControllableTask` workflow and leave ECS +mostly intact for now, save a lot of work and focus on prototyping +the Kubernetes operator pattern. + +Thus [`KubectlTask`](/executor/executable/kubectltask.go) was created. This class +is written as a wrapper around the `kubectl` utility to manage a Kubernetes cluster. 
+It is based on the following `kubectl` commands: + +* `apply` => `kubectl apply -f manifest.yaml` - deploys the resource described inside the given manifest +* `delete` => `kubectl delete -f manifest.yaml` - deletes the resource from the cluster +* `patch` => `kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]'` - changes the state of the resource inside the cluster +* `get` => `kubectl get -f manifest.yaml -o jsonpath='{.spec.state}'` - queries an exact field of the resource (`state` in the example) inside the cluster. + +These four commands allow us to deploy and monitor the status of the deployed +resource without the necessity to interact with it directly. However `KubectlTask` +expects that the resource is the CRD [Task](/control-operator/api/v1alpha1/task_types.go). + +In order to activate `KubectlTask` you need to change the yaml template +inside the `ControlWorkflows` directory. Namely: + +* add the path to the kubectl manifest as the first argument in the `.command.arguments` field +* change `.control.mode` to either `kubectl_direct` or `kubectl_fairmq`. +You can find a working template inside `control-operator/ecs-manifests/control-workflows/*_kube.yaml`. + +Working kubectl manifests can be found in `control-operator/ecs-manifests/kubernetes-manifests`. +You can see `*test.yaml` for concrete manifests deployable by `kubectl apply`; the rest +are templates with variables to be filled in using a `${var}` format. `KubectlTask` +fills these variables from env vars. diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go new file mode 100644 index 000000000..4a7112595 --- /dev/null +++ b/executor/executable/kubectltask.go @@ -0,0 +1,395 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². 
+ * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "os/user" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/AliceO2Group/Control/core/controlcommands" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + "github.com/sirupsen/logrus" +) + +const ( + KUBECTL string = "kubectl" + + APPLY string = "apply" + DELETE string = "delete" + PATCH string = "patch" + GET string = "get" + TASK string = "task" + CREATE string = "create" + + // TRANSITION_TIMEOUT = 10 * time.Second // inside controllable task +) + +type KubectlTask struct { + taskBase + rpc *executorcmd.RpcClient + configYaml string + running bool + latestStatus atomic.Value +} + +func GetUserInfo(username string) (uid, gid int64, supplemental []int64, err error) { + u, err := user.Lookup(username) + if err != nil { + return 0, 0, nil, err + } + + // Convert UID + uidInt, _ := strconv.ParseInt(u.Uid, 10, 64) + + // Convert Primary GID + gidInt, _ := strconv.ParseInt(u.Gid, 10, 64) + + // Get Supplemental Groups (e.g., wheel, 
pda) + groupStrings, _ := u.GroupIds() + var supplementalInts []int64 + for _, g := range groupStrings { + gInt, _ := strconv.ParseInt(g, 10, 64) + // Avoid adding the primary GID to the supplemental list + if gInt != gidInt { + supplementalInts = append(supplementalInts, gInt) + } + } + + return uidInt, gidInt, supplementalInts, nil +} + +func (task *KubectlTask) Launch() error { + if len(task.Tci.Arguments) == 0 { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }). + Error("no arguments in kubectl task. We need to have at least manifest location as the last argument") + return errors.New("no arguments for kubectl task. Location for kubernetes manifest needed") + } + + task.configYaml = task.Tci.Arguments[0] + + // Read the template file + content, err := os.ReadFile(task.configYaml) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "file": task.configYaml, + }).WithError(err).Error("failed to read kubectl config file") + return err + } + + // Set the AliECS environment variables in the local process + // so os.ExpandEnv can find them + for _, envVar := range task.Tci.Env { + parts := strings.SplitN(envVar, "=", 2) + if len(parts) == 2 { + os.Setenv(parts[0], parts[1]) + } + } + + // Set arguments into the KUBE_ARGUMENTS os env leaving the kubemanifest file + arguments := task.Tci.Arguments[1:] + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "args": arguments, + }).Info("setting arguments as a KUBE_ARGUMENTS env var") + + os.Setenv("KUBE_ARGUMENTS", strings.Join(arguments, " ")) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": *task.Tci.Value, + }).Info("setting command as a KUBE_COMMAND env var") + os.Setenv("KUBE_COMMAND", *task.Tci.Value) + + if uid, gid, supplementalIds, err := GetUserInfo("flp"); err == nil { + os.Setenv("FLP_UID", 
strconv.FormatInt(uid, 10)) + os.Setenv("FLP_GID", strconv.FormatInt(gid, 10)) + + var strIds []string + for _, id := range supplementalIds { + strIds = append(strIds, strconv.FormatInt(id, 10)) + } + supplementalString := "[" + strings.Join(strIds, ", ") + "]" + + os.Setenv("FLP_SUPPLEMENTAL_GORUPS", supplementalString) + } else { + log.Error("we cannot run kubectl task as flp user because we didn't find user details") + } + + // Replace ${VAR} placeholders with actual values + expandedYaml := os.ExpandEnv(string(content)) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Apply via Stdin (-) + command := exec.CommandContext(ctx, KUBECTL, APPLY, "-f", "-") + command.Stdin = strings.NewReader(expandedYaml) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl apply via Stdin") + + var stdoutBuf bytes.Buffer + var stderrBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = io.MultiWriter(os.Stderr, &stderrBuf) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Errorf("kubectl apply failed stderr: %s , stdin: %s", stderrBuf.String(), stdoutBuf.String()) + return err + } + + task.latestStatus.Store("") + task.running = true + go task.eventLoop() + return nil +} + +func (task *KubectlTask) Kill() error { + task.running = false + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + command := exec.CommandContext(ctx, KUBECTL, DELETE, "-f", task.configYaml) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("kubectl delete failed") + return err + } + + 
task.sendStatus(task.knownEnvironmentId, mesos.TASK_FINISHED, "") + + return nil +} + +func (task *KubectlTask) Transition(transition *executorcmd.ExecutorCommand_Transition) *controlcommands.MesosCommandResponse_Transition { + // kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]' + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Extract the transition arguments (the 'Mix') to pipe them to Kubernetes + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "args": transition.Arguments, + }).Info("Patching transition arguments to Kubernetes") + + argsJSON, err := json.Marshal(transition.Arguments) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("failed to marshal transition arguments") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + transitionJSON := fmt.Sprintf(`[ + {"op": "add", "path": "/spec/state", "value": "%s"}, + {"op": "add", "path": "/spec/arguments", "value": %s} + ]`, strings.ToLower(transition.Destination), string(argsJSON)) + + command := exec.CommandContext(ctx, KUBECTL, PATCH, "-f", task.configYaml, "--type=json", "-p", transitionJSON) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl patch") + + statusBeforeTransition := task.latestStatus.Load().(string) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl patch failed") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + log.WithFields(logrus.Fields{ + "controlmode": 
task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("kubectl patch suceeded, waiting for actual status change") + actualStatus := "" + timeout := time.After(TRANSITION_TIMEOUT) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + +loop: + for { + select { + case <-ticker.C: + actualStatus = task.latestStatus.Load().(string) + if actualStatus != statusBeforeTransition { + break loop + } + case <-timeout: + return transition.PrepareResponse(errors.New("timeout waiting for status change"), statusBeforeTransition, task.ti.TaskID.Value) + } + } + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Infof("status changed from %s to %s", statusBeforeTransition, actualStatus) + + // TODO: I am not sure what PID should I put here + return transition.PrepareResponse(nil, actualStatus, task.ti.TaskID.Value) +} + +func (task *KubectlTask) getTaskStatus() (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + // command := exec.CommandContext(ctx, KUBECTL, GET, TASK, task.ti.Name, "-o", "jsonpath={.status.state}") + command := exec.CommandContext(ctx, KUBECTL, GET, "-f", task.configYaml, "-o", "jsonpath={.status.state}") + + var stdoutBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Debug("Starting kubectl get task") + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl get task failed") + return "", err + } + + // no newlines + return strings.TrimSpace(stdoutBuf.String()), nil +} + +func (task *KubectlTask) eventLoop() { + errorCount := 0 + maxErrors := 5 + for task.running { + time.Sleep(5 * 
time.Second) + status, err := task.getTaskStatus() + if err != nil { + errorCount += 1 + if errorCount < maxErrors { + log.WithError(err).Warnf("failed to get Task Status, retrying %d/%d", errorCount, maxErrors) + continue + } + log.WithError(err).Errorf("failed to get Task Status, sending TASK_FAILED and breaking from the eventLoop") + task.sendStatus(task.knownEnvironmentId, mesos.TASK_FAILED, "couldn't get task status via kubectl") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + break + } + + status = strings.ToUpper(status) + + if task.latestStatus.Load().(string) == status { + continue + } + task.latestStatus.Store(status) + + var state mesos.TaskState + switch status { + case "CONFIGURED", "RUNNING", "STANDBY": + state = mesos.TASK_RUNNING + + case "ERROR": + state = mesos.TASK_FAILED + log.WithError(err).Error("Received error from kubectl task, terminating everything and sending update") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + // + + default: + log.Errorf("received different status than expected: %s", status) + continue + } + + log.Debugf("sending new status from kubectl task %s", status) + task.sendStatus(task.knownEnvironmentId, state, "") + + } +} + +func (task *KubectlTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { + cmd = new(executorcmd.ExecutorCommand_Transition) + if task.rpc != nil { + cmd.Transitioner = task.rpc.Transitioner + } + err = json.Unmarshal(data, cmd) + if err != nil { + cmd = nil + } + return +} diff --git a/executor/executable/kubectltask_test.go b/executor/executable/kubectltask_test.go new file mode 100644 index 000000000..6dbb6d300 --- /dev/null +++ b/executor/executable/kubectltask_test.go @@ -0,0 +1,80 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². 
+ * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "testing" + "time" + + "github.com/AliceO2Group/Control/common" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("kubectl task test", func() { + task := KubectlTask{} + task.Tci = &common.TaskCommandInfo{} + task.Tci.Arguments = []string{"exampletask.yaml"} + task.configYaml = "exampletask.yaml" + task.ti = &mesos.TaskInfo{Name: "exampletask"} + When("starting and stoping the task", func() { + It("should start and stop accordingly", func() { + err := task.Launch() + Expect(err).NotTo(HaveOccurred()) + + // just so we can see monitoring tools show something + time.Sleep(2 * time.Second) + + err = task.Kill() + Expect(err).NotTo(HaveOccurred()) + }) + }) + + When("transitioning to running", func() { + It("should transition", func() { + task.configYaml = "exampletask.yaml" + transitionMsg := &executorcmd.ExecutorCommand_Transition{} + transitionMsg.Destination = "running" + result := task.Transition(transitionMsg) + Expect(result.ErrorString).To(BeEmpty()) + }) + }) + + // this test expects there is a task with a status on cluster. + When("geting current status", func() { + It("should get proper status", func() { + status, err := task.getTaskStatus() + Expect(err).NotTo(HaveOccurred()) + Expect(status).To(Equal("standby")) + }) + }) +}) + +func TestKubectlTask(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "KubectlTask suite") +} diff --git a/executor/executable/task.go b/executor/executable/task.go index e23b56394..abb92b14e 100644 --- a/executor/executable/task.go +++ b/executor/executable/task.go @@ -58,9 +58,11 @@ const ( var log = logger.New(logrus.StandardLogger(), "executor") -type SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) -type SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) -type SendMessageFunc func(message []byte) +type ( + SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) + SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) + SendMessageFunc func(message []byte) +) type Task interface { Launch() error @@ -90,6 +92,7 @@ func 
NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE log.WithField("json", string(tciData[:])). Trace("received TaskCommandInfo") + log.WithField("findme", "here").Info(string(tciData)) if err := json.Unmarshal(tciData, &commandInfo); tciData != nil && err == nil { log.WithFields(logrus.Fields{ "shell": *commandInfo.Shell, @@ -177,6 +180,21 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE }, rpc: nil, } + case controlmode.KUBECTL_DIRECT: + fallthrough + case controlmode.KUBECTL_FAIRMQ: + newTask = &KubectlTask{ + taskBase: taskBase{ + ti: &taskInfo, + Tci: &commandInfo, + sendStatus: sendStatusFunc, + sendDeviceEvent: sendDeviceEventFunc, + sendMessage: sendMessageFunc, + knownEnvironmentId: envId, + knownDetector: detector, + }, + rpc: nil, + } } return newTask