Create CommandParser

This commit is contained in:
Wes Barnett 2021-03-26 21:22:19 -04:00
parent 559a8d7b54
commit 73f683533f
No known key found for this signature in database
GPG key ID: 1070BCC98C18BD66
2 changed files with 104 additions and 138 deletions

View file

@ -59,6 +59,66 @@ class SnapperCmd:
return " ".join(self.cmd)
class ConfigProcessor:
def __init__(self, ini_file, parent_cmd, packages, snapshot_type):
"""Set up defaults for snap-pac configuration."""
self.parent_cmd = parent_cmd
self.packages = packages
self.snapshot_type = snapshot_type
self.config = ConfigParser()
self.config["DEFAULT"] = {
"snapshot": False,
"cleanup_algorithm": "number",
"pre_description": parent_cmd,
"post_description": " ".join(packages),
"desc_limit": 72,
"important_packages": [],
"important_commands": [],
"userdata": []
}
self.config["root"] = {
"snapshot": True
}
self.config.read(ini_file)
def get_cleanup_algorithm(self, section):
return self.config.get(section, "cleanup_algorithm")
def get_description(self, section):
desc_limit = self.config.getint(section, "desc_limit")
return self.config.get(section, f"{self.snapshot_type}_description")[:desc_limit]
def check_important_commands(self, section):
return self.parent_cmd in json.loads(self.config.get(section, "important_commands"))
def check_important_packages(self, section):
important_packages = json.loads(self.config.get(section, "important_packages"))
return any(x in important_packages for x in self.packages)
def check_important(self, section):
return (self.check_important_commands(section) or
self.check_important_packages(section))
def get_userdata(self, section):
userdata = set(json.loads(self.config.get(section, "userdata")))
if self.check_important(section):
userdata.add("important=yes")
return ",".join(sorted(list(userdata)))
def __call__(self, section):
if section not in self.config:
self.config.add_section(section)
return {
"description": self.get_description(section),
"cleanup_algorithm": self.get_cleanup_algorithm(section),
"userdata": self.get_userdata(section),
"snapshot": self.config.getboolean(section, "snapshot")
}
def get_snapper_configs(conf_file):
"""Get the snapper configurations."""
for line in conf_file.read_text().split("\n"):
@ -67,32 +127,6 @@ def get_snapper_configs(conf_file):
return line[1].lstrip("\"").split()
def setup_config_parser(ini_file, parent_cmd, packages):
"""Set up defaults for snap-pac configuration."""
config = ConfigParser()
config["DEFAULT"] = {
"snapshot": False,
"cleanup_algorithm": "number",
"pre_description": parent_cmd,
"post_description": " ".join(packages),
"desc_limit": 72,
"important_packages": [],
"important_commands": [],
"userdata": []
}
config["root"] = {
"snapshot": True
}
config.read(ini_file)
return config
def get_description(snapshot_type, config, section):
desc_limit = config.getint(section, "desc_limit")
return config.get(section, f"{snapshot_type}_description")[:desc_limit]
def get_pre_number(snapshot_type, prefile):
if snapshot_type == "pre":
pre_number = None
@ -106,23 +140,6 @@ def get_pre_number(snapshot_type, prefile):
return pre_number
def check_important_commands(config, snapper_config, parent_cmd):
important_commands = json.loads(config.get(snapper_config, "important_commands"))
return parent_cmd in important_commands
def check_important_packages(config, snapper_config, packages):
important_packages = json.loads(config.get(snapper_config, "important_packages"))
return any(x in important_packages for x in packages)
def get_userdata(config, snapper_config, important):
userdata = set(json.loads(config.get(snapper_config, "userdata")))
if important:
userdata.add("important=yes")
return ",".join(sorted(list(userdata)))
def check_skip():
return os.getenv("SNAP_PAC_SKIP", "n").lower() in ["y", "yes", "true", "1"]
@ -151,30 +168,19 @@ if __name__ == "__main__":
parent_cmd = os.popen(f"ps -p {os.getppid()} -o args=").read().strip()
packages = [line.rstrip("\n") for line in sys.stdin]
config = setup_config_parser(snap_pac_ini, parent_cmd, packages)
config_processor = ConfigProcessor(snap_pac_ini, parent_cmd, packages, snapshot_type)
snapper_configs = get_snapper_configs(snapper_conf_file)
chroot = os.stat("/") != os.stat("/proc/1/root/.")
tmpdir = Path(tempfile.gettempdir())
for snapper_config in snapper_configs:
if snapper_config not in config:
config.add_section(snapper_config)
if config.getboolean(snapper_config, "snapshot"):
data = config_processor(snapper_config)
if data["snapshot"]:
prefile = tmpdir / f"snap-pac-pre_{snapper_config}"
cleanup_algorithm = config.get(snapper_config, "cleanup_algorithm")
description = get_description(snapshot_type, config, snapper_config)
pre_number = get_pre_number(snapshot_type, prefile)
important = (check_important_commands(config, snapper_config, parent_cmd) or
check_important_packages(config, snapper_config, packages))
userdata = get_userdata(config, snapper_config, important)
snapper_cmd = SnapperCmd(snapper_config, snapshot_type, cleanup_algorithm,
description, chroot, pre_number, userdata)
snapper_cmd = SnapperCmd(snapper_config, snapshot_type, data["cleanup_algorithm"],
data["description"], chroot, pre_number, data["userdata"])
num = snapper_cmd()
logging.info(f"==> {snapper_config}: {num}")

View file

@ -1,38 +1,10 @@
from configparser import ConfigParser
import tempfile
from pathlib import Path
import os
import pytest
from scripts.snap_pac import (
SnapperCmd, check_important_commands, check_important_packages, check_skip, get_pre_number, get_snapper_configs,
get_userdata, setup_config_parser, get_description
)
@pytest.fixture
def config():
config = ConfigParser()
config["DEFAULT"] = {
"snapshot": False,
"cleanup_algorithm": "number",
"pre_description": "foo",
"post_description": "bar",
"desc_limit": 72,
"important_packages": [],
"important_commands": [],
"userdata": []
}
config["root"] = {
"snapshot": True
}
config["home"] = {
"snapshot": True,
"desc_limit": 3,
"post_description": "a really long description"
}
return config
from scripts.snap_pac import SnapperCmd, ConfigProcessor, check_skip, get_pre_number, get_snapper_configs
@pytest.fixture
@ -90,15 +62,50 @@ def test_skip_snap_pac():
assert check_skip() is True
def test_setup_config_parser(config):
@pytest.mark.parametrize("section, command, packages, snapshot_type, result", [
(
"root", "foo", ["bar"], "pre",
{"description": "foo", "cleanup_algorithm": "number", "userdata": "", "snapshot": True}
),
(
"root", "pacman -Syu", [], "pre",
{"description": "pacman -Syu", "cleanup_algorithm": "number", "userdata": "important=yes", "snapshot": True}
),
(
"mail", "pacman -Syu", [], "pre",
{"description": "pacman -Syu", "cleanup_algorithm": "number", "userdata": "", "snapshot": False}
),
(
"home", "pacman -Syu", [], "pre",
{"description": "pac", "cleanup_algorithm": "number", "userdata": "foo=bar,requestid=42", "snapshot": True}
),
(
"home", "pacman -Syu", [], "post",
{"description": "a r", "cleanup_algorithm": "number", "userdata": "foo=bar,requestid=42", "snapshot": True}
),
(
"myconfig", "pacman -S linux", ["linux"], "post",
{"description": "linux", "cleanup_algorithm": "timeline",
"userdata": "foo=bar,important=yes,requestid=42", "snapshot": True}
),
])
def test_config_processor(section, command, packages, snapshot_type, result):
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("[root]\n")
f.write("important_commands = [\"pacman -Syu\"]\n\n")
f.write("[home]\n")
f.write("snapshot = True\n")
f.write("desc_limit = 3\n")
f.write("post_description = a really long description\n")
f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n\n")
f.write("[myconfig]\n")
f.write("snapshot = True\n")
f.write("cleanup_algorithm = timeline\n")
f.write("important_packages = [\"linux\", \"linux-lts\"]\n")
f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n")
name = f.name
config2 = setup_config_parser(name, "foo", ["bar"])
assert config == config2
config_processor = ConfigProcessor(name, command, packages, snapshot_type)
assert config_processor(section) == result
def test_get_pre_number_pre(prefile):
@ -112,50 +119,3 @@ def test_get_pre_number_post(prefile):
def test_no_prefile():
with pytest.raises(FileNotFoundError):
get_pre_number("post", Path("/tmp/foo-pre-file-not-found"))
@pytest.mark.parametrize("snapshot_type, description", [("pre", "foo"), ("post", "a r")])
def test_get_description(snapshot_type, description, config):
assert get_description(snapshot_type, config, "home") == description
def test_important_commands():
parent_cmd = "pacman -Syu"
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("[DEFAULT]\n")
f.write("important_commands = [\"pacman -Syu\"]\n")
name = f.name
config = setup_config_parser(name, parent_cmd, ["bar"])
important = check_important_commands(config, "root", parent_cmd)
assert important
def test_important_packages():
packages = ["bar", "linux", "vim"]
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("[DEFAULT]\n")
f.write("important_packages = [\"linux\"]\n")
name = f.name
config = setup_config_parser(name, "pacman -S", packages)
important = check_important_packages(config, "root", packages)
assert important
def test_load_userdata():
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("[DEFAULT]\n")
f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n")
name = f.name
config = setup_config_parser(name, "pacman -Syu", ["bar"])
userdata = get_userdata(config, "root", False)
assert userdata == "foo=bar,requestid=42"
def test_load_userdata_and_important():
with tempfile.NamedTemporaryFile("w", delete=False) as f:
f.write("[DEFAULT]\n")
f.write("userdata = [\"foo=bar\", \"requestid=42\"]\n")
name = f.name
config = setup_config_parser(name, "pacman -Syu", ["bar"])
userdata = get_userdata(config, "root", True)
assert userdata == "foo=bar,important=yes,requestid=42"