diff --git a/mc_openapi/doml_mc/common_reqs.py b/mc_openapi/doml_mc/common_reqs.py new file mode 100644 index 0000000000000000000000000000000000000000..2dcfa00a060bc258e190a1288b9767402d5eaf8b --- /dev/null +++ b/mc_openapi/doml_mc/common_reqs.py @@ -0,0 +1,192 @@ +from z3 import ( + Consts, ExprRef, + ForAll, Exists, Implies, And, Or +) + +from .imc import ( + SMTEncoding, SMTSorts, Requirement, RequirementStore +) + + +def get_consts(smtsorts: SMTSorts, consts: list[str]) -> list[ExprRef]: + return Consts(" ".join(consts), smtsorts.element_sort) + + +def vm_iface(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: + vm, iface = get_consts(smtsorts, ["vm", "iface"]) + return ForAll( + [vm], + Implies( + smtenc.element_class_fun(vm) == smtenc.classes["infrastructure_VirtualMachine"], + Exists( + [iface], + And( + smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], iface) + ) + ) + ) + ) + + +def software_package_iface_net(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: + asc_consumer, asc_exposer, siface, net, net_iface, cnode, cdeployment, enode, edeployment, vm, dc = get_consts( + smtsorts, + ["asc_consumer", "asc_exposer", "siface", "net", "net_iface", "cnode", "cdeployment", "enode", "edeployment", "vm", "dc"] + ) + return ForAll( + [asc_consumer, asc_exposer, siface], + Implies( + And( + smtenc.association_rel(asc_consumer, smtenc.associations["application_SoftwareComponent::exposedInterfaces"], siface), + smtenc.association_rel(asc_exposer, smtenc.associations["application_SoftwareComponent::consumedInterfaces"], siface), + ), + Exists( + [cdeployment, cnode, edeployment, enode, net], + And( + smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::component"], asc_consumer), + smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::node"], cnode), + Exists( + [vm, net_iface], + Or( + And( # asc_consumer is deployed on a component with an interface in network n + smtenc.association_rel(cnode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + And( # asc_consumer is deployed on a container hosted in a VM with an interface in network n + smtenc.association_rel(cnode, smtenc.associations["infrastructure_Container::hosts"], vm), + smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + And( # asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n + smtenc.association_rel(cnode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), + smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + ) + ), + smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::component"], asc_exposer), + smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::node"], enode), + Exists( + [vm, net_iface], + Or( + And( # asc_exposer is deployed on a component with an interface in network n + smtenc.association_rel(enode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + And( # asc_exposer is deployed on a container hosted on a VM with an interface in network n + smtenc.association_rel(enode, smtenc.associations["infrastructure_Container::hosts"], vm), + smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + And( # asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n + smtenc.association_rel(enode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), + smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), + smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), + ), + ) + ) + ) + ) + ) + ) + + +def iface_uniq(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: + def any_iface(elem, iface): + ifaces_assocs = [ + "infrastructure_ComputingNode::ifaces", + "infrastructure_Storage::ifaces", + "infrastructure_FunctionAsAService::ifaces" + ] + return Or(*(smtenc.association_rel(elem, smtenc.associations[assoc_name], iface) for assoc_name in ifaces_assocs)) + + e1, e2, ni = get_consts(smtsorts, ["e1", "e2", "i"]) + return ForAll( + [e1, e2, ni], + Implies( + And(any_iface(e1, ni), any_iface(e2, ni)), + e1 == e2 + ) + ) + + +def all_SoftwareComponents_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: + sc, deployment, ielem = get_consts(smtsorts, ["sc", "deployment", "ielem"]) + return ForAll( + [sc], + Implies( + smtenc.element_class_fun(sc) == smtenc.classes["application_SoftwareComponent"], + Exists( + [deployment, ielem], + And( + smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::component"], sc), + smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::node"], ielem), + ) + ) + ) + ) + + +def all_infrastructure_elements_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: + def checkOneClass(ielem, concr, provider, celem, ielemClass, providerAssoc, celemAssoc): + return Implies( + smtenc.element_class_fun(ielem) == smtenc.classes[ielemClass], + Exists( + [provider, celem], + And( + smtenc.association_rel(concr, smtenc.associations["concrete_ConcreteInfrastructure::providers"], provider), + smtenc.association_rel(provider, smtenc.associations[providerAssoc], celem), + smtenc.association_rel(celem, smtenc.associations[celemAssoc], ielem) + ) + ) + ) + + ielem, concr, provider, celem = get_consts(smtsorts, ["ielem", "concr", "provider", "celem"]) + return Exists( + [concr], + And( + smtenc.element_class_fun(concr) == smtenc.classes["concrete_ConcreteInfrastructure"], + ForAll( + [ielem], + And( + checkOneClass( + ielem, concr, provider, celem, + "infrastructure_VirtualMachine", + "concrete_RuntimeProvider::vms", + "concrete_VirtualMachine::maps" + ), + checkOneClass( + ielem, concr, provider, celem, + "infrastructure_Network", + "concrete_RuntimeProvider::networks", + "concrete_Network::maps" + ), + checkOneClass( + ielem, concr, provider, celem, + "infrastructure_Storage", + "concrete_RuntimeProvider::storages", + "concrete_Storage::maps" + ), + checkOneClass( + ielem, concr, provider, celem, + "infrastructure_FunctionAsAService", + "concrete_RuntimeProvider::faas", + "concrete_FunctionAsAService::maps" + ), + ) + ) + ) + ) + + +CommonRequirements = RequirementStore( + [ + Requirement(*rt) for rt in [ + (vm_iface, "vm_iface", "All virtual machines must be connected to at least one network interface.", "A virtual machine is connected to no network interface."), + (software_package_iface_net, "software_package_iface_net", "All software packages can see the interfaces they need through a common network.", "A software package is deployed on a node that has no access to an interface it consumes."), + (iface_uniq, "iface_uniq", "There are no duplicated interfaces.", "There is a duplicated interface."), + (all_SoftwareComponents_deployed, "all_SoftwareComponents_deployed", "All software components have been deployed to some node.", "A software component has not been deployed to any node."), + (all_infrastructure_elements_deployed, "all_infrastructure_elements_deployed", "All abstract infrastructure elements are mapped to an element in the active concretization.", "An abstract infrastructure element has not been mapped to any element in the active concretization."), + ] + ] +) diff --git a/mc_openapi/doml_mc/imc.py b/mc_openapi/doml_mc/imc.py new file mode 100644 index 0000000000000000000000000000000000000000..2fc22ad7932debaeefc553405e9695286e5a70a3 --- /dev/null +++ b/mc_openapi/doml_mc/imc.py @@ -0,0 +1,201 @@ +from collections.abc import Callable +from dataclasses import dataclass + +from z3 import ( + Context, CheckSatResult, FuncDeclRef, Solver, ExprRef, SortRef, DatatypeSortRef, + sat, unsat, unknown +) + +from .intermediate_model.doml_element import IntermediateModel +from .z3encoding.im_encoding import ( + assert_im_associations_q, assert_im_attributes, + def_elem_class_f_and_assert_classes, + mk_elem_sort_dict, mk_stringsym_sort_dict +) +from .z3encoding.metamodel_encoding import ( + def_association_rel, + assert_association_rel_constraints, + def_attribute_rel, + assert_attribute_rel_constraints, + mk_association_sort_dict, + mk_attribute_sort_dict, mk_class_sort_dict +) +from .z3encoding.types import Refs +from .z3encoding.utils import mk_adata_sort + + +@dataclass +class SMTEncoding: + classes: Refs + associations: Refs + attributes: Refs + elements: Refs + str_symbols: Refs + element_class_fun: FuncDeclRef + attribute_rel: FuncDeclRef + association_rel: FuncDeclRef + + +@dataclass +class SMTSorts: + class_sort: SortRef + association_sort: SortRef + attribute_sort: SortRef + element_sort: SortRef + str_symbols_sort: SortRef + attr_data_sort: DatatypeSortRef + + +@dataclass +class Requirement: + assert_callable: Callable[[SMTEncoding, SMTSorts], list[ExprRef]] + assert_name: str + description: str + error_description: str + + +class RequirementStore: + def __init__(self, requirements: list[Requirement]): + self.requirements = requirements + pass + + def get_all_requirements(self) -> list[Requirement]: + return self.requirements + + def get_num_requirements(self) -> int: + return len(self.get_all_requirements()) + + def get_one_requirement(self, index: int) -> Requirement: + return self.get_all_requirements()[index] + + +class IntermediateModelChecker: + def __init__(self, metamodel, inv_assoc, intermediate_model: IntermediateModel): + def instantiate_solver(): + self.z3Context = Context() + self.solver = Solver(ctx=self.z3Context) + + class_sort, class_ = mk_class_sort_dict(self.metamodel, self.z3Context) + assoc_sort, assoc = mk_association_sort_dict(self.metamodel, self.z3Context) + attr_sort, attr = mk_attribute_sort_dict(self.metamodel, self.z3Context) + elem_sort, elem = mk_elem_sort_dict(self.intermediate_model, self.z3Context) + ss_sort, ss = mk_stringsym_sort_dict(self.intermediate_model, self.metamodel, self.z3Context) + AData = mk_adata_sort(ss_sort, self.z3Context) + elem_class_f = def_elem_class_f_and_assert_classes( + self.intermediate_model, + self.solver, + elem_sort, + elem, + class_sort, + class_ + ) + attr_rel = def_attribute_rel( + attr_sort, + elem_sort, + AData + ) + assert_im_attributes( + attr_rel, + self.solver, + self.intermediate_model, + self.metamodel, + elem, + attr_sort, + attr, + AData, + ss + ) + assoc_rel = def_association_rel( + assoc_sort, + elem_sort + ) + assert_im_associations_q( + assoc_rel, + self.solver, + {k: v for k, v in self.intermediate_model.items()}, + elem, + assoc_sort, + assoc, + ) + self.smt_encoding = SMTEncoding( + class_, + assoc, + attr, + elem, + ss, + elem_class_f, + attr_rel, + assoc_rel + ) + self.smt_sorts = SMTSorts( + class_sort, + assoc_sort, + attr_sort, + elem_sort, + ss_sort, + AData + ) + + self.metamodel = metamodel + self.inv_assoc = inv_assoc + self.intermediate_model = intermediate_model + instantiate_solver() + + def check_consistency_constraints(self) -> tuple[CheckSatResult, str]: + self.solver.push() + self.assert_consistency_constraints() + res = self.solver.check() + self.solver.pop() + if res == unsat: + return unsat, "The DOML model is inconsistent." + else: + return res, "" + + def check_requirements(self, reqs: RequirementStore) -> tuple[CheckSatResult, str]: + unsat_msgs = [] + some_dontknow = False + + for req in reqs.get_all_requirements(): + self.solver.push() + self.solver.assert_and_track( + req.assert_callable(self.smt_encoding, self.smt_sorts), + req.assert_name + ) + res = self.solver.check() + if res == unsat: + unsat_msgs.append(req.error_description) + elif res == unknown: + some_dontknow = True + self.solver.pop() + + if unsat_msgs: + if some_dontknow: + unsat_msgs.append("Unable to check some requirements.") + return unsat, " ".join(unsat_msgs) + elif some_dontknow: + return unknown, "Unable to check some requirements." + else: + return sat, "All requirements satisfied." + + def assert_consistency_constraints(self): + assert_attribute_rel_constraints( + self.metamodel, + self.solver, + self.smt_encoding.attribute_rel, + self.smt_encoding.attributes, + self.smt_encoding.classes, + self.smt_encoding.element_class_fun, + self.smt_sorts.element_sort, + self.smt_sorts.attr_data_sort, + self.smt_encoding.str_symbols + ) + assert_association_rel_constraints( + self.metamodel, + self.solver, + self.smt_encoding.association_rel, + self.smt_encoding.associations, + self.smt_encoding.classes, + self.smt_encoding.element_class_fun, + self.smt_sorts.element_sort, + self.inv_assoc + ) diff --git a/mc_openapi/doml_mc/mc.py b/mc_openapi/doml_mc/mc.py index 075101a9e76534c3bdd52f229784bd11e33de24c..239dc95230c10e959e8b25d9d0860c221c03c8bb 100644 --- a/mc_openapi/doml_mc/mc.py +++ b/mc_openapi/doml_mc/mc.py @@ -1,14 +1,7 @@ import importlib.resources as ilres - import yaml -from dataclasses import dataclass from joblib import parallel_backend, Parallel, delayed - -from z3 import ( - Context, CheckSatResult, Consts, ExprRef, FuncDeclRef, Solver, SortRef, DatatypeSortRef, - ForAll, Exists, Implies, And, Or, - sat, unsat, unknown -) +from z3 import CheckSatResult, sat, unsat, unknown from .. import assets from .intermediate_model.doml_element import ( @@ -20,373 +13,8 @@ from .intermediate_model.metamodel import ( parse_metamodel ) from .xmi_parser.doml_model import parse_doml_model -from .z3encoding.im_encoding import ( - assert_im_associations_q, assert_im_attributes, - def_elem_class_f_and_assert_classes, - mk_elem_sort_dict, mk_stringsym_sort_dict -) -from .z3encoding.metamodel_encoding import ( - def_association_rel, - assert_association_rel_constraints, - def_attribute_rel, - assert_attribute_rel_constraints, - mk_association_sort_dict, - mk_attribute_sort_dict, mk_class_sort_dict -) -from .z3encoding.types import Refs -from .z3encoding.utils import mk_adata_sort - - -@dataclass -class SMTEncoding: - classes: Refs - associations: Refs - attributes: Refs - elements: Refs - str_symbols: Refs - element_class_fun: FuncDeclRef - attribute_rel: FuncDeclRef - association_rel: FuncDeclRef - - -@dataclass -class SMTSorts: - class_sort: SortRef - association_sort: SortRef - attribute_sort: SortRef - element_sort: SortRef - str_symbols_sort: SortRef - attr_data_sort: DatatypeSortRef - - -class IntermediateModelChecker: - def __init__(self, metamodel, inv_assoc, intermediate_model: IntermediateModel): - def instantiate_solver(): - self.z3Context = Context() - self.solver = Solver(ctx=self.z3Context) - - class_sort, class_ = mk_class_sort_dict(self.metamodel, self.z3Context) - assoc_sort, assoc = mk_association_sort_dict(self.metamodel, self.z3Context) - attr_sort, attr = mk_attribute_sort_dict(self.metamodel, self.z3Context) - elem_sort, elem = mk_elem_sort_dict(self.intermediate_model, self.z3Context) - ss_sort, ss = mk_stringsym_sort_dict(self.intermediate_model, self.metamodel, self.z3Context) - AData = mk_adata_sort(ss_sort, self.z3Context) - elem_class_f = def_elem_class_f_and_assert_classes( - self.intermediate_model, - self.solver, - elem_sort, - elem, - class_sort, - class_ - ) - attr_rel = def_attribute_rel( - attr_sort, - elem_sort, - AData - ) - assert_im_attributes( - attr_rel, - self.solver, - self.intermediate_model, - self.metamodel, - elem, - attr_sort, - attr, - AData, - ss - ) - assoc_rel = def_association_rel( - assoc_sort, - elem_sort - ) - assert_im_associations_q( - assoc_rel, - self.solver, - {k: v for k, v in self.intermediate_model.items()}, - elem, - assoc_sort, - assoc, - ) - self.smt_encoding = SMTEncoding( - class_, - assoc, - attr, - elem, - ss, - elem_class_f, - attr_rel, - assoc_rel - ) - self.smt_sorts = SMTSorts( - class_sort, - assoc_sort, - attr_sort, - elem_sort, - ss_sort, - AData - ) - - self.metamodel = metamodel - self.inv_assoc = inv_assoc - self.intermediate_model = intermediate_model - instantiate_solver() - - def check_consistency_constraints(self) -> tuple[CheckSatResult, str]: - self.solver.push() - self.assert_consistency_constraints() - res = self.solver.check() - self.solver.pop() - if res == unsat: - return unsat, "The DOML model is inconsistent." - else: - return res, "" - - def check_one_common_requirement(self, index: int) -> tuple[CheckSatResult, str]: - expr_thunk, assert_name, _, err_msg = self.get_common_requirements()[index] - - self.solver.push() - self.solver.assert_and_track(expr_thunk(), assert_name) - res = self.solver.check() - self.solver.pop() - if res == unsat: - return unsat, err_msg - else: - return res, "" - - def check_common_requirements(self) -> tuple[CheckSatResult, str]: - unsat_msgs = [] - some_dontknow = False - - self.solver.push() - self.assert_consistency_constraints() - res = self.solver.check() - if res == unsat: - unsat_msgs.append("The DOML model is inconsistent.") - elif res == unknown: - some_dontknow = True - self.solver.pop() - - common_requirements = self.get_common_requirements() - for expr_thunk, assert_name, _, err_msg in common_requirements: - self.solver.push() - self.solver.assert_and_track(expr_thunk(), assert_name) - res = self.solver.check() - if res == unsat: - unsat_msgs.append(err_msg) - elif res == unknown: - some_dontknow = True - self.solver.pop() - - if unsat_msgs: - if some_dontknow: - unsat_msgs.append("Unable to check some requirements.") - return unsat, " ".join(unsat_msgs) - elif some_dontknow: - return unknown, "Unable to check some requirements." - else: - return sat, "All requirements satisfied." - - def get_consts(self, consts: list[str]) -> list[ExprRef]: - return Consts(" ".join(consts), self.smt_sorts.element_sort) - - def assert_consistency_constraints(self): - assert_attribute_rel_constraints( - self.metamodel, - self.solver, - self.smt_encoding.attribute_rel, - self.smt_encoding.attributes, - self.smt_encoding.classes, - self.smt_encoding.element_class_fun, - self.smt_sorts.element_sort, - self.smt_sorts.attr_data_sort, - self.smt_encoding.str_symbols - ) - assert_association_rel_constraints( - self.metamodel, - self.solver, - self.smt_encoding.association_rel, - self.smt_encoding.associations, - self.smt_encoding.classes, - self.smt_encoding.element_class_fun, - self.smt_sorts.element_sort, - self.inv_assoc - ) - - def get_common_requirements(self): - smtenc = self.smt_encoding - - def vm_iface(): - vm, iface = self.get_consts(["vm", "iface"]) - return ForAll( - [vm], - Implies( - smtenc.element_class_fun(vm) == smtenc.classes["infrastructure_VirtualMachine"], - Exists( - [iface], - And( - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], iface) - ) - ) - ) - ) - - def software_package_iface_net(): - asc_consumer, asc_exposer, siface, net, net_iface, cnode, cdeployment, enode, edeployment, vm, dc = self.get_consts( - ["asc_consumer", "asc_exposer", "siface", "net", "net_iface", "cnode", "cdeployment", "enode", "edeployment", "vm", "dc"] - ) - return ForAll( - [asc_consumer, asc_exposer, siface], - Implies( - And( - smtenc.association_rel(asc_consumer, smtenc.associations["application_SoftwareComponent::exposedInterfaces"], siface), - smtenc.association_rel(asc_exposer, smtenc.associations["application_SoftwareComponent::consumedInterfaces"], siface), - ), - Exists( - [cdeployment, cnode, edeployment, enode, net], - And( - smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::component"], asc_consumer), - smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::node"], cnode), - Exists( - [vm, net_iface], - Or( - And( # asc_consumer is deployed on a component with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_consumer is deployed on a container hosted in a VM with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_Container::hosts"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - ) - ), - smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::component"], asc_exposer), - smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::node"], enode), - Exists( - [vm, net_iface], - Or( - And( # asc_exposer is deployed on a component with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_exposer is deployed on a container hosted on a VM with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_Container::hosts"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - ) - ) - ) - ) - ) - ) - - def iface_uniq(): - def any_iface(elem, iface): - ifaces_assocs = [ - "infrastructure_ComputingNode::ifaces", - "infrastructure_Storage::ifaces", - "infrastructure_FunctionAsAService::ifaces" - ] - return Or(*(smtenc.association_rel(elem, smtenc.associations[assoc_name], iface) for assoc_name in ifaces_assocs)) - - e1, e2, ni = self.get_consts(["e1", "e2", "i"]) - return ForAll( - [e1, e2, ni], - Implies( - And(any_iface(e1, ni), any_iface(e2, ni)), - e1 == e2 - ) - ) - - def all_SoftwareComponents_deployed(): - sc, deployment, ielem = self.get_consts(["sc", "deployment", "ielem"]) - return ForAll( - [sc], - Implies( - smtenc.element_class_fun(sc) == smtenc.classes["application_SoftwareComponent"], - Exists( - [deployment, ielem], - And( - smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::component"], sc), - smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::node"], ielem), - ) - ) - ) - ) - - def all_infrastructure_elements_deployed(): - def checkOneClass(ielem, concr, provider, celem, ielemClass, providerAssoc, celemAssoc): - return Implies( - smtenc.element_class_fun(ielem) == smtenc.classes[ielemClass], - Exists( - [provider, celem], - And( - smtenc.association_rel(concr, smtenc.associations["concrete_ConcreteInfrastructure::providers"], provider), - smtenc.association_rel(provider, smtenc.associations[providerAssoc], celem), - smtenc.association_rel(celem, smtenc.associations[celemAssoc], ielem) - ) - ) - ) - - ielem, concr, provider, celem = self.get_consts(["ielem", "concr", "provider", "celem"]) - return Exists( - [concr], - And( - smtenc.element_class_fun(concr) == smtenc.classes["concrete_ConcreteInfrastructure"], - ForAll( - [ielem], - And( - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_VirtualMachine", - "concrete_RuntimeProvider::vms", - "concrete_VirtualMachine::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_Network", - "concrete_RuntimeProvider::networks", - "concrete_Network::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_Storage", - "concrete_RuntimeProvider::storages", - "concrete_Storage::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_FunctionAsAService", - "concrete_RuntimeProvider::faas", - "concrete_FunctionAsAService::maps" - ), - ) - ) - ) - ) - - return [ - (vm_iface, "vm_iface", "All virtual machines must be connected to at least one network interface.", "A virtual machine is connected to no network interface."), - (software_package_iface_net, "software_package_iface_net", "All software packages can see the interfaces they need through a common network.", "A software package is deployed on a node that has no access to an interface it consumes."), - (iface_uniq, "iface_uniq", "There are no duplicated interfaces.", "There is a duplicated interface."), - (all_SoftwareComponents_deployed, "all_SoftwareComponents_deployed", "All software components have been deployed to some node.", "A software component has not been deployed to any node."), - (all_infrastructure_elements_deployed, "all_infrastructure_elements_deployed", "All abstract infrastructure elements are mapped to an element in the active concretization.", "An abstract infrastructure element has not been mapped to any element in the active concretization."), - ] - - @staticmethod - def common_requirements_number() -> int: - return 5 +from .imc import RequirementStore, IntermediateModelChecker +from .common_reqs import CommonRequirements class ModelChecker: @@ -401,23 +29,26 @@ class ModelChecker: def __init__(self, xmi_model: bytes): assert ModelChecker.metamodel and ModelChecker.inv_assoc - self.intermediate_model = parse_doml_model(xmi_model, ModelChecker.metamodel) + self.intermediate_model: IntermediateModel = parse_doml_model(xmi_model, ModelChecker.metamodel) reciprocate_inverse_associations(self.intermediate_model, ModelChecker.inv_assoc) def check_common_requirements(self, threads=1) -> tuple[CheckSatResult, str]: def worker(index: int): imc = IntermediateModelChecker(ModelChecker.metamodel, ModelChecker.inv_assoc, self.intermediate_model) if index >= 0: - return imc.check_one_common_requirement(index) + rs = RequirementStore([CommonRequirements.get_one_requirement(index)]) + return imc.check_requirements(rs) else: return imc.check_consistency_constraints() if threads <= 1: imc = IntermediateModelChecker(ModelChecker.metamodel, ModelChecker.inv_assoc, self.intermediate_model) - return imc.check_common_requirements() - - with parallel_backend('threading', n_jobs=threads): - results = Parallel()(delayed(worker)(i) for i in range(-1, IntermediateModelChecker.common_requirements_number())) + cons = imc.check_consistency_constraints() + reqs = imc.check_requirements(CommonRequirements) + results = [cons, reqs] + else: + with parallel_backend('threading', n_jobs=threads): + results = Parallel()(delayed(worker)(i) for i in range(-1, CommonRequirements.get_num_requirements())) some_unsat = any(res == unsat for res, _ in results) some_dontknow = any(res == unknown for res, _ in results)