From 73f683533fe3598709a8f31c0f197c0bf76f80f6 Mon Sep 17 00:00:00 2001 From: Wes Barnett Date: Fri, 26 Mar 2021 21:22:19 -0400 Subject: [PATCH] Create CommandParser --- scripts/snap_pac.py | 124 +++++++++++++++++++++++-------------------- tests/test_script.py | 118 ++++++++++++++-------------------------- 2 files changed, 104 insertions(+), 138 deletions(-) diff --git a/scripts/snap_pac.py b/scripts/snap_pac.py index b7c4bb0..d71b349 100755 --- a/scripts/snap_pac.py +++ b/scripts/snap_pac.py @@ -59,6 +59,66 @@ class SnapperCmd: return " ".join(self.cmd) +class ConfigProcessor: + + def __init__(self, ini_file, parent_cmd, packages, snapshot_type): + """Set up defaults for snap-pac configuration.""" + + self.parent_cmd = parent_cmd + self.packages = packages + self.snapshot_type = snapshot_type + + self.config = ConfigParser() + self.config["DEFAULT"] = { + "snapshot": False, + "cleanup_algorithm": "number", + "pre_description": parent_cmd, + "post_description": " ".join(packages), + "desc_limit": 72, + "important_packages": [], + "important_commands": [], + "userdata": [] + } + self.config["root"] = { + "snapshot": True + } + self.config.read(ini_file) + + def get_cleanup_algorithm(self, section): + return self.config.get(section, "cleanup_algorithm") + + def get_description(self, section): + desc_limit = self.config.getint(section, "desc_limit") + return self.config.get(section, f"{self.snapshot_type}_description")[:desc_limit] + + def check_important_commands(self, section): + return self.parent_cmd in json.loads(self.config.get(section, "important_commands")) + + def check_important_packages(self, section): + important_packages = json.loads(self.config.get(section, "important_packages")) + return any(x in important_packages for x in self.packages) + + def check_important(self, section): + return (self.check_important_commands(section) or + self.check_important_packages(section)) + + def get_userdata(self, section): + userdata = set(json.loads(self.config.get(section, "userdata"))) + if self.check_important(section): + userdata.add("important=yes") + return ",".join(sorted(list(userdata))) + + def __call__(self, section): + if section not in self.config: + self.config.add_section(section) + return { + "description": self.get_description(section), + "cleanup_algorithm": self.get_cleanup_algorithm(section), + "userdata": self.get_userdata(section), + "snapshot": self.config.getboolean(section, "snapshot") + } + + def get_snapper_configs(conf_file): """Get the snapper configurations.""" for line in conf_file.read_text().split("\n"): @@ -67,32 +127,6 @@ def get_snapper_configs(conf_file): return line[1].lstrip("\"").split() -def setup_config_parser(ini_file, parent_cmd, packages): - """Set up defaults for snap-pac configuration.""" - - config = ConfigParser() - config["DEFAULT"] = { - "snapshot": False, - "cleanup_algorithm": "number", - "pre_description": parent_cmd, - "post_description": " ".join(packages), - "desc_limit": 72, - "important_packages": [], - "important_commands": [], - "userdata": [] - } - config["root"] = { - "snapshot": True - } - config.read(ini_file) - return config - - -def get_description(snapshot_type, config, section): - desc_limit = config.getint(section, "desc_limit") - return config.get(section, f"{snapshot_type}_description")[:desc_limit] - - def get_pre_number(snapshot_type, prefile): if snapshot_type == "pre": pre_number = None @@ -106,23 +140,6 @@ def get_pre_number(snapshot_type, prefile): return pre_number -def check_important_commands(config, snapper_config, parent_cmd): - important_commands = json.loads(config.get(snapper_config, "important_commands")) - return parent_cmd in important_commands - - -def check_important_packages(config, snapper_config, packages): - important_packages = json.loads(config.get(snapper_config, "important_packages")) - return any(x in important_packages for x in packages) - - -def get_userdata(config, snapper_config, important): - userdata = set(json.loads(config.get(snapper_config, "userdata"))) - if important: - userdata.add("important=yes") - return ",".join(sorted(list(userdata))) - - def check_skip(): return os.getenv("SNAP_PAC_SKIP", "n").lower() in ["y", "yes", "true", "1"] @@ -151,30 +168,19 @@ if __name__ == "__main__": parent_cmd = os.popen(f"ps -p {os.getppid()} -o args=").read().strip() packages = [line.rstrip("\n") for line in sys.stdin] - config = setup_config_parser(snap_pac_ini, parent_cmd, packages) + config_processor = ConfigProcessor(snap_pac_ini, parent_cmd, packages, snapshot_type) snapper_configs = get_snapper_configs(snapper_conf_file) chroot = os.stat("/") != os.stat("/proc/1/root/.") tmpdir = Path(tempfile.gettempdir()) for snapper_config in snapper_configs: - if snapper_config not in config: - config.add_section(snapper_config) - - if config.getboolean(snapper_config, "snapshot"): + data = config_processor(snapper_config) + if data["snapshot"]: prefile = tmpdir / f"snap-pac-pre_{snapper_config}" - - cleanup_algorithm = config.get(snapper_config, "cleanup_algorithm") - description = get_description(snapshot_type, config, snapper_config) pre_number = get_pre_number(snapshot_type, prefile) - - important = (check_important_commands(config, snapper_config, parent_cmd) or - check_important_packages(config, snapper_config, packages)) - - userdata = get_userdata(config, snapper_config, important) - - snapper_cmd = SnapperCmd(snapper_config, snapshot_type, cleanup_algorithm, - description, chroot, pre_number, userdata) + snapper_cmd = SnapperCmd(snapper_config, snapshot_type, data["cleanup_algorithm"], + data["description"], chroot, pre_number, data["userdata"]) num = snapper_cmd() logging.info(f"==> {snapper_config}: {num}") diff --git a/tests/test_script.py b/tests/test_script.py index 903ac16..c2dccd2 100644 --- a/tests/test_script.py +++ b/tests/test_script.py @@ -1,38 +1,10 @@ -from configparser import ConfigParser import tempfile from pathlib import Path import os import pytest -from scripts.snap_pac import ( - SnapperCmd, check_important_commands, check_important_packages, check_skip, get_pre_number, get_snapper_configs, - get_userdata, setup_config_parser, get_description -) - - -@pytest.fixture -def config(): - config = ConfigParser() - config["DEFAULT"] = { - "snapshot": False, - "cleanup_algorithm": "number", - "pre_description": "foo", - "post_description": "bar", - "desc_limit": 72, - "important_packages": [], - "important_commands": [], - "userdata": [] - } - config["root"] = { - "snapshot": True - } - config["home"] = { - "snapshot": True, - "desc_limit": 3, - "post_description": "a really long description" - } - return config +from scripts.snap_pac import SnapperCmd, ConfigProcessor, check_skip, get_pre_number, get_snapper_configs @pytest.fixture @@ -90,15 +62,50 @@ def test_skip_snap_pac(): assert check_skip() is True -def test_setup_config_parser(config): +@pytest.mark.parametrize("section, command, packages, snapshot_type, result", [ + ( + "root", "foo", ["bar"], "pre", + {"description": "foo", "cleanup_algorithm": "number", "userdata": "", "snapshot": True} + ), + ( + "root", "pacman -Syu", [], "pre", + {"description": "pacman -Syu", "cleanup_algorithm": "number", "userdata": "important=yes", "snapshot": True} + ), + ( + "mail", "pacman -Syu", [], "pre", + {"description": "pacman -Syu", "cleanup_algorithm": "number", "userdata": "", "snapshot": False} + ), + ( + "home", "pacman -Syu", [], "pre", + {"description": "pac", "cleanup_algorithm": "number", "userdata": "foo=bar,requestid=42", "snapshot": True} + ), + ( + "home", "pacman -Syu", [], "post", + {"description": "a r", "cleanup_algorithm": "number", "userdata": "foo=bar,requestid=42", "snapshot": True} + ), + ( + "myconfig", "pacman -S linux", ["linux"], "post", + {"description": "linux", "cleanup_algorithm": "timeline", + "userdata": "foo=bar,important=yes,requestid=42", "snapshot": True} + ), +]) +def test_config_processor(section, command, packages, snapshot_type, result): with tempfile.NamedTemporaryFile("w", delete=False) as f: + f.write("[root]\n") + f.write("important_commands = [\"pacman -Syu\"]\n\n") f.write("[home]\n") f.write("snapshot = True\n") f.write("desc_limit = 3\n") f.write("post_description = a really long description\n") + f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n\n") + f.write("[myconfig]\n") + f.write("snapshot = True\n") + f.write("cleanup_algorithm = timeline\n") + f.write("important_packages = [\"linux\", \"linux-lts\"]\n") + f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n") name = f.name - config2 = setup_config_parser(name, "foo", ["bar"]) - assert config == config2 + config_processor = ConfigProcessor(name, command, packages, snapshot_type) + assert config_processor(section) == result def test_get_pre_number_pre(prefile): @@ -112,50 +119,3 @@ def test_get_pre_number_post(prefile): def test_no_prefile(): with pytest.raises(FileNotFoundError): get_pre_number("post", Path("/tmp/foo-pre-file-not-found")) - - -@pytest.mark.parametrize("snapshot_type, description", [("pre", "foo"), ("post", "a r")]) -def test_get_description(snapshot_type, description, config): - assert get_description(snapshot_type, config, "home") == description - - -def test_important_commands(): - parent_cmd = "pacman -Syu" - with tempfile.NamedTemporaryFile("w", delete=False) as f: - f.write("[DEFAULT]\n") - f.write("important_commands = [\"pacman -Syu\"]\n") - name = f.name - config = setup_config_parser(name, parent_cmd, ["bar"]) - important = check_important_commands(config, "root", parent_cmd) - assert important - - -def test_important_packages(): - packages = ["bar", "linux", "vim"] - with tempfile.NamedTemporaryFile("w", delete=False) as f: - f.write("[DEFAULT]\n") - f.write("important_packages = [\"linux\"]\n") - name = f.name - config = setup_config_parser(name, "pacman -S", packages) - important = check_important_packages(config, "root", packages) - assert important - - -def test_load_userdata(): - with tempfile.NamedTemporaryFile("w", delete=False) as f: - f.write("[DEFAULT]\n") - f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n") - name = f.name - config = setup_config_parser(name, "pacman -Syu", ["bar"]) - userdata = get_userdata(config, "root", False) - assert userdata == "foo=bar,requestid=42" - - -def test_load_userdata_and_important(): - with tempfile.NamedTemporaryFile("w", delete=False) as f: - f.write("[DEFAULT]\n") - f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n") - name = f.name - config = setup_config_parser(name, "pacman -Syu", ["bar"]) - userdata = get_userdata(config, "root", True) - assert userdata == "foo=bar,important=yes,requestid=42"