diff --git a/mc_openapi/doml_mc/synthesis_old/__init__.py b/mc_openapi/doml_mc/synthesis_old/__init__.py deleted file mode 100644 index 5aa44e830cce8f8953cf82a58943221cf9cbb247..0000000000000000000000000000000000000000 --- a/mc_openapi/doml_mc/synthesis_old/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -import warnings -warnings.warn("the synthesis module is deprecated", DeprecationWarning, - stacklevel=2) \ No newline at end of file diff --git a/mc_openapi/doml_mc/synthesis_old/positive_common_reqs.domlr b/mc_openapi/doml_mc/synthesis_old/positive_common_reqs.domlr deleted file mode 100644 index b792c5c82e8493e6727b1802431a1cc981af1bee..0000000000000000000000000000000000000000 --- a/mc_openapi/doml_mc/synthesis_old/positive_common_reqs.domlr +++ /dev/null @@ -1,85 +0,0 @@ -+ "All VMs have iface" - forall vm ( - vm is class abstract.VirtualMachine - implies - exists iface ( - vm has abstract.ComputingNode.ifaces iface - ) - ) - --- - "X" - -+ "Concretization of Software Interfaces" - forall asc_consumer, asc_exposer, siface ( - ( - asc_consumer has application.SoftwareComponent.exposedInterfaces siface - and - asc_exposer has application.SoftwareComponent.consumedInterfaces siface - ) - implies - exists cdeployment, cnode, edeployment, enode, net ( - cdeployment has commons.Deployment.component asc_consumer - and - cdeployment has commons.Deployment.node cnode - and - exists vm, net_iface ( - ( - # asc_consumer is deployed on a component with an interface in network n - node has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - or - ( - # asc_consumer is deployed on a component with an interface in network n - cnode has abstract.Container.hosts vm - and - vm has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - or - ( - # asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n - cnode has abstract.AutoScalingGroup.machineDefinition vm - and - vm has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - ) - ) - and - edeployment has commons.Deployment.component asc_exposer - and - edeployment has commons.Deployment.node enode - and - exists vm, net_iface ( - ( - # asc_exposer is deployed on a component with an interface in network n - enode has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - or - ( - # asc_exposer is deployed on a container hosted on a VM with an interface in network n - enode has abstract.Container.hosts vm - and - vm has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - or - ( - # asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n - enode has abstract.AutoScalingGroup.machineDefinition vm - and - vm has abstract.ComputingNode.ifaces net_iface - and - net_iface has abstract.NetworkInterface.belongsTo net - ) - ) - ) - --- - "YY" \ No newline at end of file diff --git a/mc_openapi/doml_mc/synthesis_old/synthesis.py b/mc_openapi/doml_mc/synthesis_old/synthesis.py deleted file mode 100644 index ade0858b84ba6ac2144773f2477aeca7c8d2a6f1..0000000000000000000000000000000000000000 --- a/mc_openapi/doml_mc/synthesis_old/synthesis.py +++ /dev/null @@ -1,354 +0,0 @@ - - -from dataclasses import dataclass -from itertools import product -from typing import Tuple - -from z3 import (DatatypeRef, DatatypeSortRef, Not, FuncDeclRef, Model, Solver, sat, - unsat) -from mc_openapi.doml_mc.imc import Requirement, SMTEncoding, SMTSorts -from mc_openapi.doml_mc.intermediate_model.doml_element import IntermediateModel - -from mc_openapi.doml_mc.intermediate_model.metamodel import (DOMLVersion, MetaModel, - parse_metamodel) -from mc_openapi.doml_mc.xmi_parser.doml_model import parse_doml_model -from mc_openapi.doml_mc.z3encoding.im_encoding import ( - assert_im_associations, assert_im_attributes, - def_elem_class_f_and_assert_classes, mk_attr_data_sort, mk_elem_sort_dict, - mk_stringsym_sort_dict) -from mc_openapi.doml_mc.z3encoding.metamodel_encoding import ( - def_association_rel, def_attribute_rel, mk_association_sort_dict, - mk_attribute_sort_dict, mk_class_sort_dict) -from mc_openapi.doml_mc.z3encoding.types import Refs - -# Types -Elem = Value = Tuple[str, DatatypeRef] -AssocAndElems = Tuple[Elem, DatatypeRef, Elem] -AttrAndValues = Tuple[Elem, DatatypeRef, Value] - -@dataclass -class Context: - solver: Solver - class_sort: DatatypeSortRef - class_refs: Refs - assoc_sort: DatatypeSortRef - assoc_refs: Refs - attr_sort: DatatypeSortRef - attr_refs: Refs - str_sort: DatatypeSortRef - str_refs: Refs - elem_sort: DatatypeSortRef - elem_refs: Refs - - attr_data_sort: DatatypeSortRef - - elem_class_fn: FuncDeclRef - attr_rel: FuncDeclRef - assoc_rel: FuncDeclRef - - unbound_elems: list[str] - unbound_values: Refs - -class Synthesis: - def __init__(self, - metamodel: MetaModel, - intermediate_model: IntermediateModel, - verbose: bool = False - ) -> None: - """ - Initialize the data required to synthetize a new DOML according to provided requirements. - - :param metamodel_input_file: The result of parsing the YAML metamodel file - :param domlx_input_file: A XMI DOML file read as bytes containing the model to run through the tests. - :param doml_version: The DOML version as an Enum. - """ - self.mm = metamodel - self.im = intermediate_model - self.verbose = verbose - - def _init_context( - self, - ub_elem_n : int = 0, - ub_vals_n : int = 0, - reqs : list[Requirement] = [], - user_req_strings : list[str] = [] - ) -> Context: - """Builds a Context object containing all the relationships sorts and refs. - """ - solver = Solver() - - class_sort, class_refs = mk_class_sort_dict(self.mm, solver.ctx) - assoc_sort, assoc_refs = mk_association_sort_dict(self.mm, solver.ctx) - attr_sort, attr_refs = mk_attribute_sort_dict(self.mm, solver.ctx) - str_sort, str_refs = mk_stringsym_sort_dict(self.im, self.mm, solver.ctx, user_req_strings) - attr_data_sort = mk_attr_data_sort(str_sort, solver.ctx) - - unbound_elems = [f"unbound_elem_{i}" for i in range(ub_elem_n)] - - # Takes a list of strings and creates an Enum out of 'em - elem_sort, elem_refs = mk_elem_sort_dict(self.im, solver.ctx, unbound_elems) - - unbound_values_names = [f"unbound_val_{i}" for i in range(ub_vals_n)] - unbound_values = { - name : attr_data_sort.placeholder for name in unbound_values_names - } - # Examples of values that can go in unbound_values: - # ctx["attr_data_sort"].int(42), # ok - # ctx["attr_data_sort"].bool(True), # ok - # ctx["attr_data_sort"].str("x"), # cant do: it accept a ctx["str"][<str_key>] as input - # Const("x", ctx["attr_data_sort"]) # cant do: it is a symbolic value that cannot be converted to a BoolRef expression - - elem_class_fn = def_elem_class_f_and_assert_classes( - self.im, - solver, - elem_sort, - elem_refs, - class_sort, - class_refs - ) - - # attr_rel :: (elem_sort, attr_sort, attr_data_sort) -> BoolRef - attr_rel = def_attribute_rel( - attr_sort, - elem_sort, - attr_data_sort - ) - - assert_im_attributes( - attr_rel, - solver, - self.im, - self.mm, - elem_refs, - attr_sort, - attr_refs, - attr_data_sort, - str_refs - ) - - # assoc_rel :: (elem_sort, assoc_sort, elem_sort) -> BoolRef - assoc_rel = def_association_rel( - assoc_sort, - elem_sort - ) - - assert_im_associations( - assoc_rel, - solver, - {k: v for k, v in self.im.items() if k not in unbound_elems}, - elem_refs, - assoc_sort, - assoc_refs, - ) - - context = Context( - solver, - class_sort, class_refs, - assoc_sort, assoc_refs, - attr_sort, attr_refs, - str_sort, str_refs, - elem_sort, elem_refs, - attr_data_sort, - elem_class_fn, - attr_rel, - assoc_rel, - unbound_elems, - unbound_values - ) - - - encodings = SMTEncoding( - class_refs, - assoc_refs, - attr_refs, - elem_refs, - str_refs, - elem_class_fn, - attr_rel, - assoc_rel - ) - sorts = SMTSorts( - class_sort, - assoc_sort, - attr_sort, - elem_sort, - str_sort, - attr_data_sort - ) - - # Add requirements - # TODO: Investigate whether it's possible or a good idea - # to handle each requirement individually, like in imc.py - for req in reqs: - req_fn = req.assert_callable(encodings, sorts) - req_name = req.assert_name - solver.assert_and_track(req_fn, req_name) - - return context - - def check(self, - ub_elems_n: int = 0, - ub_vals_n: int = 0, - reqs: list = [], - curr_try: int = 0, - max_tries: int = 10, - user_req_strings: list[str] = [] - ) -> Context: - if curr_try > max_tries: - raise RuntimeError("Max tries exceeded.") - - ctx = self._init_context(ub_elems_n, ub_vals_n, reqs, user_req_strings) - - res = ctx.solver.check() - - if res == sat: - if self.verbose: - print(f"<Sat>\tub_elems_n={ub_elems_n}, ubvals_n={ub_vals_n}") - return ctx - elif res == unsat: - if self.verbose: - print(f"<Unsat>\tub_elems_n={ub_elems_n}, ubvals_n={ub_vals_n}") - - # TODO: Choose which goes first in a smart way? - if ub_elems_n > ub_vals_n: - new_ub_vals_n = ub_vals_n * 2 if ub_vals_n >= 1 else 1 - return self.check(ub_elems_n, new_ub_vals_n, reqs, curr_try + 1, max_tries) - elif ub_elems_n <= ub_vals_n: - new_ub_elems_n = ub_elems_n * 2 if ub_elems_n >= 1 else 1 - return self.check(new_ub_elems_n, ub_vals_n, reqs, curr_try + 1, max_tries) - else: # res == dontknow - raise RuntimeError("It took too long to decide satifiability.") - - def get_ub_elems_and_assoc(self, ctx: Context, model: Model) -> list[AssocAndElems]: - """Returns the associations between unbound elements.""" - return [ ((elem_1_k, elem_1_v), a, (elem_2_k, elem_2_v)) - for (elem_1_k, elem_1_v), a, (elem_2_k, elem_2_v) in product(ctx.elem_refs.items(), ctx.assoc_refs.values(), ctx.elem_refs.items()) - if (elem_1_k in ctx.unbound_elems or elem_2_k in ctx.unbound_elems) and model.eval(ctx.assoc_rel(elem_1_v, a, elem_2_v)) - ] - - def get_ub_vals_and_attr(self, ctx: Context, model: Model) -> list[AttrAndValues]: - """Returns the attribute relationships between elements and attribute data/values.""" - return [ ((elem_k, elem_v), a, (ubval_k, ubval_v)) - for (elem_k, elem_v), a, (ubval_k, ubval_v) in product(ctx.elem_refs.items(), ctx.attr_refs.values(), ctx.unbound_values.items()) - if model.eval(ctx.attr_rel(elem_v, a, ubval_v)) - ] - - def pretty_ub_elems_assoc(self, assoc_elems: list[AssocAndElems]) -> str: - """Returns a string containg a human-readable name of the elements and their association. - """ - (elem_1_k, _), a, (elem_2_k, _) = assoc_elems - elem_1 = self.im.get(elem_1_k) - if elem_1: - elem_1_name = f"{elem_1.class_} ({elem_1.user_friendly_name})" if elem_1_k[0:4] == "elem" else f"<'{elem_1_k}' not found>" - else: - elem_1_name = elem_1_k - - elem_2 = self.im.get(elem_2_k) - if elem_2: - elem_2_name = f"{elem_2.class_} ({elem_2.user_friendly_name})" if elem_2_k[0:4] == "elem" else f"<'{elem_2_k}' not found>" - else: - elem_2_name = elem_2_k - - assoc_name = str(a) - - return f"{elem_1_name:<50s} {assoc_name:<60s} {elem_2_name:<30s}" - - def pretty_ub_vals_attr(self, attr_and_val: list[AttrAndValues]) -> str: - """Returns a string containg a human-readable name of the element, the value and the - attribute relationship. - """ - (elem_k, _), a, (ubval_k, _) = attr_and_val - - elem_1 = self.im.get(elem_k) - if elem_1: - elem_1_name = f"{elem_1.class_} ({elem_1.user_friendly_name})" if elem_k[0:4] == "elem" else f"<'{elem_k}' not found>" - else: - elem_1_name = elem_k - - attr_name = str(a) - - val_name = str(ubval_k) - - return f"{elem_1_name:<50s} {attr_name:<60s} {val_name:<30s}" - - def thin_ub_elems_and_assoc(self, ctx: Context, ub_elems_and_assoc: list[AssocAndElems]): - if not ub_elems_and_assoc: - return [] - - (_, elem_1_v), a, (_, elem_2_v) = assoc = ub_elems_and_assoc[0] - assoc_rel = ctx.assoc_rel(elem_1_v, a, elem_2_v) - - # Add negated constraint - ctx.solver.push() - - if self.verbose: - print(f"\tAdd constraint Not({self.pretty_ub_elems_assoc(assoc)})") - ctx.solver.add(Not(assoc_rel)) - - res = ctx.solver.check() - - if res == sat: - if self.verbose: - print("SAT:\tAdding one more constraint and trying again") - # Get new ub_elems_and_assoc - model = ctx.solver.model() - thinned_ub_elems_and_assoc = self.get_ub_elems_and_assoc(ctx, model) - - # Print table showing the diff - from difflib import context_diff - uvar_as_text = lambda input: [self.pretty_ub_elems_assoc(assoc) for assoc in input] - if self.verbose: - print("\n".join([a for a in context_diff(uvar_as_text(ub_elems_and_assoc), uvar_as_text(thinned_ub_elems_and_assoc), lineterm="", fromfile='Before', tofile="After")])) - - # Iterate - return self.thin_ub_elems_and_assoc(ctx, thinned_ub_elems_and_assoc) - else: - if self.verbose: - print("UNSAT\tLast constraint was the association we are looking for!") - ctx.solver.pop() - - if ub_elems_and_assoc[1:]: - if self.verbose: - print("\tIterating over") - print("\t\t" + "\n\t\t".join([self.pretty_ub_elems_assoc(assoc) for assoc in ub_elems_and_assoc[1:]])) - return [*set([assoc] + self.thin_ub_elems_and_assoc(ctx, ub_elems_and_assoc[1:]))] - - def thin_ub_vals_and_attr(self, ctx: Context, ub_vals_and_attr: list[AttrAndValues]): - if not ub_vals_and_attr: - return [] - - (_, elem_v), a, (_, attr_v) = attr = ub_vals_and_attr[0] - attr_rel = ctx["attr_rel"](elem_v, a, attr_v) - - # Add negated constraint - ctx.solver.push() - if self.verbose: - print(f"\tAdd constraint Not({self.pretty_ubvals_attrs(attr)})") - ctx.solver.add(Not(attr_rel)) - - res = ctx.solver.check() - - if res == sat: - print("SAT:\tAdding one more constraint and trying again") - # Get new ub_elems_and_assoc - model = ctx.solver.model() - thinned_ub_vals_and_attr = self.get_ubvals_and_attr(ctx, model) - - # Print table showing the diff - from difflib import context_diff - uvar_as_text = lambda input: [self.pretty_ubvals_attrs(attr) for attr in input] - if self.verbose: - print("\n".join([a for a in context_diff(uvar_as_text(ub_vals_and_attr), uvar_as_text(thinned_ub_vals_and_attr), lineterm="", fromfile='Before', tofile="After")])) - - # Iterate - return self.thin_ub_vals_and_attr(ctx, thinned_ub_vals_and_attr) - else: - if self.verbose: - print("UNSAT\tLast constraint was the attribute we are looking for!") - ctx.solver.pop() - - if ub_vals_and_attr[1:]: - if self.verbose: - print("\tIterating over") - print("\t\t" + "\n\t\t".join([self.pretty_ubvals_attrs(attr) for attr in ub_vals_and_attr[1:]])) - return [*set([attr] + self.thin_ub_vals_and_attr(ctx, ub_vals_and_attr[1:]))] diff --git a/mc_openapi/doml_mc/synthesis_old/synthesis_common_reqs.py b/mc_openapi/doml_mc/synthesis_old/synthesis_common_reqs.py deleted file mode 100644 index 1607a9cb59d30a867f91af2a5e4f7a08b3e5e50b..0000000000000000000000000000000000000000 --- a/mc_openapi/doml_mc/synthesis_old/synthesis_common_reqs.py +++ /dev/null @@ -1,194 +0,0 @@ -from z3 import ( - Consts, ExprRef, - ForAll, Exists, Implies, And, Or -) - -from mc_openapi.doml_mc.imc import ( - SMTEncoding, SMTSorts, Requirement, RequirementStore -) - - - - -def get_consts(smtsorts: SMTSorts, consts: list[str]) -> list[ExprRef]: - return Consts(" ".join(consts), smtsorts.element_sort) - - -def vm_iface(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: - vm, iface = get_consts(smtsorts, ["vm", "iface"]) - return ForAll( - [vm], - Implies( - smtenc.element_class_fun(vm) == smtenc.classes["infrastructure_VirtualMachine"], - Exists( - [iface], - And( - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], iface) - ) - ) - ) - ) - - -def software_package_iface_net(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: - asc_consumer, asc_exposer, siface, net, net_iface, cnode, cdeployment, enode, edeployment, vm, dc = get_consts( - smtsorts, - ["asc_consumer", "asc_exposer", "siface", "net", "net_iface", "cnode", "cdeployment", "enode", "edeployment", "vm", "dc"] - ) - return ForAll( - [asc_consumer, asc_exposer, siface], - Implies( - And( - smtenc.association_rel(asc_consumer, smtenc.associations["application_SoftwareComponent::exposedInterfaces"], siface), - smtenc.association_rel(asc_exposer, smtenc.associations["application_SoftwareComponent::consumedInterfaces"], siface), - ), - Exists( - [cdeployment, cnode, edeployment, enode, net], - And( - smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::component"], asc_consumer), - smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::node"], cnode), - Exists( - [vm, net_iface], - Or( - And( # asc_consumer is deployed on a component with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_consumer is deployed on a component with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_Container::hosts"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n - smtenc.association_rel(cnode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - ) - ), - smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::component"], asc_exposer), - smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::node"], enode), - Exists( - [vm, net_iface], - Or( - And( # asc_exposer is deployed on a component with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_exposer is deployed on a container hosted on a VM with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_Container::hosts"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - And( # asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n - smtenc.association_rel(enode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm), - smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface), - smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net), - ), - ) - ) - ) - ) - ) - ) - - -def iface_uniq(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: - def any_iface(elem, iface): - ifaces_assocs = [ - "infrastructure_ComputingNode::ifaces", - "infrastructure_Storage::ifaces", - "infrastructure_FunctionAsAService::ifaces" - ] - return Or(*(smtenc.association_rel(elem, smtenc.associations[assoc_name], iface) for assoc_name in ifaces_assocs)) - - e1, e2, ni = get_consts(smtsorts, ["e1", "e2", "i"]) - return ForAll( - [e1, e2, ni], - Implies( - And(any_iface(e1, ni), any_iface(e2, ni)), - e1 == e2 - ) - ) - - -def all_SoftwareComponents_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: - sc, deployment, ielem = get_consts(smtsorts, ["sc", "deployment", "ielem"]) - return ForAll( - [sc], - Implies( - smtenc.element_class_fun(sc) == smtenc.classes["application_SoftwareComponent"], - Exists( - [deployment, ielem], - And( - smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::component"], sc), - smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::node"], ielem), - ) - ) - ) - ) - - -def all_infrastructure_elements_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef: - def checkOneClass(ielem, concr, provider, celem, ielemClass, providerAssoc, celemAssoc): - return Implies( - smtenc.element_class_fun(ielem) == smtenc.classes[ielemClass], - Exists( - [provider, celem], - And( - smtenc.association_rel(concr, smtenc.associations["concrete_ConcreteInfrastructure::providers"], provider), - smtenc.association_rel(provider, smtenc.associations[providerAssoc], celem), - smtenc.association_rel(celem, smtenc.associations[celemAssoc], ielem) - ) - ) - ) - - ielem, concr, provider, celem = get_consts(smtsorts, ["ielem", "concr", "provider", "celem"]) - return Exists( - [concr], - And( - smtenc.element_class_fun(concr) == smtenc.classes["concrete_ConcreteInfrastructure"], - ForAll( - [ielem], - And( - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_VirtualMachine", - "concrete_RuntimeProvider::vms", - "concrete_VirtualMachine::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_Network", - "concrete_RuntimeProvider::networks", - "concrete_Network::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_Storage", - "concrete_RuntimeProvider::storages", - "concrete_Storage::maps" - ), - checkOneClass( - ielem, concr, provider, celem, - "infrastructure_FunctionAsAService", - "concrete_RuntimeProvider::faas", - "concrete_FunctionAsAService::maps" - ), - ) - ) - ) - ) - - -synthesis_default_req_store = RequirementStore( - [ - Requirement(*rt) for rt in [ - (vm_iface, "vm_iface", "All virtual machines must be connected to at least one network interface.", "A virtual machine is connected to no network interface."), - (software_package_iface_net, "software_package_iface_net", "All software packages can see the interfaces they need through a common network.", "A software package is deployed on a node that has no access to an interface it consumes."), - (iface_uniq, "iface_uniq", "There are no duplicated interfaces.", "There is a duplicated interface."), - (all_SoftwareComponents_deployed, "all_SoftwareComponents_deployed", "All software components have been deployed to some node.", "A software component has not been deployed to any node."), - (all_infrastructure_elements_deployed, "all_infrastructure_elements_deployed", "All abstract infrastructure elements are mapped to an element in the active concretization.", "An abstract infrastructure element has not been mapped to any element in the active concretization."), - ] - ] -) \ No newline at end of file diff --git a/mc_openapi/doml_mc/synthesis_old/xmi_gen.py b/mc_openapi/doml_mc/synthesis_old/xmi_gen.py deleted file mode 100644 index 14766b5b9be74da698d7c1ceebdd75e2b909c813..0000000000000000000000000000000000000000 --- a/mc_openapi/doml_mc/synthesis_old/xmi_gen.py +++ /dev/null @@ -1,89 +0,0 @@ -import re -from itertools import product - -from pyecore.ecore import EClass, EObject - -from mc_openapi.doml_mc.intermediate_model.doml_element import \ - IntermediateModel -from mc_openapi.doml_mc.intermediate_model.metamodel import DOMLVersion -from mc_openapi.doml_mc.synthesis_old.synthesis import AssocAndElems -from mc_openapi.doml_mc.xmi_parser.doml_model import get_rset - -import secrets -import base64 - -def _gen_random_suffix_hash(len: int = 6): - return base64.urlsafe_b64encode(secrets.token_bytes(len)).decode() - -def _convert_camelcase_to_snakecase(input: str): - return re.sub(r'(?<!^)(?=[A-Z])', '_', input).lower() - -def generate_xmi(root: EObject, new_assocs: list[AssocAndElems], im: IntermediateModel, doml_ver: DOMLVersion = DOMLVersion.V2_0): - - def find_eclass(eclass_package: str, eclass_name: str): - """`eclass_package` is like `infrastructure` - `eclass_name` is like `VirtualMachine` - """ - pkgs = [pkg for pkg - in list(get_rset(doml_ver).metamodel_registry.values()) - if pkg.name == eclass_package - ] - # `metamodel_registry` is a dict consisting of: - # - ecore - # - doml - # |- commons - # |- application - # |- infrastructure - # |- concrete - # |- optimization - return pkgs[0].getEClassifier(eclass_name) - - def find_elem(elem_name: str): - ret = [] - for elem in root.eAllContents(): - try: - if elem.name == elem_name: - ret.append(elem) - except: - pass - return ret[0] - - print(find_eclass("infrastructure", "NetworkInterface")) - - for ((e1_k, e1_v), assoc, (e2_k, e2_v)) in new_assocs: - if "unbound" not in e1_k: - # it's an existing element - e1 = im[e1_k] - e1_class = re.search("^.*_(.+?)$", e1.class_).group(1) - e1_name = e1.attributes["commons_DOMLElement::name"][0] - print(e1_class, e1_name) - # TODO: Should I handle the case where the first element is an unbound one? - - e1_instance = find_elem(e1_name) - - # Regex to split <package>_<class>::<name> - assoc_re = re.search("^(.+?)_(.+?)::(.+?)$", str(assoc)) - assoc_package = assoc_re.group(1) - assoc_class = assoc_re.group(2) - assoc_name = assoc_re.group(3) - print(assoc_package, assoc_class, assoc_name) - - if "unbound" not in e2_k: - # it's an existing element - e2 = im[e2_k] - e2_class = re.search("^.*_(.+?)$", e2.class_).group(1) - e2_name = e2.attributes["commons_DOMLElement::name"][0] - print(e2_class, e2_name) - # TODO: Add relationship between the two? - else: - e1_container = getattr(e1_instance, assoc_name) - e2_instance_type = e1_container.feature.eType - e2_instance_name = ( - _convert_camelcase_to_snakecase(e1_container.feature.eType.name) - + "_" - + _gen_random_suffix_hash()) - e2_instance = e2_instance_type(name=e2_instance_name) - e1_container.append(e2_instance) - - - return root \ No newline at end of file