mirror of
https://github.com/yavook/kiwi-scp.git
synced 2024-11-24 13:43:01 +00:00
pydantic + pytest for config.version, config.storage and config.network
This commit is contained in:
parent
243fee3f23
commit
b568e449f5
3 changed files with 248 additions and 18 deletions
|
@ -1,11 +1,10 @@
|
||||||
# system
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
#############
|
#############
|
||||||
# REGEX PARTS
|
# REGEX PARTS
|
||||||
|
|
||||||
# regex part for a number with no leading zeroes
|
# regex part for a number with no leading zeroes
|
||||||
_RE_NUMBER: str = r"[0-9]|[1-9][0-9]*"
|
_RE_NUMBER: str = r"(?:0|[1-9][0-9]*)"
|
||||||
|
|
||||||
# regex for a semantic version string
|
# regex for a semantic version string
|
||||||
RE_SEMVER = rf"^{_RE_NUMBER}(?:\.{_RE_NUMBER}(?:\.{_RE_NUMBER})?)?$"
|
RE_SEMVER = rf"^{_RE_NUMBER}(?:\.{_RE_NUMBER}(?:\.{_RE_NUMBER})?)?$"
|
||||||
|
|
|
@ -26,6 +26,19 @@ class _Storage(BaseModel):
|
||||||
|
|
||||||
return {"directory": str(self.directory)}
|
return {"directory": str(self.directory)}
|
||||||
|
|
||||||
|
@root_validator(pre=True)
|
||||||
|
@classmethod
|
||||||
|
def unify_storage(cls, values) -> Dict[str, Any]:
|
||||||
|
"""parse different storage notations"""
|
||||||
|
|
||||||
|
if "directory" in values:
|
||||||
|
# default format
|
||||||
|
return values
|
||||||
|
|
||||||
|
else:
|
||||||
|
# undefined format
|
||||||
|
raise ValueError("Invalid Storage Format")
|
||||||
|
|
||||||
|
|
||||||
class _Project(BaseModel):
|
class _Project(BaseModel):
|
||||||
"""a project subsection"""
|
"""a project subsection"""
|
||||||
|
@ -46,6 +59,21 @@ class _Project(BaseModel):
|
||||||
result["override_storage"] = self.override_storage.kiwi_dict
|
result["override_storage"] = self.override_storage.kiwi_dict
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@validator("override_storage", pre=True)
|
||||||
|
@classmethod
|
||||||
|
def unify_storage(cls, value):
|
||||||
|
if value is None or isinstance(value, dict):
|
||||||
|
return value
|
||||||
|
|
||||||
|
elif isinstance(value, str):
|
||||||
|
return {"directory": value}
|
||||||
|
|
||||||
|
elif isinstance(value, list) and len(value) == 1:
|
||||||
|
return {"directory": value[0]}
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid Storage Format")
|
||||||
|
|
||||||
@root_validator(pre=True)
|
@root_validator(pre=True)
|
||||||
@classmethod
|
@classmethod
|
||||||
def unify_project(cls, values) -> Dict[str, Any]:
|
def unify_project(cls, values) -> Dict[str, Any]:
|
||||||
|
@ -67,7 +95,7 @@ class _Project(BaseModel):
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# undefined format
|
# undefined format
|
||||||
raise ValueError
|
raise ValueError("Invalid Project Format")
|
||||||
|
|
||||||
|
|
||||||
class _Network(BaseModel):
|
class _Network(BaseModel):
|
||||||
|
@ -225,10 +253,15 @@ class Config(BaseModel):
|
||||||
def unify_environment(cls, value) -> Dict[str, Optional[str]]:
|
def unify_environment(cls, value) -> Dict[str, Optional[str]]:
|
||||||
"""parse different environment notations"""
|
"""parse different environment notations"""
|
||||||
|
|
||||||
def parse_str(var_val: str) -> (str, Optional[str]):
|
def parse_str(var_val: Any) -> (str, Optional[str]):
|
||||||
"""parse a "<variable>=<value>" string"""
|
"""parse a "<variable>=<value>" string"""
|
||||||
|
|
||||||
idx = var_val.find("=")
|
try:
|
||||||
|
idx = str(var_val).find("=")
|
||||||
|
except Exception:
|
||||||
|
# undefined format
|
||||||
|
raise ValueError("Invalid Environment Format")
|
||||||
|
|
||||||
if idx == -1:
|
if idx == -1:
|
||||||
# don't split, just define the variable
|
# don't split, just define the variable
|
||||||
return var_val, None
|
return var_val, None
|
||||||
|
@ -249,13 +282,8 @@ class Config(BaseModel):
|
||||||
|
|
||||||
result: Dict[str, Optional[str]] = {}
|
result: Dict[str, Optional[str]] = {}
|
||||||
for item in value:
|
for item in value:
|
||||||
try:
|
key, value = parse_str(item)
|
||||||
key, value = parse_str(str(item))
|
result[key] = value
|
||||||
result[key] = value
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
# undefined format
|
|
||||||
raise ValueError("Invalid Environment Format")
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -263,10 +291,40 @@ class Config(BaseModel):
|
||||||
# any other format (try to coerce to str first)
|
# any other format (try to coerce to str first)
|
||||||
# string format (single variable):
|
# string format (single variable):
|
||||||
# "<var>=<value>"
|
# "<var>=<value>"
|
||||||
try:
|
key, value = parse_str(value)
|
||||||
key, value = parse_str(str(value))
|
return {key: value}
|
||||||
return {key: value}
|
|
||||||
|
@validator("storage", pre=True)
|
||||||
|
@classmethod
|
||||||
|
def unify_storage(cls, value):
|
||||||
|
"""parse different storage notations"""
|
||||||
|
|
||||||
|
if value is None:
|
||||||
|
raise ValueError("No Storage Given")
|
||||||
|
|
||||||
|
elif isinstance(value, dict):
|
||||||
|
return value
|
||||||
|
|
||||||
|
elif isinstance(value, str):
|
||||||
|
return {"directory": value}
|
||||||
|
|
||||||
|
elif isinstance(value, list) and len(value) == 1:
|
||||||
|
return {"directory": value[0]}
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid Storage Format")
|
||||||
|
|
||||||
|
@validator("network", pre=True)
|
||||||
|
@classmethod
|
||||||
|
def unify_network(cls, value):
|
||||||
|
"""parse different network notations"""
|
||||||
|
|
||||||
|
if value is None:
|
||||||
|
raise ValueError("No Network Given")
|
||||||
|
|
||||||
|
elif isinstance(value, dict):
|
||||||
|
return value
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid Network Format")
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
# undefined format
|
|
||||||
raise ValueError("Invalid Environment Format")
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ from kiwi_scp.config import Config
|
||||||
|
|
||||||
class UnCoercible:
|
class UnCoercible:
|
||||||
"""A class that doesn't have a string representation"""
|
"""A class that doesn't have a string representation"""
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
raise ValueError
|
raise ValueError
|
||||||
|
|
||||||
|
@ -28,6 +29,53 @@ def test_default():
|
||||||
assert c.network.cidr == IPv4Network("10.22.46.0/24")
|
assert c.network.cidr == IPv4Network("10.22.46.0/24")
|
||||||
|
|
||||||
|
|
||||||
|
#########
|
||||||
|
# VERSION
|
||||||
|
#########
|
||||||
|
|
||||||
|
def test_version_valid():
|
||||||
|
c = Config(version="0.0.0")
|
||||||
|
|
||||||
|
assert c.version == "0.0.0"
|
||||||
|
|
||||||
|
c = Config(version="0.0")
|
||||||
|
|
||||||
|
assert c.version == "0.0"
|
||||||
|
|
||||||
|
c = Config(version="0")
|
||||||
|
|
||||||
|
assert c.version == "0"
|
||||||
|
|
||||||
|
c = Config(version=1.0)
|
||||||
|
|
||||||
|
assert c.version == "1.0"
|
||||||
|
|
||||||
|
c = Config(version=1)
|
||||||
|
|
||||||
|
assert c.version == "1"
|
||||||
|
|
||||||
|
|
||||||
|
def test_version_invalid():
|
||||||
|
# definitely not a version
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(version="dnaf")
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"].find("string does not match regex") != -1
|
||||||
|
assert error["type"] == "value_error.str.regex"
|
||||||
|
|
||||||
|
# barely a version
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
c = Config(version="0.0.0alpha")
|
||||||
|
print(c.version)
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"].find("string does not match regex") != -1
|
||||||
|
assert error["type"] == "value_error.str.regex"
|
||||||
|
|
||||||
|
|
||||||
########
|
########
|
||||||
# SHELLS
|
# SHELLS
|
||||||
########
|
########
|
||||||
|
@ -143,6 +191,19 @@ def test_proj_dict():
|
||||||
assert p.override_storage is None
|
assert p.override_storage is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_proj_invalid_dict():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(projects={
|
||||||
|
"random key 1": "random value 1",
|
||||||
|
"random key 2": "random value 2",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "Invalid Project Format"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
def test_proj_coercible():
|
def test_proj_coercible():
|
||||||
c = Config(projects="project")
|
c = Config(projects="project")
|
||||||
|
|
||||||
|
@ -267,3 +328,115 @@ def test_env_uncoercible():
|
||||||
error = exc_info.value.errors()[0]
|
error = exc_info.value.errors()[0]
|
||||||
assert error["msg"] == "Invalid Environment Format"
|
assert error["msg"] == "Invalid Environment Format"
|
||||||
assert error["type"] == "value_error"
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
|
#########
|
||||||
|
# STORAGE
|
||||||
|
#########
|
||||||
|
|
||||||
|
def test_storage_empty():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(storage=None)
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "No Storage Given"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_dict():
|
||||||
|
c = Config(storage={"directory": "/test/directory"})
|
||||||
|
|
||||||
|
assert c.storage.directory == Path("/test/directory")
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_invalid_dict():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(storage={"random key": "random value"})
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "Invalid Storage Format"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_str():
|
||||||
|
c = Config(storage="/test/directory")
|
||||||
|
|
||||||
|
assert c.storage.directory == Path("/test/directory")
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_list():
|
||||||
|
c = Config(storage=["/test/directory"])
|
||||||
|
|
||||||
|
assert c.storage.directory == Path("/test/directory")
|
||||||
|
|
||||||
|
|
||||||
|
def test_storage_invalid():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(storage=True)
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "Invalid Storage Format"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
|
#########
|
||||||
|
# NETWORK
|
||||||
|
#########
|
||||||
|
|
||||||
|
def test_network_empty():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(network=None)
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "No Network Given"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_network_dict():
|
||||||
|
c = Config(network={
|
||||||
|
"name": "test_hub",
|
||||||
|
"cidr": "1.2.3.4/32",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert c == Config(network={
|
||||||
|
"name": "TEST_HUB",
|
||||||
|
"cidr": "1.2.3.4/32",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert c.network.name == "test_hub"
|
||||||
|
assert c.network.cidr == IPv4Network("1.2.3.4/32")
|
||||||
|
|
||||||
|
|
||||||
|
def test_network_invalid_dict():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(network={"name": "test_hub"})
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "field required"
|
||||||
|
assert error["type"] == "value_error.missing"
|
||||||
|
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(network={
|
||||||
|
"name": "test_hub",
|
||||||
|
"cidr": "1.2.3.4/123",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "value is not a valid IPv4 network"
|
||||||
|
assert error["type"] == "value_error.ipv4network"
|
||||||
|
|
||||||
|
|
||||||
|
def test_network_invalid():
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Config(network=True)
|
||||||
|
|
||||||
|
assert len(exc_info.value.errors()) == 1
|
||||||
|
error = exc_info.value.errors()[0]
|
||||||
|
assert error["msg"] == "Invalid Network Format"
|
||||||
|
assert error["type"] == "value_error"
|
||||||
|
|
Loading…
Reference in a new issue