diff --git a/cmd/peanut/main.go b/cmd/peanut/main.go index 59ce9efc2..87b2215a5 100644 --- a/cmd/peanut/main.go +++ b/cmd/peanut/main.go @@ -25,17 +25,46 @@ package main import ( + "flag" + "fmt" "os" - "strings" "github.com/AliceO2Group/Control/occ/peanut" ) func main() { - cmdArg := os.Args[1:] - cmdString := strings.Trim(strings.Join(cmdArg, " "), " ") + fs := flag.NewFlagSet("peanut", flag.ExitOnError) + addr := fs.String("addr", "", "OCC gRPC address (host:port); if empty, OCC_CONTROL_PORT env var is used in direct mode") + mode := fs.String("mode", "direct", "control mode: direct (default), fmq, or fmq-step") + fs.Usage = func() { + fmt.Fprint(os.Stderr, `peanut — process execution and control utility for OCC / FairMQ processes - if err := peanut.Run(cmdString); err != nil { - panic(err) +TUI mode (interactive, launched when no command is given): + OCC_CONTROL_PORT= peanut + peanut -addr host:port -mode fmq + +CLI mode (non-interactive, launched when a command is given): + peanut [flags] [args] + Run "peanut -addr x get-state" for full CLI usage. + +Flags: +`) + fs.PrintDefaults() + } + _ = fs.Parse(os.Args[1:]) + + if fs.NArg() > 0 { + // CLI mode — pass all original args so RunCLI can re-parse its own flags + if err := peanut.RunCLI(os.Args[1:]); err != nil { + fmt.Fprintf(os.Stderr, "peanut: %v\n", err) + os.Exit(1) + } + return + } + + // TUI mode + if err := peanut.Run(peanut.Options{Addr: *addr, Mode: *mode}); err != nil { + fmt.Fprintf(os.Stderr, "peanut: %v\n", err) + os.Exit(1) } } diff --git a/occ/peanut/README.md b/occ/peanut/README.md index 61f72815b..c405d0901 100644 --- a/occ/peanut/README.md +++ b/occ/peanut/README.md @@ -1,42 +1,227 @@ # Process control and execution utility overview `peanut` is the **p**rocess **e**xecution **a**nd co**n**trol **ut**ility for OCClib-based O² processes. Its purpose -is to be a debugging and development aid for non-FairMQ O² devices, where FairMQ's interactive -controller is not available. +is to be a debugging and development aid for OCC-based and FairMQ O² devices. In aliBuild it is part of the `coconut` package. -`peanut` can connect to a running OCClib-based process, query its status, drive its state machine +`peanut` can connect to a running OCClib-based or FairMQ process, query its status, drive its state machine and push runtime configuration data. -`peanut` is an interactive tool, the only information it picks up from its environment is the -`OCC_CONTROL_PORT` variable, which is used to connect to a running OCClib-based process. +`peanut` runs in two modes depending on whether a command is passed: + +* **TUI mode** — interactive terminal UI (launched when no command is given) +* **CLI mode** — non-interactive, scriptable (launched when a command is given) + +--- + +## TUI mode + +![Screenshot of peanut](peanut.png) ```bash -$ OCC_CONTROL_PORT= peanut +peanut [flags] ``` -![Screenshot of peanut](peanut.png) +| Flag | Default | Description | +|------|---------|-------------| +| `-addr` | `""` | gRPC address `host:port`; if empty, falls back to `OCC_CONTROL_PORT` env var (direct mode only) | +| `-mode` | `direct` | `direct`, `fmq`, or `fmq-step` (see below) | + +### Modes + +#### `direct` — OCC protobuf (default) + +Connects to an OCClib-based process using the standard OCC protobuf codec. +The state machine operates on OCC states: `STANDBY`, `CONFIGURED`, `RUNNING`, `ERROR`. + +```bash +OCC_CONTROL_PORT=47100 peanut +# or +peanut -addr localhost:47100 -mode direct +``` + +Control buttons: **CONFIGURE**, **RESET**, **START**, **STOP**, **RECOVER**, **EXIT** + +#### `fmq` — FairMQ JSON codec with automatic multi-step sequencing + +Connects to a FairMQ device using the JSON codec. Each OCC-level button press +automatically drives the full underlying FairMQ state machine sequence. +The state is displayed as an OCC-mapped state (`STANDBY`, `CONFIGURED`, `RUNNING`…). + +```bash +peanut -addr localhost:47100 -mode fmq +``` + +Control buttons: **CONFIGURE**, **RESET**, **START**, **STOP**, **RECOVER**, **EXIT** + +Sequences driven automatically: -`peanut` commands are documented inline. Each transition is applied immediately and -the state is updated in real time. +| Button | FairMQ steps | +|--------|-------------| +| CONFIGURE | INIT DEVICE → COMPLETE INIT → BIND → CONNECT → INIT TASK | +| RESET | RESET TASK → RESET DEVICE | +| START | RUN | +| STOP | STOP | +| RECOVER | RESET DEVICE (from ERROR) | +| EXIT | RESET (if needed) → END | -Compared to the raw gRPC API, the following limitations apply: +#### `fmq-step` — FairMQ JSON codec with granular per-step control -* It is not possible to perform a `GO_ERROR` transition, as this transition is only triggered from -user code. +Connects to a FairMQ device using the JSON codec. Exposes each individual FairMQ +state machine step as a separate button. The state is displayed as the raw FairMQ state. -* The `CONFIGURE` transition may be triggered both with and without runtime configuration data, which -may or may not be suitable depending on user code. All other transitions send no payload. +```bash +peanut -addr localhost:47100 -mode fmq-step +``` + +| Key | Button | Transition | +|-----|--------|-----------| +| `1` | INIT DEVICE | IDLE → INITIALIZING DEVICE | +| `2` | COMPLETE INIT | INITIALIZING DEVICE → INITIALIZED | +| `3` | BIND | INITIALIZED → BOUND | +| `4` | CONNECT | BOUND → DEVICE READY | +| `5` | INIT TASK | DEVICE READY → READY | +| `6` | RUN | READY → RUNNING | +| `7` | STOP | RUNNING → READY | +| `8` | RESET TASK | READY → DEVICE READY | +| `9` | RESET DEVICE | → IDLE | +| `0` | END | IDLE → EXITING | + +### Common TUI controls (all modes) + +| Key | Action | +|-----|--------| +| `n` | **Reconnect** — re-establish the gRPC connection to the controlled process. Use this when the process has been restarted after a crash or deliberate termination. | +| `l` | **Load configuration** — open a file dialog to read a YAML or JSON configuration file. The path field supports tab-completion. Once loaded, the right panel shows `NOT PUSHED` until the next CONFIGURE transition, then `PUSHED`. | +| `q` | **Quit** — disconnect and exit without sending any transitions. | + +### Connection monitoring + +While connected, peanut passively monitors the gRPC connection in a background goroutine and detects process termination without any button press. The strategy depends on what the controlled process supports: + +1. **StateStream** (OCClib processes, `direct` mode) — subscribes to the state stream; any disconnect immediately triggers `UNREACHABLE` and an error modal. State updates from the stream are also reflected in the display in real time. +2. **EventStream** (FairMQ processes, `fmq`/`fmq-step` modes) — subscribes to the event stream; disconnect is detected immediately when the stream breaks. +3. **Polling** (fallback) — if neither stream is available, `GetState` is polled every 2 seconds. + +When the process dies, the state display shows `UNREACHABLE` and an error modal appears. After restarting the controlled process, press `n` to reconnect. + +### Runtime configuration files + +Configuration files are YAML or JSON, with arbitrarily nested structure. +`peanut` flattens them to dot-notation key=value pairs before pushing. +Integer map keys and integer values are both handled correctly. + +Example (channel configuration): + +```yaml +chans: + data: + numSockets: 1 + 0: + address: ipc://@o2ipc-example + method: bind + type: push + transport: shmem + sndBufSize: 1000 + rcvBufSize: 1000 + sndKernelSize: 0 + rcvKernelSize: 0 + rateLogging: 0 +``` + +This flattens to entries like `chans.data.0.address=ipc://@o2ipc-example`. + +--- + +## CLI mode + +```bash +peanut [flags] [args] +``` + +| Flag | Default | Description | +|------|---------|-------------| +| `-addr` | `localhost:47100` | gRPC address `host:port` | +| `-mode` | `fmq` | `fmq` (JSON codec) or `direct` (protobuf) | +| `-timeout` | `30s` | timeout for unary gRPC calls | +| `-config` | `""` | path to YAML/JSON file; flattened key=value pairs are sent as arguments. Inline `key=val` arguments take precedence. | + +### Commands + +#### `get-state` + +Print the current FSM state. + +```bash +peanut -addr localhost:47100 get-state +``` + +#### `transition [key=val ...]` + +High-level state transition. In `fmq` mode drives the full multi-step FairMQ sequence automatically. + +```bash +# FairMQ: drive full configure sequence +peanut -addr localhost:47100 -mode fmq transition STANDBY CONFIGURED \ + chans.data.0.address=ipc://@o2ipc-example + +# FairMQ: with config file +peanut -addr localhost:47100 -mode fmq -config stfsender-configure-args.yaml \ + transition STANDBY CONFIGURED + +# Direct OCC +peanut -addr localhost:47100 -mode direct transition STANDBY CONFIGURED +``` + +FairMQ sequences driven automatically: + +| From → To | Steps | +|-----------|-------| +| `STANDBY → CONFIGURED` | INIT DEVICE, COMPLETE INIT, BIND, CONNECT, INIT TASK | +| `CONFIGURED → RUNNING` | RUN | +| `RUNNING → CONFIGURED` | STOP | +| `CONFIGURED → STANDBY` | RESET TASK, RESET DEVICE | + +#### `direct-step [key=val ...]` + +Low-level: send a single raw OCC gRPC Transition call (protobuf codec). + +```bash +peanut -addr localhost:47100 -mode direct direct-step STANDBY CONFIGURE key=val +``` + +Events: `CONFIGURE`, `RESET`, `START`, `STOP`, `RECOVER`, `EXIT` + +#### `fmq-step [key=val ...]` + +Low-level: send a single raw FairMQ gRPC Transition call (JSON codec). +State/event names that contain spaces must be quoted. + +```bash +peanut -addr localhost:47100 fmq-step IDLE "INIT DEVICE" chans.x.0.address=ipc://@foo +peanut -addr localhost:47100 fmq-step READY RUN +``` + +#### `state-stream` + +Subscribe to `StateStream` and print state updates until interrupted (Ctrl-C). + +```bash +peanut -addr localhost:47100 state-stream +``` + +#### `event-stream` + +Subscribe to `EventStream` and print events until interrupted (Ctrl-C). + +```bash +peanut -addr localhost:47100 event-stream +``` -The last two commands are **not** transitions: +--- -* `Load configuration` allows the user to read in a JSON or YAML file containing sample -configuration data that is then available to be pushed to the controlled process during a future -`CONFIGURE` transition. On startup, there is no file loaded, so a `CONFIGURE` transition will push -an empty payload. Once a runtime configuration file is loaded, its title bar reports `NOT PUSHED` -until the next `CONFIGURE` transition, at which point it becomes `PUSHED`. +## Limitations -* `Quit` disconnects from the controlled process and quits `peanut`, but it performs no transitions -or other data exchange with the controlled process. A future instance of `peanut` may reattach itself -to the same process and continue from there. +* The `GO_ERROR` transition cannot be triggered from `peanut`, as it is only triggered from user code inside the controlled process. +* `Quit` / `q` disconnects without sending any transition. A future instance of `peanut` can reattach to the same process and continue. diff --git a/occ/peanut/cli.go b/occ/peanut/cli.go new file mode 100644 index 000000000..49ad9c160 --- /dev/null +++ b/occ/peanut/cli.go @@ -0,0 +1,416 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2024 CERN and copyright holders of ALICE O². + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package peanut + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/AliceO2Group/Control/executor/executorcmd/nopb" + "github.com/AliceO2Group/Control/executor/executorcmd/transitioner/fairmq" + "github.com/AliceO2Group/Control/executor/protos" + "github.com/AliceO2Group/Control/occ/peanut/flatten" +) + +// RunCLI runs peanut in non-interactive CLI mode. +// args should be os.Args[1:]. +func RunCLI(args []string) error { + fs := flag.NewFlagSet("peanut", flag.ExitOnError) + addr := fs.String("addr", "localhost:47100", "OCC gRPC address (host:port)") + mode := fs.String("mode", "fmq", "control mode: fmq (json codec, default) or direct (protobuf)") + timeout := fs.Duration("timeout", 30*time.Second, "request timeout for unary calls") + configFile := fs.String("config", "", "path to YAML/JSON file whose flattened key=value pairs are sent as arguments (inline key=val args take precedence)") + fs.Usage = cliUsage + _ = fs.Parse(args) + + cmds := fs.Args() + if len(cmds) == 0 { + cliUsage() + return fmt.Errorf("no command specified") + } + + conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) //nolint:staticcheck + if err != nil { + return fmt.Errorf("dial %s: %w", *addr, err) + } + defer conn.Close() + + var client pb.OccClient + if *mode == "fmq" { + client = nopb.NewOccClient(conn) + } else { + client = pb.NewOccClient(conn) + } + + loadedKVs, err := cliLoadConfig(*configFile) + if err != nil { + return fmt.Errorf("config file: %w", err) + } + + switch cmds[0] { + case "get-state": + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + reply, err := client.GetState(ctx, &pb.GetStateRequest{}) + if err != nil { + return fmt.Errorf("GetState: %w", err) + } + fmt.Println(reply.GetState()) + + case "transition": + if len(cmds) < 3 { + return fmt.Errorf("usage: transition [key=val ...]") + } + from := strings.ToUpper(cmds[1]) + to := strings.ToUpper(cmds[2]) + kvs := cliMergeKVs(loadedKVs, cliParseKVs(cmds[3:])) + + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + + if *mode == "fmq" { + result, err := cliFMQTransition(ctx, client, from, to, kvs) + if err != nil { + return fmt.Errorf("transition %s→%s: %w", from, to, err) + } + fmt.Printf("ok state=%s\n", result) + } else { + event := cliOCCEventForTransition(from, to) + reply, err := client.Transition(ctx, &pb.TransitionRequest{ + SrcState: from, + TransitionEvent: event, + Arguments: cliKVsToEntries(kvs), + }) + if err != nil { + return fmt.Errorf("Transition: %w", err) + } + fmt.Printf("ok state=%s trigger=%s\n", reply.GetState(), reply.GetTrigger()) + } + + case "direct-step": + // Low-level single OCC gRPC call. Mirrors what the TUI does. + // Usage: direct-step [key=val ...] + if len(cmds) < 3 { + return fmt.Errorf("usage: direct-step [key=val ...]\n e.g. direct-step STANDBY CONFIGURE key=val") + } + kvs := cliMergeKVs(loadedKVs, cliParseKVs(cmds[3:])) + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + reply, err := client.Transition(ctx, &pb.TransitionRequest{ + SrcState: cmds[1], + TransitionEvent: cmds[2], + Arguments: cliKVsToEntries(kvs), + }) + if err != nil { + return fmt.Errorf("occ-step: %w", err) + } + fmt.Printf("ok state=%s trigger=%s\n", reply.GetState(), reply.GetTrigger()) + + case "fmq-step": + if len(cmds) < 3 { + return fmt.Errorf("usage: fmq-step [key=val ...]\n e.g. fmq-step IDLE \"INIT DEVICE\" key=val") + } + kvs := cliMergeKVs(loadedKVs, cliParseKVs(cmds[3:])) + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + state, err := cliFMQDoStep(ctx, client, cmds[1], cmds[2], kvs) + if err != nil { + return fmt.Errorf("fmq-step: %w", err) + } + fmt.Printf("ok fmq-state=%s occ-state=%s\n", state, cliFMQToOCCState(state)) + + case "state-stream": + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + stream, err := client.StateStream(ctx, &pb.StateStreamRequest{}) + if err != nil { + return fmt.Errorf("StateStream: %w", err) + } + fmt.Fprintf(os.Stderr, "streaming state updates from %s (ctrl-c to stop)\n", *addr) + for { + msg, err := stream.Recv() + if err != nil { + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("StateStream recv: %w", err) + } + fmt.Printf("type=%-12s state=%s\n", msg.GetType(), msg.GetState()) + } + + case "event-stream": + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + stream, err := client.EventStream(ctx, &pb.EventStreamRequest{}) + if err != nil { + return fmt.Errorf("EventStream: %w", err) + } + fmt.Fprintf(os.Stderr, "streaming events from %s (ctrl-c to stop)\n", *addr) + for { + msg, err := stream.Recv() + if err != nil { + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("EventStream recv: %w", err) + } + fmt.Printf("event=%s\n", msg.GetEvent()) + } + + default: + return fmt.Errorf("unknown command %q — valid: get-state, transition, direct-step, fmq-step, state-stream, event-stream", cmds[0]) + } + return nil +} + +// cliFMQStateToOCC maps FairMQ device states to OCC states. +var cliFMQStateToOCC = map[string]string{ + fairmq.IDLE: "STANDBY", + fairmq.INITIALIZING_DEVICE: "STANDBY", + fairmq.INITIALIZED: "STANDBY", + fairmq.BOUND: "STANDBY", + fairmq.DEVICE_READY: "STANDBY", + fairmq.READY: "CONFIGURED", + fairmq.RUNNING: "RUNNING", + fairmq.ERROR: "ERROR", +} + +func cliFMQToOCCState(fmqState string) string { + if s, ok := cliFMQStateToOCC[strings.ToUpper(fmqState)]; ok { + return s + } + return "STANDBY" +} + +// fmqDoStep performs a single raw FairMQ gRPC Transition call with no output. +func fmqDoStep(ctx context.Context, client pb.OccClient, srcFMQ, event string, kvs map[string]string) (string, error) { + reply, err := client.Transition(ctx, &pb.TransitionRequest{ + SrcState: srcFMQ, + TransitionEvent: event, + Arguments: cliKVsToEntries(kvs), + }) + if err != nil { + return "", fmt.Errorf("step %q failed: %w", event, err) + } + state := reply.GetState() + if !reply.GetOk() { + return state, fmt.Errorf("step %q not ok, state=%s", event, state) + } + return state, nil +} + +// cliFMQDoStep wraps fmqDoStep with stderr progress output for CLI use. +func cliFMQDoStep(ctx context.Context, client pb.OccClient, srcFMQ, event string, kvs map[string]string) (string, error) { + fmt.Fprintf(os.Stderr, " fmq-step src=%-20q event=%q\n", srcFMQ, event) + state, err := fmqDoStep(ctx, client, srcFMQ, event, kvs) + ok := err == nil + fmt.Fprintf(os.Stderr, " result=%-20q ok=%v\n", state, ok) + return state, err +} + +func cliFMQTransition(ctx context.Context, client pb.OccClient, from, to string, kvs map[string]string) (string, error) { + switch { + case from == "STANDBY" && to == "CONFIGURED": + return cliFMQConfigure(ctx, client, kvs) + case from == "CONFIGURED" && to == "STANDBY": + return cliFMQReset(ctx, client, kvs) + case from == "CONFIGURED" && to == "RUNNING": + state, err := cliFMQDoStep(ctx, client, fairmq.READY, fairmq.EvtRUN, kvs) + return cliFMQToOCCState(state), err + case from == "RUNNING" && to == "CONFIGURED": + state, err := cliFMQDoStep(ctx, client, fairmq.RUNNING, fairmq.EvtSTOP, kvs) + return cliFMQToOCCState(state), err + default: + return from, fmt.Errorf("unsupported FairMQ transition %s → %s", from, to) + } +} + +func cliFMQConfigure(ctx context.Context, client pb.OccClient, args map[string]string) (string, error) { + state, err := cliFMQDoStep(ctx, client, fairmq.IDLE, fairmq.EvtINIT_DEVICE, args) + if err != nil || state != fairmq.INITIALIZING_DEVICE { + return cliFMQToOCCState(state), fmt.Errorf("INIT DEVICE: expected %q got %q: %w", fairmq.INITIALIZING_DEVICE, state, err) + } + state, err = cliFMQDoStep(ctx, client, fairmq.INITIALIZING_DEVICE, fairmq.EvtCOMPLETE_INIT, nil) + if err != nil || state != fairmq.INITIALIZED { + return cliFMQToOCCState(state), fmt.Errorf("COMPLETE INIT: expected %q got %q: %w", fairmq.INITIALIZED, state, err) + } + state, err = cliFMQDoStep(ctx, client, fairmq.INITIALIZED, fairmq.EvtBIND, nil) + if err != nil || state != fairmq.BOUND { + cliFMQDoStep(ctx, client, fairmq.INITIALIZED, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("BIND: expected %q got %q: %w", fairmq.BOUND, state, err) + } + state, err = cliFMQDoStep(ctx, client, fairmq.BOUND, fairmq.EvtCONNECT, nil) + if err != nil || state != fairmq.DEVICE_READY { + cliFMQDoStep(ctx, client, fairmq.BOUND, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("CONNECT: expected %q got %q: %w", fairmq.DEVICE_READY, state, err) + } + state, err = cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtINIT_TASK, nil) + if err != nil || state != fairmq.READY { + cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("INIT TASK: expected %q got %q: %w", fairmq.READY, state, err) + } + return cliFMQToOCCState(state), nil +} + +func cliFMQReset(ctx context.Context, client pb.OccClient, args map[string]string) (string, error) { + state, err := cliFMQDoStep(ctx, client, fairmq.READY, fairmq.EvtRESET_TASK, nil) + if err != nil || state != fairmq.DEVICE_READY { + return cliFMQToOCCState(state), fmt.Errorf("RESET TASK: expected %q got %q: %w", fairmq.DEVICE_READY, state, err) + } + state, err = cliFMQDoStep(ctx, client, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, args) + return cliFMQToOCCState(state), err +} + +func cliOCCEventForTransition(from, to string) string { + type edge struct{ from, to string } + table := map[edge]string{ + {"STANDBY", "CONFIGURED"}: "CONFIGURE", + {"CONFIGURED", "RUNNING"}: "START", + {"RUNNING", "CONFIGURED"}: "STOP", + {"CONFIGURED", "STANDBY"}: "RESET", + {"ERROR", "STANDBY"}: "RECOVER", + } + if ev, ok := table[edge{from, to}]; ok { + return ev + } + return to +} + +func cliKVsToEntries(kvs map[string]string) []*pb.ConfigEntry { + entries := make([]*pb.ConfigEntry, 0, len(kvs)) + for k, v := range kvs { + entries = append(entries, &pb.ConfigEntry{Key: k, Value: v}) + } + return entries +} + +func cliParseKVs(args []string) map[string]string { + m := make(map[string]string, len(args)) + for _, kv := range args { + parts := strings.SplitN(kv, "=", 2) + if len(parts) == 2 { + m[parts[0]] = parts[1] + } + } + return m +} + +// cliLoadConfig reads and flattens a YAML/JSON config file into a key=value map. +// Returns an empty map (not an error) if path is empty. +func cliLoadConfig(path string) (map[string]string, error) { + if path == "" { + return map[string]string{}, nil + } + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("cannot read %q: %w", path, err) + } + flattened, err := flatten.FlattenString(string(data), "", flatten.DotStyle) + if err != nil { + return nil, fmt.Errorf("cannot flatten %q: %w", path, err) + } + var rawMap map[string]interface{} + if err := json.Unmarshal([]byte(flattened), &rawMap); err != nil { + return nil, fmt.Errorf("cannot parse flattened config: %w", err) + } + kvs := make(map[string]string, len(rawMap)) + for k, v := range rawMap { + kvs[k] = fmt.Sprintf("%v", v) + } + return kvs, nil +} + +// cliMergeKVs merges base and override maps; keys in override take precedence. +func cliMergeKVs(base, override map[string]string) map[string]string { + merged := make(map[string]string, len(base)+len(override)) + for k, v := range base { + merged[k] = v + } + for k, v := range override { + merged[k] = v + } + return merged +} + +func cliUsage() { + fmt.Fprint(os.Stderr, `peanut — process execution and control utility for OCC / FairMQ processes + +TUI mode (interactive): + OCC_CONTROL_PORT= peanut + +CLI mode (non-interactive): + peanut [flags] [args] + +CLI Flags: + -addr string gRPC address (default "localhost:47100") + -mode string fmq (json codec, default) or direct (protobuf) + -timeout duration unary call timeout (default 30s) + -config string path to YAML/JSON file with arguments to push (inline key=val args take precedence) + +CLI Commands: + get-state + Print the current FSM state. + + transition [key=val ...] + High-level OCC transition. In fairmq mode drives the full multi-step + FairMQ sequence automatically: + STANDBY→CONFIGURED runs INIT DEVICE, COMPLETE INIT, BIND, CONNECT, INIT TASK + CONFIGURED→RUNNING runs RUN + RUNNING→CONFIGURED runs STOP + CONFIGURED→STANDBY runs RESET TASK, RESET DEVICE + key=val pairs are forwarded as ConfigEntry arguments. + + direct-step [key=val ...] + Low-level: send a single raw OCC gRPC Transition call. + Mirrors exactly what the TUI does for each button press. + Events: CONFIGURE, RESET, START, STOP, RECOVER, EXIT + + fmq-step [key=val ...] + Low-level: send a single raw FairMQ gRPC Transition call. + FairMQ state/event names that contain spaces must be quoted. + + state-stream + Subscribe to StateStream; print updates until interrupted. + + event-stream + Subscribe to EventStream; print events until interrupted. + +Examples: + peanut -addr localhost:47100 get-state + peanut -addr localhost:47100 transition STANDBY CONFIGURED chans.x.0.address=ipc://@foo + peanut -addr localhost:47100 fmq-step IDLE "INIT DEVICE" chans.x.0.address=ipc://@foo + peanut -addr localhost:47100 state-stream + peanut -addr localhost:47100 -mode direct transition STANDBY CONFIGURED +`) +} diff --git a/occ/peanut/flatten/flatten.go b/occ/peanut/flatten/flatten.go index 09db104a1..fbf9b2a58 100644 --- a/occ/peanut/flatten/flatten.go +++ b/occ/peanut/flatten/flatten.go @@ -94,6 +94,7 @@ package flatten import ( "encoding/json" "errors" + "fmt" "strconv" "gopkg.in/yaml.v3" @@ -175,7 +176,7 @@ func flatten(top bool, flatMap map[string]interface{}, nested interface{}, prefi switch nested.(type) { case map[interface{}]interface{}: for k, v := range nested.(map[interface{}]interface{}) { - newKey := enkey(top, prefix, k.(string), style) + newKey := enkey(top, prefix, fmt.Sprintf("%v", k), style) assign(newKey, v) } case map[string]interface{}: diff --git a/occ/peanut/peanut.go b/occ/peanut/peanut.go index 4e6f4d5f5..bab4497c6 100644 --- a/occ/peanut/peanut.go +++ b/occ/peanut/peanut.go @@ -32,28 +32,154 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "strconv" + "strings" "time" - "github.com/AliceO2Group/Control/common/controlmode" - "github.com/AliceO2Group/Control/executor/executorcmd" + "github.com/AliceO2Group/Control/executor/executorcmd/nopb" + "github.com/AliceO2Group/Control/executor/executorcmd/transitioner/fairmq" "github.com/AliceO2Group/Control/executor/protos" "github.com/AliceO2Group/Control/occ/peanut/flatten" "github.com/gdamore/tcell/v2" "github.com/rivo/tview" - log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) +// Options configures the peanut TUI. +type Options struct { + Addr string // host:port; if empty, falls back to OCC_CONTROL_PORT env var (occ mode only) + Mode string // "direct" (default), "fmq", or "fmq-step" +} + var ( app *tview.Application state string + tuiMode string + tuiAddr string + tuiConn *grpc.ClientConn + streamCancel context.CancelFunc + transitioning bool configMap map[string]string controlList *tview.List configTextView *tview.TextView - rpcClient *executorcmd.RpcClient + configPages *tview.Pages + occClient pb.OccClient ) +func monitorConnection(ctx context.Context) { + // Try StateStream first — gives state updates and disconnect detection. + stateStream, e := occClient.StateStream(ctx, &pb.StateStreamRequest{}) + if e == nil && stateStream != nil { + for { + msg, e := stateStream.Recv() + if e != nil { + if ctx.Err() != nil { + return + } + app.QueueUpdateDraw(func() { + state = "UNREACHABLE" + errorMessage(configPages, "Connection lost", e.Error()) + }) + return + } + app.QueueUpdateDraw(func() { + switch tuiMode { + case "fmq": + state = cliFMQToOCCState(msg.GetState()) + default: + state = msg.GetState() + } + }) + } + } + + // Try EventStream — disconnect detection only (no state in payload). + eventStream, e := occClient.EventStream(ctx, &pb.EventStreamRequest{}) + if e == nil && eventStream != nil { + for { + if _, e := eventStream.Recv(); e != nil { + if ctx.Err() != nil { + return + } + app.QueueUpdateDraw(func() { + state = "UNREACHABLE" + errorMessage(configPages, "Connection lost", e.Error()) + }) + return + } + } + } + + // Neither stream available — poll GetState every 2s. + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, e := occClient.GetState(ctx, &pb.GetStateRequest{}); e != nil { + if ctx.Err() != nil { + return + } + app.QueueUpdateDraw(func() { + state = "UNREACHABLE" + errorMessage(configPages, "Connection lost", e.Error()) + }) + return + } + } + } +} + +func connectRPC() { + if streamCancel != nil { + streamCancel() // stop any existing stream monitor + } + state = "CONNECTING" + go func() { + if tuiConn != nil { + tuiConn.Close() + tuiConn = nil + } + conn, e := grpc.Dial(tuiAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) //nolint:staticcheck + if e != nil { + app.QueueUpdateDraw(func() { + state = "UNREACHABLE" + errorMessage(configPages, "Connection failed", e.Error()) + }) + return + } + if tuiMode == "fmq" || tuiMode == "fmq-step" { + occClient = nopb.NewOccClient(conn) + } else { + occClient = pb.NewOccClient(conn) + } + response, e := occClient.GetState(context.TODO(), &pb.GetStateRequest{}) + if e != nil { + app.QueueUpdateDraw(func() { + state = "UNREACHABLE" + errorMessage(configPages, "Connection failed", e.Error()) + }) + return + } + tuiConn = conn + ctx, cancel := context.WithCancel(context.Background()) + streamCancel = cancel + go monitorConnection(ctx) + app.QueueUpdateDraw(func() { + switch tuiMode { + case "fmq": + state = cliFMQToOCCState(response.GetState()) + default: + state = response.GetState() + } + }) + }() +} + func modal(p tview.Primitive, width, height int) tview.Primitive { return tview.NewFlex(). AddItem(nil, 0, 1, false). @@ -65,7 +191,7 @@ func modal(p tview.Primitive, width, height int) tview.Primitive { } func transition(evt string) error { - args := make([]*pb.ConfigEntry, 0) + args := make([]*pb.ConfigEntry, 0, len(configMap)) for k, v := range configMap { args = append(args, &pb.ConfigEntry{Key: k, Value: v}) } @@ -78,41 +204,163 @@ func transition(evt string) error { }) } - response, err := rpcClient.Transition(context.TODO(), &pb.TransitionRequest{ - TransitionEvent: evt, - Arguments: args, - SrcState: state, - }, grpc.EmptyCallOption{}) - if err != nil { - app.Stop() + var ( + newState string + err error + ) + + if tuiMode == "fmq" { + newState, err = tuiFMQTransition(evt, args) + } else { + var response *pb.TransitionReply + response, err = occClient.Transition(context.TODO(), &pb.TransitionRequest{ + TransitionEvent: evt, + Arguments: args, + SrcState: state, + }) + if err == nil { + newState = response.GetState() + } + } - fmt.Println(err.Error()) + if err != nil { return err } if evt == "CONFIGURE" { configTextView.SetTitle("runtime configuration (PUSHED)") } - state = response.GetState() + state = newState return nil } +// tuiFMQTransition maps TUI event names to FairMQ multi-step sequences. +func tuiFMQTransition(evt string, args []*pb.ConfigEntry) (string, error) { + kvs := make(map[string]string, len(args)) + for _, e := range args { + kvs[e.Key] = e.Value + } + + switch evt { + case "CONFIGURE": + return tuiFMQConfigure(kvs) + case "RESET": + return tuiFMQReset(kvs) + case "START": + result, err := fmqDoStep(context.TODO(), occClient, fairmq.READY, fairmq.EvtRUN, kvs) + return cliFMQToOCCState(result), err + case "STOP": + result, err := fmqDoStep(context.TODO(), occClient, fairmq.RUNNING, fairmq.EvtSTOP, kvs) + return cliFMQToOCCState(result), err + case "RECOVER": + result, err := fmqDoStep(context.TODO(), occClient, fairmq.ERROR, fairmq.EvtRESET_DEVICE, kvs) + return cliFMQToOCCState(result), err + case "EXIT": + if state == "CONFIGURED" { + if _, err := tuiFMQReset(nil); err != nil { + return state, err + } + } + result, err := fmqDoStep(context.TODO(), occClient, fairmq.IDLE, fairmq.EvtEND, nil) + return cliFMQToOCCState(result), err + default: + return state, fmt.Errorf("unsupported transition %q in FairMQ mode", evt) + } +} + +func tuiFMQConfigure(args map[string]string) (string, error) { + state, err := fmqDoStep(context.TODO(), occClient, fairmq.IDLE, fairmq.EvtINIT_DEVICE, args) + if err != nil || state != fairmq.INITIALIZING_DEVICE { + return cliFMQToOCCState(state), fmt.Errorf("INIT DEVICE: expected %q got %q: %w", fairmq.INITIALIZING_DEVICE, state, err) + } + state, err = fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZING_DEVICE, fairmq.EvtCOMPLETE_INIT, nil) + if err != nil || state != fairmq.INITIALIZED { + return cliFMQToOCCState(state), fmt.Errorf("COMPLETE INIT: expected %q got %q: %w", fairmq.INITIALIZED, state, err) + } + state, err = fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZED, fairmq.EvtBIND, nil) + if err != nil || state != fairmq.BOUND { + fmqDoStep(context.TODO(), occClient, fairmq.INITIALIZED, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("BIND: expected %q got %q: %w", fairmq.BOUND, state, err) + } + state, err = fmqDoStep(context.TODO(), occClient, fairmq.BOUND, fairmq.EvtCONNECT, nil) + if err != nil || state != fairmq.DEVICE_READY { + fmqDoStep(context.TODO(), occClient, fairmq.BOUND, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("CONNECT: expected %q got %q: %w", fairmq.DEVICE_READY, state, err) + } + state, err = fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtINIT_TASK, nil) + if err != nil || state != fairmq.READY { + fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, nil) // rollback + return cliFMQToOCCState(state), fmt.Errorf("INIT TASK: expected %q got %q: %w", fairmq.READY, state, err) + } + return cliFMQToOCCState(state), nil +} + +func tuiFMQReset(args map[string]string) (string, error) { + state, err := fmqDoStep(context.TODO(), occClient, fairmq.READY, fairmq.EvtRESET_TASK, nil) + if err != nil || state != fairmq.DEVICE_READY { + return cliFMQToOCCState(state), fmt.Errorf("RESET TASK: expected %q got %q: %w", fairmq.DEVICE_READY, state, err) + } + state, err = fmqDoStep(context.TODO(), occClient, fairmq.DEVICE_READY, fairmq.EvtRESET_DEVICE, args) + return cliFMQToOCCState(state), err +} + func drawStatus(screen tcell.Screen, x int, y int, width int, height int) (int, int, int, int) { tview.Print(screen, state, x, height/2, width, tview.AlignCenter, tcell.ColorLime) return 0, 0, 0, 0 } +// pathComplete returns filesystem completions for the given partial path. +func pathComplete(text string) []string { + // Expand ~ to home directory + if strings.HasPrefix(text, "~/") { + if home, err := os.UserHomeDir(); err == nil { + text = filepath.Join(home, text[2:]) + } + } + + // Split into directory and filename prefix + dir, prefix := filepath.Split(text) + if dir == "" { + dir = "." + } + + entries, err := os.ReadDir(dir) + if err != nil { + return nil + } + + var matches []string + for _, entry := range entries { + name := entry.Name() + if !strings.HasPrefix(name, prefix) { + continue + } + full := filepath.Join(dir, name) + if entry.IsDir() { + full += "/" + } + matches = append(matches, full) + } + return matches +} + func acquireConfigFile(configPages *tview.Pages) error { configInputFrame := tview.NewForm() configInputFrame.SetTitle("file path for runtime configuration") configInputFrame.SetBorder(true) configInputFrame.AddInputField("path:", "", 0, nil, nil) + // Wire up filesystem tab-completion on the path input field + pathField := configInputFrame.GetFormItemByLabel("path:").(*tview.InputField) + pathField.SetAutocompleteFunc(func(currentText string) []string { + return pathComplete(currentText) + }) + configPages.AddPage("modal", modal(configInputFrame, 40, 10), true, true) + app.SetFocus(configInputFrame) configCancelFunc := func() { configPages.RemovePage("modal") app.SetFocus(controlList) - app.Draw() } configInputFrame.AddButton("Ok", func() { @@ -126,9 +374,6 @@ func acquireConfigFile(configPages *tview.Pages) error { configInputFrame.SetCancelFunc(configCancelFunc) configInputFrame.AddButton("Cancel", configCancelFunc) - app.SetFocus(configInputFrame) - - app.Draw() return nil } @@ -141,7 +386,6 @@ func errorMessage(configPages *tview.Pages, title string, text string) { configPages.AddPage("modal", modalPage, true, true) app.SetFocus(modalPage) - app.Draw() } func loadConfig(configFilePath string, configPages *tview.Pages) { @@ -149,21 +393,6 @@ func loadConfig(configFilePath string, configPages *tview.Pages) { errorMessage(configPages, "Cannot open configuration file", "path empty") return } - /*// Make sure we trim all variants - trimmed := strings.TrimPrefix(configFilePath, "file://") - trimmed = strings.TrimPrefix(trimmed, "file:/") - trimmed = strings.TrimPrefix(trimmed, "file:") - uri := "file://" + trimmed - cfg, err := configuration.NewConfiguration(uri) - if err != nil { - errorMessage(configPages, "Cannot open configuration file", err.Error()) - return - } - yamlConfig, err := cfg.GetRecursiveYaml("") - if err != nil { - errorMessage(configPages, "Cannot parse configuration file", err.Error()) - return - }*/ yamlConfig, err := ioutil.ReadFile(configFilePath) if err != nil { errorMessage(configPages, "Cannot open configuration file", err.Error()) @@ -178,82 +407,126 @@ func loadConfig(configFilePath string, configPages *tview.Pages) { configTextView.SetTitle("runtime configuration (NOT PUSHED)") - configMap = make(map[string]string) - err = json.Unmarshal([]byte(configTextView.GetText(false))[:], &configMap) + var rawMap map[string]interface{} + err = json.Unmarshal([]byte(configTextView.GetText(false)), &rawMap) if err != nil { errorMessage(configPages, "Cannot process configuration file", err.Error()) return } + configMap = make(map[string]string, len(rawMap)) + for k, v := range rawMap { + configMap[k] = fmt.Sprintf("%v", v) + } } -func Run(cmdString string) (err error) { +func Run(opts Options) (err error) { state = "UNKNOWN" + tuiMode = opts.Mode + if tuiMode == "" { + tuiMode = "direct" + } + + // Validate mode + switch tuiMode { + case "direct", "fmq", "fmq-step": + default: + return fmt.Errorf("unknown mode %q — valid modes: direct, fmq, fmq-step", tuiMode) + } + + // Resolve address + addr := opts.Addr + if addr == "" { + if tuiMode == "fmq" || tuiMode == "fmq-step" { + return fmt.Errorf("%s mode requires -addr flag", tuiMode) + } + // Fall back to OCC_CONTROL_PORT env var (direct mode legacy behaviour) + occPortString := os.Getenv("OCC_CONTROL_PORT") + if occPortString == "" { + return fmt.Errorf("OCC_CONTROL_PORT not defined") + } + occPort, e := strconv.ParseUint(occPortString, 10, 64) + if e != nil { + return e + } + addr = fmt.Sprintf("127.0.0.1:%d", occPort) + } // Setup UI app = tview.NewApplication() statusBox := tview.NewBox().SetBorder(true).SetTitle("state") - configTextView = tview.NewTextView().SetChangedFunc(func() { app.Draw() }) + configTextView = tview.NewTextView().SetChangedFunc(func() { app.QueueUpdateDraw(func() {}) }) configTextView.SetBorder(true).SetTitle("runtime configuration (EMPTY)") - configPages := tview.NewPages(). + configPages = tview.NewPages(). AddPage("configBox", configTextView, true, true) - controlList = tview.NewList(). - AddItem("Transition CONFIGURE", - "perform CONFIGURE transition", - 'c', - func() { - err = transition("CONFIGURE") - }). - AddItem("Transition RESET", - "perform RESET transition", - 'r', - func() { - err = transition("RESET") - }). - AddItem("Transition START", - "perform START transition", - 's', - func() { - err = transition("START") - }). - AddItem("Transition STOP", - "perform STOP transition", - 't', - func() { - err = transition("STOP") - }). - //AddItem("Transition GO_ERROR", - // "perform GO_ERROR transition", - // 'e', - // func(){ - // err = transition("GO_ERROR") - // app.Draw() - // }). - AddItem("Transition RECOVER", - "perform RECOVER transition", - 'v', - func() { - err = transition("RECOVER") - }). - AddItem("Transition EXIT", - "perform EXIT transition", - 'x', - func() { - err = transition("EXIT") - }). - AddItem("Load configuration", - "read runtime configuration from file", - 'l', - func() { - err = acquireConfigFile(configPages) - }). - AddItem("Quit", - "disconnect from the process and quit peanut", - 'q', - func() { - app.Stop() + doTransition := func(evt string) { + if transitioning { + return + } + transitioning = true + go func() { + e := transition(evt) + app.QueueUpdateDraw(func() { + transitioning = false + if e != nil { + err = e + errorMessage(configPages, "Transition error", e.Error()) + } }) + }() + } + + doFMQStep := func(event string) { + if transitioning { + return + } + transitioning = true + go func() { + kvs := make(map[string]string, len(configMap)) + for k, v := range configMap { + kvs[k] = v + } + newState, e := fmqDoStep(context.TODO(), occClient, state, event, kvs) + app.QueueUpdateDraw(func() { + transitioning = false + if e != nil { + err = e + errorMessage(configPages, "FMQ step error", e.Error()) + } else { + state = newState + } + }) + }() + } + + controlList = tview.NewList() + switch tuiMode { + case "fmq-step": + controlList. + AddItem("INIT DEVICE", "IDLE → INITIALIZING DEVICE", '1', func() { doFMQStep(fairmq.EvtINIT_DEVICE) }). + AddItem("COMPLETE INIT", "INITIALIZING DEVICE → INITIALIZED", '2', func() { doFMQStep(fairmq.EvtCOMPLETE_INIT) }). + AddItem("BIND", "INITIALIZED → BOUND", '3', func() { doFMQStep(fairmq.EvtBIND) }). + AddItem("CONNECT", "BOUND → DEVICE READY", '4', func() { doFMQStep(fairmq.EvtCONNECT) }). + AddItem("INIT TASK", "DEVICE READY → READY", '5', func() { doFMQStep(fairmq.EvtINIT_TASK) }). + AddItem("RUN", "READY → RUNNING", '6', func() { doFMQStep(fairmq.EvtRUN) }). + AddItem("STOP", "RUNNING → READY", '7', func() { doFMQStep(fairmq.EvtSTOP) }). + AddItem("RESET TASK", "READY → DEVICE READY", '8', func() { doFMQStep(fairmq.EvtRESET_TASK) }). + AddItem("RESET DEVICE", "→ IDLE", '9', func() { doFMQStep(fairmq.EvtRESET_DEVICE) }). + AddItem("END", "IDLE → EXITING", '0', func() { doFMQStep(fairmq.EvtEND) }) + default: // direct, fmq + controlList. + AddItem("Transition CONFIGURE", "perform CONFIGURE transition", 'c', func() { doTransition("CONFIGURE") }). + AddItem("Transition RESET", "perform RESET transition", 'r', func() { doTransition("RESET") }). + AddItem("Transition START", "perform START transition", 's', func() { doTransition("START") }). + AddItem("Transition STOP", "perform STOP transition", 't', func() { doTransition("STOP") }). + AddItem("Transition RECOVER", "perform RECOVER transition", 'v', func() { doTransition("RECOVER") }). + AddItem("Transition EXIT", "perform EXIT transition", 'x', func() { doTransition("EXIT") }) + } + controlList. + AddItem("Reconnect", "re-establish gRPC connection to the controlled process", 'n', func() { connectRPC() }). + AddItem("Load configuration", "read runtime configuration from file", 'l', func() { err = acquireConfigFile(configPages) }). + AddItem("Quit", "disconnect from the process and quit peanut", 'q', func() { app.Stop() }) controlList.SetBorder(true).SetTitle("control") flex := tview.NewFlex().AddItem(tview.NewFlex().SetDirection(tview.FlexRow). @@ -263,41 +536,8 @@ func Run(cmdString string) (err error) { statusBox.SetDrawFunc(drawStatus) - // Parse input - var occPort uint64 - if len(cmdString) > 0 { - // RUN process - } else { - // If cmdString not passed, env var OCC_CONTROL_PORT (occ/OccGlobals.h) must be defined - occPortString := os.Getenv("OCC_CONTROL_PORT") - if len(occPortString) == 0 { - err = fmt.Errorf("OCC_CONTROL_PORT not defined") - return - } - occPort, err = strconv.ParseUint(occPortString, 10, 64) - if err != nil { - return - } - } - - // Setup RPC - go func() { - // FIXME allow choice of controlmode.FAIRMQ - rpcClient = executorcmd.NewClient( - occPort, - controlmode.DIRECT, - executorcmd.ProtobufTransport, - log.WithField("id", "")) - var response *pb.GetStateReply - response, err = rpcClient.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) - if err != nil { - app.Stop() - fmt.Println(err.Error()) - return - } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - state = rpcClient.FromDeviceState(response.GetState()) - app.Draw() - }() + // Connect to the controlled process + tuiAddr = addr + connectRPC() return app.SetRoot(flex, true).SetFocus(controlList).Run() }