diff --git a/changelog.md b/changelog.md index ace0426e..8d8b4d0c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,11 @@ +Upcoming (TBD) +============== + +Internal +--------- +* Collect CLI arguments into a dataclass. + + 1.66.0 (2026/03/21) ============== diff --git a/mycli/main.py b/mycli/main.py index d5f2b403..4541c0d2 100755 --- a/mycli/main.py +++ b/mycli/main.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections import defaultdict, namedtuple +from dataclasses import dataclass from decimal import Decimal import functools from io import TextIOWrapper @@ -31,6 +32,7 @@ from cli_helpers.tabular_output.output_formatter import MISSING_VALUE as DEFAULT_MISSING_VALUE from cli_helpers.utils import strip_ansi import click +import clickdc from configobj import ConfigObj import keyring from prompt_toolkit import print_formatted_text @@ -594,7 +596,7 @@ def connect( port: str | int | None = "", socket: str | None = "", character_set: str | None = "", - local_infile: bool = False, + local_infile: bool | None = False, ssl: dict[str, Any] | None = None, ssh_user: str | None = "", ssh_host: str | None = "", @@ -1907,174 +1909,279 @@ def get_last_query(self) -> str | None: return self.query_history[-1][0] if self.query_history else None +@dataclass(slots=True) +class CliArgs: + database: str | None = clickdc.argument( + type=str, + default=None, + nargs=1, + ) + host: str | None = clickdc.option( + '-h', + type=str, + envvar='MYSQL_HOST', + help='Host address of the database.', + ) + port: int | None = clickdc.option( + '-P', + type=int, + envvar='MYSQL_TCP_PORT', + help='Port number to use for connection. Honors $MYSQL_TCP_PORT.', + ) + user: str | None = clickdc.option( + '-u', + '--user', + '--username', + 'user', + type=str, + envvar='MYSQL_USER', + help='User name to connect to the database.', + ) + socket: str | None = clickdc.option( + '-S', + type=str, + envvar='MYSQL_UNIX_SOCKET', + help='The socket file to use for connection.', + ) + password: int | str | None = clickdc.option( + '-p', + '--pass', + '--password', + 'password', + type=INT_OR_STRING_CLICK_TYPE, + is_flag=False, + flag_value=EMPTY_PASSWORD_FLAG_SENTINEL, + help='Prompt for (or pass in cleartext) the password to connect to the database.', + ) + password_file: str | None = clickdc.option( + type=click.Path(), + help='File or FIFO path containing the password to connect to the db if not specified otherwise.', + ) + ssh_user: str | None = clickdc.option( + type=str, + help='User name to connect to ssh server.', + ) + ssh_host: str | None = clickdc.option( + type=str, + help='Host name to connect to ssh server.', + ) + ssh_port: int = clickdc.option( + type=int, + default=22, + help='Port to connect to ssh server.', + ) + ssh_password: str | None = clickdc.option( + type=str, + help='Password to connect to ssh server.', + ) + ssh_key_filename: str | None = clickdc.option( + type=str, + help='Private key filename (identify file) for the ssh connection.', + ) + ssh_config_path: str = clickdc.option( + type=str, + help='Path to ssh configuration.', + default=os.path.expanduser('~') + '/.ssh/config', + ) + ssh_config_host: str | None = clickdc.option( + type=str, + help='Host to connect to ssh server reading from ssh configuration.', + ) + list_ssh_config: bool = clickdc.option( + is_flag=True, + help='list ssh configurations in the ssh config (requires paramiko).', + ) + ssh_warning_off: bool = clickdc.option( + is_flag=True, + help='Suppress the SSH deprecation notice.', + ) + ssl_mode: str = clickdc.option( + type=click.Choice(['auto', 'on', 'off']), + help='Set desired SSL behavior. auto=preferred if TCP/IP, on=required, off=off.', + ) + deprecated_ssl: bool | None = clickdc.option( + '--ssl/--no-ssl', + 'deprecated_ssl', + default=None, + clickdc=None, + help='Enable SSL for connection (automatically enabled with other flags).', + ) + ssl_ca: str | None = clickdc.option( + type=click.Path(exists=True), + help='CA file in PEM format.', + ) + ssl_capath: str | None = clickdc.option( + type=click.Path(exists=True, file_okay=False, dir_okay=True), + help='CA directory.', + ) + ssl_cert: str | None = clickdc.option( + type=click.Path(exists=True), + help='X509 cert in PEM format.', + ) + ssl_key: str | None = clickdc.option( + type=click.Path(exists=True), + help='X509 key in PEM format.', + ) + ssl_cipher: str | None = clickdc.option( + type=str, + help='SSL cipher to use.', + ) + tls_version: str | None = clickdc.option( + type=click.Choice(['TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'], case_sensitive=False), + help='TLS protocol version for secure connection.', + ) + ssl_verify_server_cert: bool = clickdc.option( + is_flag=True, + help=('Verify server\'s "Common Name" in its cert against hostname used when connecting. This option is disabled by default.'), + ) + verbose: bool = clickdc.option( + '-v', + is_flag=True, + help='Verbose output.', + ) + dbname: str | None = clickdc.option( + '-D', + '--database', + 'dbname', + type=str, + clickdc=None, + help='Database or DSN to use for the connection.', + ) + dsn: str = clickdc.option( + '-d', + type=str, + default='', + envvar='DSN', + help='DSN alias configured in the ~/.myclirc file, or a full DSN.', + ) + list_dsn: bool = clickdc.option( + is_flag=True, + help='Show list of DSN aliases configured in the [alias_dsn] section of ~/.myclirc.', + ) + prompt: str | None = clickdc.option( + '-R', + type=str, + help=f'Prompt format (Default: "{MyCli.default_prompt}").', + ) + toolbar: str | None = clickdc.option( + type=str, + help='Toolbar format.', + ) + logfile: TextIOWrapper | None = clickdc.option( + '-l', + type=click.File(mode='a', encoding='utf-8'), + help='Log every query and its results to a file.', + ) + checkpoint: TextIOWrapper | None = clickdc.option( + type=click.File(mode='a', encoding='utf-8'), + help='In batch or --execute mode, log successful queries to a file.', + ) + defaults_group_suffix: str | None = clickdc.option( + type=str, + help='Read MySQL config groups with the specified suffix.', + ) + defaults_file: str | None = clickdc.option( + type=click.Path(), + help='Only read MySQL options from the given file.', + ) + myclirc: str = clickdc.option( + type=click.Path(), + default='~/.myclirc', + help='Location of myclirc file.', + ) + auto_vertical_output: bool = clickdc.option( + is_flag=True, + help='Automatically switch to vertical output mode if the result is wider than the terminal width.', + ) + show_warnings: bool = clickdc.option( + '--show-warnings/--no-show-warnings', + is_flag=True, + clickdc=None, + help='Automatically show warnings after executing a SQL statement.', + ) + table: bool = clickdc.option( + '-t', + is_flag=True, + help='Shorthand for --format=table.', + ) + csv: bool = clickdc.option( + is_flag=True, + help='Shorthand for --format=csv.', + ) + warn: bool | None = clickdc.option( + '--warn/--no-warn', + default=None, + clickdc=None, + help='Warn before running a destructive query.', + ) + local_infile: bool | None = clickdc.option( + type=bool, + is_flag=False, + default=None, + help='Enable/disable LOAD DATA LOCAL INFILE.', + ) + login_path: str | None = clickdc.option( + '-g', + type=str, + help='Read this path from the login file.', + ) + execute: str | None = clickdc.option( + '-e', + type=str, + help='Execute command and quit.', + ) + init_command: str | None = clickdc.option( + type=str, + help='SQL statement to execute after connecting.', + ) + unbuffered: bool | None = clickdc.option( + is_flag=True, + help='Instead of copying every row of data into a buffer, fetch rows as needed, to save memory.', + ) + character_set: str | None = clickdc.option( + '--charset', + '--character-set', + 'character_set', + type=str, + help='Character set for MySQL session.', + ) + batch: str | None = clickdc.option( + type=str, + help='SQL script to execute in batch mode.', + ) + noninteractive: bool = clickdc.option( + is_flag=True, + help="Don't prompt during batch input. Recommended.", + ) + format: str | None = clickdc.option( + type=click.Choice(['default', 'csv', 'tsv', 'table']), + help='Format for batch or --execute output.', + ) + throttle: float = clickdc.option( + type=int, + default=0.0, + help='Pause in seconds between queries in batch mode.', + ) + use_keyring: str | None = clickdc.option( + type=click.Choice(['true', 'false', 'reset']), + default=None, + help='Store and retrieve passwords from the system keyring: true/false/reset.', + ) + keepalive_ticks: int | None = clickdc.option( + type=int, + help='Send regular keepalive pings to the connection, roughly every seconds.', + ) + checkup: bool = clickdc.option( + is_flag=True, + help='Run a checkup on your configuration.', + ) + + @click.command() -@click.option("-h", "--host", envvar="MYSQL_HOST", help="Host address of the database.") -@click.option("-P", "--port", envvar="MYSQL_TCP_PORT", type=int, help="Port number to use for connection. Honors $MYSQL_TCP_PORT.") -@click.option( - '-u', - '--user', - '--username', - 'user', - envvar='MYSQL_USER', - help='User name to connect to the database.', -) -@click.option("-S", "--socket", envvar="MYSQL_UNIX_SOCKET", help="The socket file to use for connection.") -@click.option( - "-p", - "--pass", - "--password", - "password", - is_flag=False, - flag_value=EMPTY_PASSWORD_FLAG_SENTINEL, - type=INT_OR_STRING_CLICK_TYPE, - help="Prompt for (or pass in cleartext) the password to connect to the database.", -) -@click.option("--ssh-user", help="User name to connect to ssh server.") -@click.option("--ssh-host", help="Host name to connect to ssh server.") -@click.option("--ssh-port", default=22, help="Port to connect to ssh server.") -@click.option("--ssh-password", help="Password to connect to ssh server.") -@click.option("--ssh-key-filename", help="Private key filename (identify file) for the ssh connection.") -@click.option("--ssh-config-path", help="Path to ssh configuration.", default=os.path.expanduser("~") + "/.ssh/config") -@click.option("--ssh-config-host", help="Host to connect to ssh server reading from ssh configuration.") -@click.option( - "--ssl-mode", - "ssl_mode", - help="Set desired SSL behavior. auto=preferred if TCP/IP, on=required, off=off.", - type=click.Choice(["auto", "on", "off"]), -) -@click.option("--ssl/--no-ssl", "ssl_enable", default=None, help="Enable SSL for connection (automatically enabled with other flags).") -@click.option("--ssl-ca", help="CA file in PEM format.", type=click.Path(exists=True)) -@click.option("--ssl-capath", help="CA directory.", type=click.Path(exists=True, file_okay=False, dir_okay=True)) -@click.option("--ssl-cert", help="X509 cert in PEM format.", type=click.Path(exists=True)) -@click.option("--ssl-key", help="X509 key in PEM format.", type=click.Path(exists=True)) -@click.option("--ssl-cipher", help="SSL cipher to use.") -@click.option( - "--tls-version", - type=click.Choice(["TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"], case_sensitive=False), - help="TLS protocol version for secure connection.", -) -@click.option( - "--ssl-verify-server-cert", - is_flag=True, - help=("""Verify server's "Common Name" in its cert against hostname used when connecting. This option is disabled by default."""), -) -@click.version_option(__version__, "-V", "--version", help="Output mycli's version.") -@click.option("-v", "--verbose", is_flag=True, help="Verbose output.") -@click.option("-D", "--database", "dbname", help="Database or DSN to use for the connection.") -@click.option("-d", "--dsn", 'dsn_alias', default="", envvar="DSN", help="DSN alias configured in the ~/.myclirc file, or a full DSN.") -@click.option( - "--list-dsn", "list_dsn", is_flag=True, help="list of DSN aliases configured in the [alias_dsn] section of the ~/.myclirc file." -) -@click.option("--list-ssh-config", "list_ssh_config", is_flag=True, help="list ssh configurations in the ssh config (requires paramiko).") -@click.option("--ssh-warning-off", is_flag=True, help="Suppress the SSH deprecation notice.") -@click.option("-R", "--prompt", "prompt", help=f'Prompt format (Default: "{MyCli.default_prompt}").') -@click.option('--toolbar', 'toolbar_format', help='Toolbar format.') -@click.option("-l", "--logfile", type=click.File(mode="a", encoding="utf-8"), help="Log every query and its results to a file.") -@click.option( - "--checkpoint", type=click.File(mode="a", encoding="utf-8"), help="In batch or --execute mode, log successful queries to a file." -) -@click.option("--defaults-group-suffix", type=str, help="Read MySQL config groups with the specified suffix.") -@click.option("--defaults-file", type=click.Path(), help="Only read MySQL options from the given file.") -@click.option("--myclirc", type=click.Path(), default="~/.myclirc", help="Location of myclirc file.") -@click.option( - "--auto-vertical-output", - is_flag=True, - help="Automatically switch to vertical output mode if the result is wider than the terminal width.", -) -@click.option( - "--show-warnings/--no-show-warnings", "show_warnings", is_flag=True, help="Automatically show warnings after executing a SQL statement." -) -@click.option("-t", "--table", is_flag=True, help="Shorthand for --format=table.") -@click.option("--csv", is_flag=True, help="Shorthand for --format=csv.") -@click.option("--warn/--no-warn", default=None, help="Warn before running a destructive query.") -@click.option("--local-infile", type=bool, help="Enable/disable LOAD DATA LOCAL INFILE.") -@click.option("-g", "--login-path", type=str, help="Read this path from the login file.") -@click.option("-e", "--execute", type=str, help="Execute command and quit.") -@click.option("--init-command", type=str, help="SQL statement to execute after connecting.") -@click.option( - "--unbuffered", is_flag=True, help="Instead of copying every row of data into a buffer, fetch rows as needed, to save memory." -) -@click.option("--character-set", "--charset", type=str, help="Character set for MySQL session.") -@click.option( - "--password-file", type=click.Path(), help="File or FIFO path containing the password to connect to the db if not specified otherwise." -) -@click.argument("database", default=None, nargs=1) -@click.option('--batch', 'batch_file', type=str, help='SQL script to execute in batch mode.') -@click.option("--noninteractive", is_flag=True, help="Don't prompt during batch input. Recommended.") -@click.option( - '--format', 'batch_format', type=click.Choice(['default', 'csv', 'tsv', 'table']), help='Format for batch or --execute output.' -) -@click.option('--throttle', type=float, default=0.0, help='Pause in seconds between queries in batch mode.') -@click.option( - '--use-keyring', - 'use_keyring_cli_opt', - type=click.Choice(['true', 'false', 'reset']), - default=None, - help='Store and retrieve passwords from the system keyring: true/false/reset.', -) -@click.option( - '--keepalive-ticks', - type=int, - help='Send regular keepalive pings to the connection, roughly every seconds.', -) -@click.option("--checkup", is_flag=True, help="Run a checkup on your config file.") -@click.pass_context +@clickdc.adddc('cli_args', CliArgs) +@click.version_option(__version__, '--version', '-V', help='Output mycli\'s version.') def click_entrypoint( - ctx: click.Context, - database: str | None, - user: str | None, - host: str | None, - port: int | None, - socket: str | None, - password: str | int | None, - dbname: str | None, - verbose: bool, - prompt: str | None, - toolbar_format: str | None, - logfile: TextIOWrapper | None, - checkpoint: TextIOWrapper | None, - defaults_group_suffix: str | None, - defaults_file: str | None, - login_path: str | None, - auto_vertical_output: bool, - show_warnings: bool, - local_infile: bool, - ssl_mode: str | None, - ssl_enable: bool, - ssl_ca: str | None, - ssl_capath: str | None, - ssl_cert: str | None, - ssl_key: str | None, - ssl_cipher: str | None, - tls_version: str | None, - ssl_verify_server_cert: bool, - table: bool, - csv: bool, - warn: bool | None, - execute: str | None, - myclirc: str, - dsn_alias: str, - list_dsn: str | None, - ssh_user: str | None, - ssh_host: str | None, - ssh_port: int, - ssh_password: str | None, - ssh_key_filename: str | None, - list_ssh_config: bool, - ssh_config_path: str, - ssh_config_host: str | None, - ssh_warning_off: bool | None, - init_command: str | None, - unbuffered: bool | None, - character_set: str | None, - password_file: str | None, - noninteractive: bool, - batch_file: str | None, - batch_format: str | None, - throttle: float, - use_keyring_cli_opt: str | None, - checkup: bool, - keepalive_ticks: int | None, + cli_args: CliArgs, ) -> None: """A MySQL terminal client with auto-completion and syntax highlighting. @@ -2108,62 +2215,62 @@ def get_password_from_file(password_file: str | None) -> str | None: # if the password value looks like a DSN, treat it as such and # prompt for password - if database is None and isinstance(password, str) and "://" in password: + if cli_args.database is None and isinstance(cli_args.password, str) and "://" in cli_args.password: # check if the scheme is valid. We do not actually have any logic for these, but # it will most usefully catch the case where we erroneously catch someone's # password, and give them an easy error message to follow / report - is_valid_scheme, scheme = is_valid_connection_scheme(password) + is_valid_scheme, scheme = is_valid_connection_scheme(cli_args.password) if not is_valid_scheme: click.secho(f"Error: Unknown connection scheme provided for DSN URI ({scheme}://)", err=True, fg="red") sys.exit(1) - database = password - password = EMPTY_PASSWORD_FLAG_SENTINEL + cli_args.database = cli_args.password + cli_args.password = EMPTY_PASSWORD_FLAG_SENTINEL # if the password is not specified try to set it using the password_file option - if password is None and password_file: - password_from_file = get_password_from_file(password_file) + if cli_args.password is None and cli_args.password_file: + password_from_file = get_password_from_file(cli_args.password_file) if password_from_file is not None: - password = password_from_file + cli_args.password = password_from_file # getting the envvar ourselves because the envvar from a click # option cannot be an empty string, but a password can be - if password is None and os.environ.get("MYSQL_PWD") is not None: - password = os.environ.get("MYSQL_PWD") + if cli_args.password is None and os.environ.get("MYSQL_PWD") is not None: + cli_args.password = os.environ.get("MYSQL_PWD") mycli = MyCli( - prompt=prompt, - toolbar_format=toolbar_format, - logfile=logfile, - defaults_suffix=defaults_group_suffix, - defaults_file=defaults_file, - login_path=login_path, - auto_vertical_output=auto_vertical_output, - warn=warn, - myclirc=myclirc, + prompt=cli_args.prompt, + toolbar_format=cli_args.toolbar, + logfile=cli_args.logfile, + defaults_suffix=cli_args.defaults_group_suffix, + defaults_file=cli_args.defaults_file, + login_path=cli_args.login_path, + auto_vertical_output=cli_args.auto_vertical_output, + warn=cli_args.warn, + myclirc=cli_args.myclirc, ) - if checkup: + if cli_args.checkup: do_checkup(mycli) sys.exit(0) - if csv and batch_format not in [None, 'csv']: + if cli_args.csv and cli_args.format not in [None, 'csv']: click.secho("Conflicting --csv and --format arguments.", err=True, fg="red") sys.exit(1) - if table and batch_format not in [None, 'table']: + if cli_args.table and cli_args.format not in [None, 'table']: click.secho("Conflicting --table and --format arguments.", err=True, fg="red") sys.exit(1) - if not batch_format: - batch_format = 'default' + if not cli_args.format: + cli_args.format = 'default' - if csv: - batch_format = 'csv' + if cli_args.csv: + cli_args.format = 'csv' - if table: - batch_format = 'table' + if cli_args.table: + cli_args.format = 'table' - if ssl_enable is not None: + if cli_args.deprecated_ssl is not None: click.secho( "Warning: The --ssl/--no-ssl CLI options are deprecated and will be removed in a future release. " "Please use the \"default_ssl_mode\" config option or --ssl-mode CLI flag instead. " @@ -2173,14 +2280,24 @@ def get_password_from_file(password_file: str | None) -> str | None: ) # ssh_port and ssh_config_path have truthy defaults and are not included - if any([ssh_user, ssh_host, ssh_password, ssh_key_filename, list_ssh_config, ssh_config_host]) and not ssh_warning_off: + if ( + any([ + cli_args.ssh_user, + cli_args.ssh_host, + cli_args.ssh_password, + cli_args.ssh_key_filename, + cli_args.list_ssh_config, + cli_args.ssh_config_host, + ]) + and not cli_args.ssh_warning_off + ): click.secho( f"Warning: The built-in SSH functionality is deprecated and will be removed in a future release. See issue {ISSUES_URL}/1464", err=True, fg="red", ) - if list_dsn: + if cli_args.list_dsn: try: alias_dsn = mycli.config["alias_dsn"] except KeyError: @@ -2190,21 +2307,21 @@ def get_password_from_file(password_file: str | None) -> str | None: click.secho(str(e), err=True, fg="red") sys.exit(1) for alias, value in alias_dsn.items(): - if verbose: + if cli_args.verbose: click.secho(f"{alias} : {value}") else: click.secho(alias) sys.exit(0) - if list_ssh_config: - ssh_config = read_ssh_config(ssh_config_path) + if cli_args.list_ssh_config: + ssh_config = read_ssh_config(cli_args.ssh_config_path) try: host_entries = ssh_config.get_hostnames() except KeyError: click.secho('Error reading ssh config', err=True, fg="red") sys.exit(1) for host_entry in host_entries: - if verbose: + if cli_args.verbose: host_config = ssh_config.lookup(host_entry) click.secho(f"{host_entry} : {host_config.get('hostname')}") else: @@ -2219,35 +2336,41 @@ def get_password_from_file(password_file: str | None) -> str | None: err=True, fg="red", ) - if not socket: - socket = os.environ['MYSQL_UNIX_PORT'] + if not cli_args.socket: + cli_args.socket = os.environ['MYSQL_UNIX_PORT'] # Choose which ever one has a valid value. - database = dbname or database + database = cli_args.dbname or cli_args.database dsn_uri = None # Treat the database argument as a DSN alias only if it matches a configured alias # todo why is port tested but not socket? - truthy_password = password not in (None, EMPTY_PASSWORD_FLAG_SENTINEL) + truthy_password = cli_args.password not in (None, EMPTY_PASSWORD_FLAG_SENTINEL) if ( database and "://" not in database - and not any([user, truthy_password, host, port, login_path]) + and not any([ + cli_args.user, + truthy_password, + cli_args.host, + cli_args.port, + cli_args.login_path, + ]) and database in mycli.config.get("alias_dsn", {}) ): - dsn_alias, database = database, "" + cli_args.dsn, database = database, "" if database and "://" in database: dsn_uri, database = database, "" - if dsn_alias: + if cli_args.dsn: try: - dsn_uri = mycli.config["alias_dsn"][dsn_alias] + dsn_uri = mycli.config["alias_dsn"][cli_args.dsn] except KeyError: - is_valid_scheme, scheme = is_valid_connection_scheme(dsn_alias) + is_valid_scheme, scheme = is_valid_connection_scheme(cli_args.dsn) if is_valid_scheme: - dsn_uri = dsn_alias + dsn_uri = cli_args.dsn else: click.secho( "Could not find the specified DSN in the config file. Please check the \"[alias_dsn]\" section in your myclirc.", @@ -2256,21 +2379,21 @@ def get_password_from_file(password_file: str | None) -> str | None: ) sys.exit(1) else: - mycli.dsn_alias = dsn_alias + mycli.dsn_alias = cli_args.dsn if dsn_uri: uri = urlparse(dsn_uri) if not database: database = uri.path[1:] # ignore the leading fwd slash - if not user and uri.username is not None: - user = unquote(uri.username) + if not cli_args.user and uri.username is not None: + cli_args.user = unquote(uri.username) # todo: rationalize the behavior of empty-string passwords here - if not password and uri.password is not None: - password = unquote(uri.password) - if not host: - host = uri.hostname - if not port: - port = uri.port + if not cli_args.password and uri.password is not None: + cli_args.password = unquote(uri.password) + if not cli_args.host: + cli_args.host = uri.hostname + if not cli_args.port: + cli_args.port = uri.port if uri.query: dsn_params = parse_qs(uri.query) @@ -2286,81 +2409,88 @@ def get_password_from_file(password_file: str | None) -> str | None: fg='yellow', ) if params[0].lower() == 'true': - ssl_mode = 'on' + cli_args.ssl_mode = 'on' if params := dsn_params.get('ssl_mode'): - ssl_mode = ssl_mode or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or params[0] if params := dsn_params.get('ssl_ca'): - ssl_ca = ssl_ca or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.ssl_ca = cli_args.ssl_ca or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('ssl_capath'): - ssl_capath = ssl_capath or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.ssl_capath = cli_args.ssl_capath or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('ssl_cert'): - ssl_cert = ssl_cert or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.ssl_cert = cli_args.ssl_cert or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('ssl_key'): - ssl_key = ssl_key or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.ssl_key = cli_args.ssl_key or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('ssl_cipher'): - ssl_cipher = ssl_cipher or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.ssl_cipher = cli_args.ssl_cipher or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('tls_version'): - tls_version = tls_version or params[0] - ssl_mode = ssl_mode or 'on' + cli_args.tls_version = cli_args.tls_version or params[0] + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('ssl_verify_server_cert'): - ssl_verify_server_cert = ssl_verify_server_cert or (params[0].lower() == 'true') - ssl_mode = ssl_mode or 'on' + cli_args.ssl_verify_server_cert = cli_args.ssl_verify_server_cert or (params[0].lower() == 'true') + cli_args.ssl_mode = cli_args.ssl_mode or 'on' if params := dsn_params.get('socket'): - socket = socket or params[0] + cli_args.socket = cli_args.socket or params[0] if params := dsn_params.get('keepalive_ticks'): - if keepalive_ticks is None: - keepalive_ticks = int(params[0]) + if cli_args.keepalive_ticks is None: + cli_args.keepalive_ticks = int(params[0]) if params := dsn_params.get('character_set'): - character_set = character_set or params[0] + cli_args.character_set = cli_args.character_set or params[0] - keepalive_ticks = keepalive_ticks if keepalive_ticks is not None else mycli.default_keepalive_ticks - ssl_mode = ssl_mode or mycli.ssl_mode # cli option or config option + keepalive_ticks = cli_args.keepalive_ticks if cli_args.keepalive_ticks is not None else mycli.default_keepalive_ticks + ssl_mode = cli_args.ssl_mode or mycli.ssl_mode # if there is a mismatch between the ssl_mode value and other sources of ssl config, show a warning - # specifically using "is False" to not pickup the case where ssl_enable is None (not set by the user) - if ssl_enable and ssl_mode == "off" or ssl_enable is False and ssl_mode in ("auto", "on"): + # specifically using "is False" to not pickup the case where cli_args.deprecated_ssl is None (not set by the user) + if cli_args.deprecated_ssl and ssl_mode == "off" or cli_args.deprecated_ssl is False and ssl_mode in ("auto", "on"): click.secho( f"Warning: The current ssl_mode value of '{ssl_mode}' is overriding the value provided by " - f"either the --ssl/--no-ssl CLI options or a DSN URI parameter (ssl={ssl_enable}).", + f"either the --ssl/--no-ssl CLI options or a DSN URI parameter (ssl={cli_args.deprecated_ssl}).", err=True, fg="yellow", ) # configure SSL if ssl_mode is auto/on or if - # ssl_enable = True (from --ssl or a DSN URI) and ssl_mode is None - if ssl_mode in ("auto", "on") or (ssl_enable and ssl_mode is None): - if socket and ssl_mode == 'auto': + # cli_args.deprecated_ssl = True (from --ssl or a DSN URI) and ssl_mode is None + if ssl_mode in ("auto", "on") or (cli_args.deprecated_ssl and ssl_mode is None): + if cli_args.socket and ssl_mode == 'auto': ssl = None else: ssl = { "mode": ssl_mode, - "enable": ssl_enable, - "ca": ssl_ca and os.path.expanduser(ssl_ca), - "cert": ssl_cert and os.path.expanduser(ssl_cert), - "key": ssl_key and os.path.expanduser(ssl_key), - "capath": ssl_capath, - "cipher": ssl_cipher, - "tls_version": tls_version, - "check_hostname": ssl_verify_server_cert, + "enable": cli_args.deprecated_ssl, # todo: why is this set at all? + "ca": cli_args.ssl_ca and os.path.expanduser(cli_args.ssl_ca), + "cert": cli_args.ssl_cert and os.path.expanduser(cli_args.ssl_cert), + "key": cli_args.ssl_key and os.path.expanduser(cli_args.ssl_key), + "capath": cli_args.ssl_capath, + "cipher": cli_args.ssl_cipher, + "tls_version": cli_args.tls_version, + "check_hostname": cli_args.ssl_verify_server_cert, } # remove empty ssl options ssl = {k: v for k, v in ssl.items() if v is not None} else: ssl = None - if ssh_config_host: - ssh_config = read_ssh_config(ssh_config_path).lookup(ssh_config_host) - ssh_host = ssh_host if ssh_host else ssh_config.get("hostname") - ssh_user = ssh_user if ssh_user else ssh_config.get("user") - if ssh_config.get("port") and ssh_port == 22: + if cli_args.ssh_config_host: + ssh_config = read_ssh_config(cli_args.ssh_config_path).lookup(cli_args.ssh_config_host) + ssh_host = cli_args.ssh_host if cli_args.ssh_host else ssh_config.get("hostname") + ssh_user = cli_args.ssh_user if cli_args.ssh_user else ssh_config.get("user") + if ssh_config.get("port") and cli_args.ssh_port == 22: # port has a default value, overwrite it if it's in the config ssh_port = int(ssh_config.get("port")) - ssh_key_filename = ssh_key_filename if ssh_key_filename else ssh_config.get("identityfile", [None])[0] + else: + ssh_port = cli_args.ssh_port + ssh_key_filename = cli_args.ssh_key_filename if cli_args.ssh_key_filename else ssh_config.get("identityfile", [None])[0] + else: + ssh_host = cli_args.ssh_host + ssh_user = cli_args.ssh_user + ssh_port = cli_args.ssh_port + ssh_key_filename = cli_args.ssh_key_filename ssh_key_filename = ssh_key_filename and os.path.expanduser(ssh_key_filename) # Merge init-commands: global, DSN-specific, then CLI @@ -2373,32 +2503,32 @@ def get_password_from_file(password_file: str | None) -> str | None: elif val: init_cmds.append(val) # 2) DSN-specific init-commands - if dsn_alias: + if cli_args.dsn: alias_section = mycli.config.get("alias_dsn.init-commands", {}) - if dsn_alias in alias_section: - val = alias_section.get(dsn_alias) + if cli_args.dsn in alias_section: + val = alias_section.get(cli_args.dsn) if isinstance(val, (list, tuple)): init_cmds.extend(val) elif val: init_cmds.append(val) # 3) CLI-provided init_command - if init_command: - init_cmds.append(init_command) + if cli_args.init_command: + init_cmds.append(cli_args.init_command) combined_init_cmd = "; ".join(cmd.strip() for cmd in init_cmds if cmd) # --show-warnings / --no-show-warnings - if show_warnings: - mycli.show_warnings = show_warnings + if cli_args.show_warnings: + mycli.show_warnings = cli_args.show_warnings - if use_keyring_cli_opt is not None and use_keyring_cli_opt.lower() == 'reset': + if cli_args.use_keyring is not None and cli_args.use_keyring.lower() == 'reset': use_keyring = True reset_keyring = True - elif use_keyring_cli_opt is None: + elif cli_args.use_keyring is None: use_keyring = str_to_bool(mycli.config['main'].get('use_keyring', 'False')) reset_keyring = False else: - use_keyring = str_to_bool(use_keyring_cli_opt) + use_keyring = str_to_bool(cli_args.use_keyring) reset_keyring = False # todo: removeme after a period of transition @@ -2479,21 +2609,21 @@ def get_password_from_file(password_file: str | None) -> str | None: mycli.connect( database=database, - user=user, - passwd=password, - host=host, - port=port, - socket=socket, - local_infile=local_infile, + user=cli_args.user, + passwd=cli_args.password, + host=cli_args.host, + port=cli_args.port, + socket=cli_args.socket, + local_infile=cli_args.local_infile, ssl=ssl, ssh_user=ssh_user, ssh_host=ssh_host, ssh_port=ssh_port, - ssh_password=ssh_password, + ssh_password=cli_args.ssh_password, ssh_key_filename=ssh_key_filename, init_command=combined_init_cmd, - unbuffered=unbuffered, - character_set=character_set, + unbuffered=cli_args.unbuffered, + character_set=cli_args.character_set, use_keyring=use_keyring, reset_keyring=reset_keyring, keepalive_ticks=keepalive_ticks, @@ -2502,31 +2632,38 @@ def get_password_from_file(password_file: str | None) -> str | None: if combined_init_cmd: click.echo(f"Executing init-command: {combined_init_cmd}", err=True) - mycli.logger.debug("Launch Params: \n\tdatabase: %r\tuser: %r\thost: %r\tport: %r", database, user, host, port) + mycli.logger.debug( + "Launch Params: \n\tdatabase: %r\tuser: %r\thost: %r\tport: %r", + database, + cli_args.user, + cli_args.host, + cli_args.port, + ) # --execute argument - if execute: + if cli_args.execute: if not sys.stdin.isatty(): click.secho('Ignoring STDIN since --execute was also given.', err=True, fg='red') - if batch_file: + if cli_args.batch: click.secho('Ignoring --batch since --execute was also given.', err=True, fg='red') try: - if batch_format == 'csv': + execute_sql = cli_args.execute + if cli_args.format == 'csv': mycli.main_formatter.format_name = 'csv' - if execute.endswith(r'\G'): - execute = execute[:-2] - elif batch_format == 'tsv': + if execute_sql.endswith(r'\G'): + execute_sql = execute_sql[:-2] + elif cli_args.format == 'tsv': mycli.main_formatter.format_name = 'tsv' - if execute.endswith(r'\G'): - execute = execute[:-2] - elif batch_format == 'table': + if execute_sql.endswith(r'\G'): + execute_sql = execute_sql[:-2] + elif cli_args.format == 'table': mycli.main_formatter.format_name = 'ascii' - if execute.endswith(r'\G'): - execute = execute[:-2] + if execute_sql.endswith(r'\G'): + execute_sql = execute_sql[:-2] else: mycli.main_formatter.format_name = 'tsv' - mycli.run_query(execute, checkpoint=checkpoint) + mycli.run_query(execute_sql, checkpoint=cli_args.checkpoint) sys.exit(0) except Exception as e: click.secho(str(e), err=True, fg="red") @@ -2535,26 +2672,26 @@ def get_password_from_file(password_file: str | None) -> str | None: def dispatch_batch_statements(statements: str, batch_counter: int) -> None: if batch_counter: # this is imperfect if the first line of input has multiple statements - if batch_format == 'csv': + if cli_args.format == 'csv': mycli.main_formatter.format_name = 'csv-noheader' - elif batch_format == 'tsv': + elif cli_args.format == 'tsv': mycli.main_formatter.format_name = 'tsv_noheader' - elif batch_format == 'table': + elif cli_args.format == 'table': mycli.main_formatter.format_name = 'ascii' else: mycli.main_formatter.format_name = 'tsv' else: - if batch_format == 'csv': + if cli_args.format == 'csv': mycli.main_formatter.format_name = 'csv' - elif batch_format == 'tsv': + elif cli_args.format == 'tsv': mycli.main_formatter.format_name = 'tsv' - elif batch_format == 'table': + elif cli_args.format == 'table': mycli.main_formatter.format_name = 'ascii' else: mycli.main_formatter.format_name = 'tsv' warn_confirmed: bool | None = True - if not noninteractive and mycli.destructive_warning and is_destructive(mycli.destructive_keywords, statements): + if not cli_args.noninteractive and mycli.destructive_warning and is_destructive(mycli.destructive_keywords, statements): try: # this seems to work, even though we are reading from stdin above sys.stdin = open("/dev/tty") @@ -2565,21 +2702,21 @@ def dispatch_batch_statements(statements: str, batch_counter: int) -> None: sys.exit(1) try: if warn_confirmed: - if throttle and batch_counter >= 1: - sleep(throttle) - mycli.run_query(statements, checkpoint=checkpoint, new_line=True) + if cli_args.throttle and batch_counter >= 1: + sleep(cli_args.throttle) + mycli.run_query(statements, checkpoint=cli_args.checkpoint, new_line=True) except Exception as e: click.secho(str(e), err=True, fg="red") sys.exit(1) - if batch_file or not sys.stdin.isatty(): - if batch_file: - if not sys.stdin.isatty() and batch_file != '-': + if cli_args.batch or not sys.stdin.isatty(): + if cli_args.batch: + if not sys.stdin.isatty() and cli_args.batch != '-': click.secho('Ignoring STDIN since --batch was also given.', err=True, fg='red') try: - batch_h = click.open_file(batch_file) + batch_h = click.open_file(cli_args.batch) except (OSError, FileNotFoundError): - click.secho(f'Failed to open --batch file: {batch_file}', err=True, fg='red') + click.secho(f'Failed to open --batch file: {cli_args.batch}', err=True, fg='red') sys.exit(1) else: batch_h = click.get_text_stream('stdin') diff --git a/pyproject.toml b/pyproject.toml index 5470ea98..dc731e83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ authors = [{ name = "Mycli Core Team" }] dependencies = [ "click ~= 8.3.1", + "clickdc ~= 0.1.1", "cryptography ~= 46.0.5", "Pygments ~= 2.19.2", "prompt_toolkit>=3.0.6,<4.0.0", diff --git a/test/test_main.py b/test/test_main.py index f47e5beb..df56ab61 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -1097,6 +1097,216 @@ def run_query(self, query, new_line=True): assert MockMyCli.connect_args['passwd'] == EMPTY_PASSWORD_FLAG_SENTINEL +def test_password_option_uses_cleartext_value(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + runner = CliRunner() + + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--user', + 'user', + '--host', + DEFAULT_HOST, + '--port', + f'{DEFAULT_PORT}', + '--database', + 'database', + '--password', + 'cleartext_password', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['passwd'] == 'cleartext_password' + + +def test_password_option_overrides_password_file_and_mysql_pwd(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + monkeypatch.setenv('MYSQL_PWD', 'env_password') + runner = CliRunner() + + with NamedTemporaryFile(prefix=TEMPFILE_PREFIX, mode='w', delete=False) as password_file: + password_file.write('file_password\n') + password_file.flush() + + try: + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--user', + 'user', + '--host', + DEFAULT_HOST, + '--port', + f'{DEFAULT_PORT}', + '--database', + 'database', + '--password', + 'option_password', + '--password-file', + password_file.name, + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['passwd'] == 'option_password' + finally: + os.remove(password_file.name) + + +def test_password_file_option_reads_password(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + runner = CliRunner() + + with NamedTemporaryFile(prefix=TEMPFILE_PREFIX, mode='w', delete=False) as password_file: + password_file.write('file_password\nsecond line ignored\n') + password_file.flush() + + try: + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--user', + 'user', + '--host', + DEFAULT_HOST, + '--port', + f'{DEFAULT_PORT}', + '--database', + 'database', + '--password-file', + password_file.name, + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['passwd'] == 'file_password' + finally: + os.remove(password_file.name) + + +def test_password_file_option_missing_file(): + runner = CliRunner() + missing_path = 'definitely_missing_password_file.txt' + + result = runner.invoke( + click_entrypoint, + args=[ + '--password-file', + missing_path, + ], + ) + + assert result.exit_code == 1 + assert f"Password file '{missing_path}' not found" in result.output + + def test_username_option_and_mysql_user_envvar(monkeypatch): class Formatter: format_name = None @@ -1170,6 +1380,209 @@ def run_query(self, query, new_line=True): assert MockMyCli.connect_args['user'] == 'env_user' +def test_host_option_and_mysql_host_envvar(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + runner = CliRunner() + + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--host', + 'option_host', + '--port', + f'{DEFAULT_PORT}', + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['host'] == 'option_host' + + MockMyCli.connect_args = None + monkeypatch.setenv('MYSQL_HOST', 'env_host') + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--port', + f'{DEFAULT_PORT}', + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['host'] == 'env_host' + + +def test_port_option_and_mysql_tcp_port_envvar(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + runner = CliRunner() + + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--host', + DEFAULT_HOST, + '--port', + '12345', + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['port'] == 12345 + + MockMyCli.connect_args = None + monkeypatch.setenv('MYSQL_TCP_PORT', '23456') + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--host', + DEFAULT_HOST, + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['port'] == 23456 + + +def test_socket_option_and_mysql_unix_socket_envvar(monkeypatch): + class Formatter: + format_name = None + + class Logger: + def debug(self, *args, **args_dict): + pass + + def warning(self, *args, **args_dict): + pass + + class MockMyCli: + config = { + 'main': {}, + 'alias_dsn': {}, + 'connection': { + 'default_keepalive_ticks': 0, + }, + } + + def __init__(self, **_args): + self.logger = Logger() + self.destructive_warning = False + self.main_formatter = Formatter() + self.redirect_formatter = Formatter() + self.ssl_mode = 'auto' + self.my_cnf = {'client': {}, 'mysqld': {}} + self.default_keepalive_ticks = 0 + + def connect(self, **args): + MockMyCli.connect_args = args + + def run_query(self, query, new_line=True): + pass + + import mycli.main + + monkeypatch.setattr(mycli.main, 'MyCli', MockMyCli) + runner = CliRunner() + + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--socket', + 'option.sock', + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['socket'] == 'option.sock' + + MockMyCli.connect_args = None + monkeypatch.setenv('MYSQL_UNIX_SOCKET', 'env.sock') + result = runner.invoke( + mycli.main.click_entrypoint, + args=[ + '--database', + 'database', + ], + ) + assert result.exit_code == 0, result.output + ' ' + str(result.exception) + assert MockMyCli.connect_args['socket'] == 'env.sock' + + def test_mysql_user_envvar_overrides_dsn_resolution(monkeypatch): class Formatter: format_name = None @@ -1392,6 +1805,25 @@ def test_execute_with_logfile(executor): print(f"An error occurred while attempting to delete the file: {e}") +@dbtest +def test_execute_with_short_logfile_option(executor): + """Test that --execute combines with -l""" + sql = 'select 1' + runner = CliRunner() + + with NamedTemporaryFile(prefix=TEMPFILE_PREFIX, mode="w", delete=False) as logfile: + result = runner.invoke(mycli.main.click_entrypoint, args=CLI_ARGS + ["-l", logfile.name, "--execute", sql]) + assert result.exit_code == 0 + + assert os.path.getsize(logfile.name) > 0 + + try: + if os.path.exists(logfile.name): + os.remove(logfile.name) + except Exception as e: + print(f"An error occurred while attempting to delete the file: {e}") + + def _noninteractive_mock_mycli(monkeypatch): class Formatter: format_name = None