Skip to content
Snippets Groups Projects
Unverified Commit 0b9edd4f authored by Andrea Franchini's avatar Andrea Franchini
Browse files

Remove unused code

parent 916325b0
No related branches found
No related tags found
No related merge requests found
import warnings
warnings.warn("the synthesis module is deprecated", DeprecationWarning,
stacklevel=2)
\ No newline at end of file
+ "All VMs have iface"
forall vm (
vm is class abstract.VirtualMachine
implies
exists iface (
vm has abstract.ComputingNode.ifaces iface
)
)
---
"X"
+ "Concretization of Software Interfaces"
forall asc_consumer, asc_exposer, siface (
(
asc_consumer has application.SoftwareComponent.exposedInterfaces siface
and
asc_exposer has application.SoftwareComponent.consumedInterfaces siface
)
implies
exists cdeployment, cnode, edeployment, enode, net (
cdeployment has commons.Deployment.component asc_consumer
and
cdeployment has commons.Deployment.node cnode
and
exists vm, net_iface (
(
# asc_consumer is deployed on a component with an interface in network n
node has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
or
(
# asc_consumer is deployed on a component with an interface in network n
cnode has abstract.Container.hosts vm
and
vm has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
or
(
# asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n
cnode has abstract.AutoScalingGroup.machineDefinition vm
and
vm has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
)
)
and
edeployment has commons.Deployment.component asc_exposer
and
edeployment has commons.Deployment.node enode
and
exists vm, net_iface (
(
# asc_exposer is deployed on a component with an interface in network n
enode has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
or
(
# asc_exposer is deployed on a container hosted on a VM with an interface in network n
enode has abstract.Container.hosts vm
and
vm has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
or
(
# asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n
enode has abstract.AutoScalingGroup.machineDefinition vm
and
vm has abstract.ComputingNode.ifaces net_iface
and
net_iface has abstract.NetworkInterface.belongsTo net
)
)
)
---
"YY"
\ No newline at end of file
from dataclasses import dataclass
from itertools import product
from typing import Tuple
from z3 import (DatatypeRef, DatatypeSortRef, Not, FuncDeclRef, Model, Solver, sat,
unsat)
from mc_openapi.doml_mc.imc import Requirement, SMTEncoding, SMTSorts
from mc_openapi.doml_mc.intermediate_model.doml_element import IntermediateModel
from mc_openapi.doml_mc.intermediate_model.metamodel import (DOMLVersion, MetaModel,
parse_metamodel)
from mc_openapi.doml_mc.xmi_parser.doml_model import parse_doml_model
from mc_openapi.doml_mc.z3encoding.im_encoding import (
assert_im_associations, assert_im_attributes,
def_elem_class_f_and_assert_classes, mk_attr_data_sort, mk_elem_sort_dict,
mk_stringsym_sort_dict)
from mc_openapi.doml_mc.z3encoding.metamodel_encoding import (
def_association_rel, def_attribute_rel, mk_association_sort_dict,
mk_attribute_sort_dict, mk_class_sort_dict)
from mc_openapi.doml_mc.z3encoding.types import Refs
# Types
Elem = Value = Tuple[str, DatatypeRef]
AssocAndElems = Tuple[Elem, DatatypeRef, Elem]
AttrAndValues = Tuple[Elem, DatatypeRef, Value]
@dataclass
class Context:
solver: Solver
class_sort: DatatypeSortRef
class_refs: Refs
assoc_sort: DatatypeSortRef
assoc_refs: Refs
attr_sort: DatatypeSortRef
attr_refs: Refs
str_sort: DatatypeSortRef
str_refs: Refs
elem_sort: DatatypeSortRef
elem_refs: Refs
attr_data_sort: DatatypeSortRef
elem_class_fn: FuncDeclRef
attr_rel: FuncDeclRef
assoc_rel: FuncDeclRef
unbound_elems: list[str]
unbound_values: Refs
class Synthesis:
def __init__(self,
metamodel: MetaModel,
intermediate_model: IntermediateModel,
verbose: bool = False
) -> None:
"""
Initialize the data required to synthetize a new DOML according to provided requirements.
:param metamodel_input_file: The result of parsing the YAML metamodel file
:param domlx_input_file: A XMI DOML file read as bytes containing the model to run through the tests.
:param doml_version: The DOML version as an Enum.
"""
self.mm = metamodel
self.im = intermediate_model
self.verbose = verbose
def _init_context(
self,
ub_elem_n : int = 0,
ub_vals_n : int = 0,
reqs : list[Requirement] = [],
user_req_strings : list[str] = []
) -> Context:
"""Builds a Context object containing all the relationships sorts and refs.
"""
solver = Solver()
class_sort, class_refs = mk_class_sort_dict(self.mm, solver.ctx)
assoc_sort, assoc_refs = mk_association_sort_dict(self.mm, solver.ctx)
attr_sort, attr_refs = mk_attribute_sort_dict(self.mm, solver.ctx)
str_sort, str_refs = mk_stringsym_sort_dict(self.im, self.mm, solver.ctx, user_req_strings)
attr_data_sort = mk_attr_data_sort(str_sort, solver.ctx)
unbound_elems = [f"unbound_elem_{i}" for i in range(ub_elem_n)]
# Takes a list of strings and creates an Enum out of 'em
elem_sort, elem_refs = mk_elem_sort_dict(self.im, solver.ctx, unbound_elems)
unbound_values_names = [f"unbound_val_{i}" for i in range(ub_vals_n)]
unbound_values = {
name : attr_data_sort.placeholder for name in unbound_values_names
}
# Examples of values that can go in unbound_values:
# ctx["attr_data_sort"].int(42), # ok
# ctx["attr_data_sort"].bool(True), # ok
# ctx["attr_data_sort"].str("x"), # cant do: it accept a ctx["str"][<str_key>] as input
# Const("x", ctx["attr_data_sort"]) # cant do: it is a symbolic value that cannot be converted to a BoolRef expression
elem_class_fn = def_elem_class_f_and_assert_classes(
self.im,
solver,
elem_sort,
elem_refs,
class_sort,
class_refs
)
# attr_rel :: (elem_sort, attr_sort, attr_data_sort) -> BoolRef
attr_rel = def_attribute_rel(
attr_sort,
elem_sort,
attr_data_sort
)
assert_im_attributes(
attr_rel,
solver,
self.im,
self.mm,
elem_refs,
attr_sort,
attr_refs,
attr_data_sort,
str_refs
)
# assoc_rel :: (elem_sort, assoc_sort, elem_sort) -> BoolRef
assoc_rel = def_association_rel(
assoc_sort,
elem_sort
)
assert_im_associations(
assoc_rel,
solver,
{k: v for k, v in self.im.items() if k not in unbound_elems},
elem_refs,
assoc_sort,
assoc_refs,
)
context = Context(
solver,
class_sort, class_refs,
assoc_sort, assoc_refs,
attr_sort, attr_refs,
str_sort, str_refs,
elem_sort, elem_refs,
attr_data_sort,
elem_class_fn,
attr_rel,
assoc_rel,
unbound_elems,
unbound_values
)
encodings = SMTEncoding(
class_refs,
assoc_refs,
attr_refs,
elem_refs,
str_refs,
elem_class_fn,
attr_rel,
assoc_rel
)
sorts = SMTSorts(
class_sort,
assoc_sort,
attr_sort,
elem_sort,
str_sort,
attr_data_sort
)
# Add requirements
# TODO: Investigate whether it's possible or a good idea
# to handle each requirement individually, like in imc.py
for req in reqs:
req_fn = req.assert_callable(encodings, sorts)
req_name = req.assert_name
solver.assert_and_track(req_fn, req_name)
return context
def check(self,
ub_elems_n: int = 0,
ub_vals_n: int = 0,
reqs: list = [],
curr_try: int = 0,
max_tries: int = 10,
user_req_strings: list[str] = []
) -> Context:
if curr_try > max_tries:
raise RuntimeError("Max tries exceeded.")
ctx = self._init_context(ub_elems_n, ub_vals_n, reqs, user_req_strings)
res = ctx.solver.check()
if res == sat:
if self.verbose:
print(f"<Sat>\tub_elems_n={ub_elems_n}, ubvals_n={ub_vals_n}")
return ctx
elif res == unsat:
if self.verbose:
print(f"<Unsat>\tub_elems_n={ub_elems_n}, ubvals_n={ub_vals_n}")
# TODO: Choose which goes first in a smart way?
if ub_elems_n > ub_vals_n:
new_ub_vals_n = ub_vals_n * 2 if ub_vals_n >= 1 else 1
return self.check(ub_elems_n, new_ub_vals_n, reqs, curr_try + 1, max_tries)
elif ub_elems_n <= ub_vals_n:
new_ub_elems_n = ub_elems_n * 2 if ub_elems_n >= 1 else 1
return self.check(new_ub_elems_n, ub_vals_n, reqs, curr_try + 1, max_tries)
else: # res == dontknow
raise RuntimeError("It took too long to decide satifiability.")
def get_ub_elems_and_assoc(self, ctx: Context, model: Model) -> list[AssocAndElems]:
"""Returns the associations between unbound elements."""
return [ ((elem_1_k, elem_1_v), a, (elem_2_k, elem_2_v))
for (elem_1_k, elem_1_v), a, (elem_2_k, elem_2_v) in product(ctx.elem_refs.items(), ctx.assoc_refs.values(), ctx.elem_refs.items())
if (elem_1_k in ctx.unbound_elems or elem_2_k in ctx.unbound_elems) and model.eval(ctx.assoc_rel(elem_1_v, a, elem_2_v))
]
def get_ub_vals_and_attr(self, ctx: Context, model: Model) -> list[AttrAndValues]:
"""Returns the attribute relationships between elements and attribute data/values."""
return [ ((elem_k, elem_v), a, (ubval_k, ubval_v))
for (elem_k, elem_v), a, (ubval_k, ubval_v) in product(ctx.elem_refs.items(), ctx.attr_refs.values(), ctx.unbound_values.items())
if model.eval(ctx.attr_rel(elem_v, a, ubval_v))
]
def pretty_ub_elems_assoc(self, assoc_elems: list[AssocAndElems]) -> str:
"""Returns a string containg a human-readable name of the elements and their association.
"""
(elem_1_k, _), a, (elem_2_k, _) = assoc_elems
elem_1 = self.im.get(elem_1_k)
if elem_1:
elem_1_name = f"{elem_1.class_} ({elem_1.user_friendly_name})" if elem_1_k[0:4] == "elem" else f"<'{elem_1_k}' not found>"
else:
elem_1_name = elem_1_k
elem_2 = self.im.get(elem_2_k)
if elem_2:
elem_2_name = f"{elem_2.class_} ({elem_2.user_friendly_name})" if elem_2_k[0:4] == "elem" else f"<'{elem_2_k}' not found>"
else:
elem_2_name = elem_2_k
assoc_name = str(a)
return f"{elem_1_name:<50s} {assoc_name:<60s} {elem_2_name:<30s}"
def pretty_ub_vals_attr(self, attr_and_val: list[AttrAndValues]) -> str:
"""Returns a string containg a human-readable name of the element, the value and the
attribute relationship.
"""
(elem_k, _), a, (ubval_k, _) = attr_and_val
elem_1 = self.im.get(elem_k)
if elem_1:
elem_1_name = f"{elem_1.class_} ({elem_1.user_friendly_name})" if elem_k[0:4] == "elem" else f"<'{elem_k}' not found>"
else:
elem_1_name = elem_k
attr_name = str(a)
val_name = str(ubval_k)
return f"{elem_1_name:<50s} {attr_name:<60s} {val_name:<30s}"
def thin_ub_elems_and_assoc(self, ctx: Context, ub_elems_and_assoc: list[AssocAndElems]):
if not ub_elems_and_assoc:
return []
(_, elem_1_v), a, (_, elem_2_v) = assoc = ub_elems_and_assoc[0]
assoc_rel = ctx.assoc_rel(elem_1_v, a, elem_2_v)
# Add negated constraint
ctx.solver.push()
if self.verbose:
print(f"\tAdd constraint Not({self.pretty_ub_elems_assoc(assoc)})")
ctx.solver.add(Not(assoc_rel))
res = ctx.solver.check()
if res == sat:
if self.verbose:
print("SAT:\tAdding one more constraint and trying again")
# Get new ub_elems_and_assoc
model = ctx.solver.model()
thinned_ub_elems_and_assoc = self.get_ub_elems_and_assoc(ctx, model)
# Print table showing the diff
from difflib import context_diff
uvar_as_text = lambda input: [self.pretty_ub_elems_assoc(assoc) for assoc in input]
if self.verbose:
print("\n".join([a for a in context_diff(uvar_as_text(ub_elems_and_assoc), uvar_as_text(thinned_ub_elems_and_assoc), lineterm="", fromfile='Before', tofile="After")]))
# Iterate
return self.thin_ub_elems_and_assoc(ctx, thinned_ub_elems_and_assoc)
else:
if self.verbose:
print("UNSAT\tLast constraint was the association we are looking for!")
ctx.solver.pop()
if ub_elems_and_assoc[1:]:
if self.verbose:
print("\tIterating over")
print("\t\t" + "\n\t\t".join([self.pretty_ub_elems_assoc(assoc) for assoc in ub_elems_and_assoc[1:]]))
return [*set([assoc] + self.thin_ub_elems_and_assoc(ctx, ub_elems_and_assoc[1:]))]
def thin_ub_vals_and_attr(self, ctx: Context, ub_vals_and_attr: list[AttrAndValues]):
if not ub_vals_and_attr:
return []
(_, elem_v), a, (_, attr_v) = attr = ub_vals_and_attr[0]
attr_rel = ctx["attr_rel"](elem_v, a, attr_v)
# Add negated constraint
ctx.solver.push()
if self.verbose:
print(f"\tAdd constraint Not({self.pretty_ubvals_attrs(attr)})")
ctx.solver.add(Not(attr_rel))
res = ctx.solver.check()
if res == sat:
print("SAT:\tAdding one more constraint and trying again")
# Get new ub_elems_and_assoc
model = ctx.solver.model()
thinned_ub_vals_and_attr = self.get_ubvals_and_attr(ctx, model)
# Print table showing the diff
from difflib import context_diff
uvar_as_text = lambda input: [self.pretty_ubvals_attrs(attr) for attr in input]
if self.verbose:
print("\n".join([a for a in context_diff(uvar_as_text(ub_vals_and_attr), uvar_as_text(thinned_ub_vals_and_attr), lineterm="", fromfile='Before', tofile="After")]))
# Iterate
return self.thin_ub_vals_and_attr(ctx, thinned_ub_vals_and_attr)
else:
if self.verbose:
print("UNSAT\tLast constraint was the attribute we are looking for!")
ctx.solver.pop()
if ub_vals_and_attr[1:]:
if self.verbose:
print("\tIterating over")
print("\t\t" + "\n\t\t".join([self.pretty_ubvals_attrs(attr) for attr in ub_vals_and_attr[1:]]))
return [*set([attr] + self.thin_ub_vals_and_attr(ctx, ub_vals_and_attr[1:]))]
from z3 import (
Consts, ExprRef,
ForAll, Exists, Implies, And, Or
)
from mc_openapi.doml_mc.imc import (
SMTEncoding, SMTSorts, Requirement, RequirementStore
)
def get_consts(smtsorts: SMTSorts, consts: list[str]) -> list[ExprRef]:
return Consts(" ".join(consts), smtsorts.element_sort)
def vm_iface(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef:
vm, iface = get_consts(smtsorts, ["vm", "iface"])
return ForAll(
[vm],
Implies(
smtenc.element_class_fun(vm) == smtenc.classes["infrastructure_VirtualMachine"],
Exists(
[iface],
And(
smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], iface)
)
)
)
)
def software_package_iface_net(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef:
asc_consumer, asc_exposer, siface, net, net_iface, cnode, cdeployment, enode, edeployment, vm, dc = get_consts(
smtsorts,
["asc_consumer", "asc_exposer", "siface", "net", "net_iface", "cnode", "cdeployment", "enode", "edeployment", "vm", "dc"]
)
return ForAll(
[asc_consumer, asc_exposer, siface],
Implies(
And(
smtenc.association_rel(asc_consumer, smtenc.associations["application_SoftwareComponent::exposedInterfaces"], siface),
smtenc.association_rel(asc_exposer, smtenc.associations["application_SoftwareComponent::consumedInterfaces"], siface),
),
Exists(
[cdeployment, cnode, edeployment, enode, net],
And(
smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::component"], asc_consumer),
smtenc.association_rel(cdeployment, smtenc.associations["commons_Deployment::node"], cnode),
Exists(
[vm, net_iface],
Or(
And( # asc_consumer is deployed on a component with an interface in network n
smtenc.association_rel(cnode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
And( # asc_consumer is deployed on a component with an interface in network n
smtenc.association_rel(cnode, smtenc.associations["infrastructure_Container::hosts"], vm),
smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
And( # asc_consumer is deployed on a VM in an AutoScalingGroup with an interface in network n
smtenc.association_rel(cnode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm),
smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
)
),
smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::component"], asc_exposer),
smtenc.association_rel(edeployment, smtenc.associations["commons_Deployment::node"], enode),
Exists(
[vm, net_iface],
Or(
And( # asc_exposer is deployed on a component with an interface in network n
smtenc.association_rel(enode, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
And( # asc_exposer is deployed on a container hosted on a VM with an interface in network n
smtenc.association_rel(enode, smtenc.associations["infrastructure_Container::hosts"], vm),
smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
And( # asc_exposer is deployed on a VM in an AutoScalingGroup with an interface in network n
smtenc.association_rel(enode, smtenc.associations["infrastructure_AutoScalingGroup::machineDefinition"], vm),
smtenc.association_rel(vm, smtenc.associations["infrastructure_ComputingNode::ifaces"], net_iface),
smtenc.association_rel(net_iface, smtenc.associations["infrastructure_NetworkInterface::belongsTo"], net),
),
)
)
)
)
)
)
def iface_uniq(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef:
def any_iface(elem, iface):
ifaces_assocs = [
"infrastructure_ComputingNode::ifaces",
"infrastructure_Storage::ifaces",
"infrastructure_FunctionAsAService::ifaces"
]
return Or(*(smtenc.association_rel(elem, smtenc.associations[assoc_name], iface) for assoc_name in ifaces_assocs))
e1, e2, ni = get_consts(smtsorts, ["e1", "e2", "i"])
return ForAll(
[e1, e2, ni],
Implies(
And(any_iface(e1, ni), any_iface(e2, ni)),
e1 == e2
)
)
def all_SoftwareComponents_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef:
sc, deployment, ielem = get_consts(smtsorts, ["sc", "deployment", "ielem"])
return ForAll(
[sc],
Implies(
smtenc.element_class_fun(sc) == smtenc.classes["application_SoftwareComponent"],
Exists(
[deployment, ielem],
And(
smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::component"], sc),
smtenc.association_rel(deployment, smtenc.associations["commons_Deployment::node"], ielem),
)
)
)
)
def all_infrastructure_elements_deployed(smtenc: SMTEncoding, smtsorts: SMTSorts) -> ExprRef:
def checkOneClass(ielem, concr, provider, celem, ielemClass, providerAssoc, celemAssoc):
return Implies(
smtenc.element_class_fun(ielem) == smtenc.classes[ielemClass],
Exists(
[provider, celem],
And(
smtenc.association_rel(concr, smtenc.associations["concrete_ConcreteInfrastructure::providers"], provider),
smtenc.association_rel(provider, smtenc.associations[providerAssoc], celem),
smtenc.association_rel(celem, smtenc.associations[celemAssoc], ielem)
)
)
)
ielem, concr, provider, celem = get_consts(smtsorts, ["ielem", "concr", "provider", "celem"])
return Exists(
[concr],
And(
smtenc.element_class_fun(concr) == smtenc.classes["concrete_ConcreteInfrastructure"],
ForAll(
[ielem],
And(
checkOneClass(
ielem, concr, provider, celem,
"infrastructure_VirtualMachine",
"concrete_RuntimeProvider::vms",
"concrete_VirtualMachine::maps"
),
checkOneClass(
ielem, concr, provider, celem,
"infrastructure_Network",
"concrete_RuntimeProvider::networks",
"concrete_Network::maps"
),
checkOneClass(
ielem, concr, provider, celem,
"infrastructure_Storage",
"concrete_RuntimeProvider::storages",
"concrete_Storage::maps"
),
checkOneClass(
ielem, concr, provider, celem,
"infrastructure_FunctionAsAService",
"concrete_RuntimeProvider::faas",
"concrete_FunctionAsAService::maps"
),
)
)
)
)
synthesis_default_req_store = RequirementStore(
[
Requirement(*rt) for rt in [
(vm_iface, "vm_iface", "All virtual machines must be connected to at least one network interface.", "A virtual machine is connected to no network interface."),
(software_package_iface_net, "software_package_iface_net", "All software packages can see the interfaces they need through a common network.", "A software package is deployed on a node that has no access to an interface it consumes."),
(iface_uniq, "iface_uniq", "There are no duplicated interfaces.", "There is a duplicated interface."),
(all_SoftwareComponents_deployed, "all_SoftwareComponents_deployed", "All software components have been deployed to some node.", "A software component has not been deployed to any node."),
(all_infrastructure_elements_deployed, "all_infrastructure_elements_deployed", "All abstract infrastructure elements are mapped to an element in the active concretization.", "An abstract infrastructure element has not been mapped to any element in the active concretization."),
]
]
)
\ No newline at end of file
import re
from itertools import product
from pyecore.ecore import EClass, EObject
from mc_openapi.doml_mc.intermediate_model.doml_element import \
IntermediateModel
from mc_openapi.doml_mc.intermediate_model.metamodel import DOMLVersion
from mc_openapi.doml_mc.synthesis_old.synthesis import AssocAndElems
from mc_openapi.doml_mc.xmi_parser.doml_model import get_rset
import secrets
import base64
def _gen_random_suffix_hash(len: int = 6):
return base64.urlsafe_b64encode(secrets.token_bytes(len)).decode()
def _convert_camelcase_to_snakecase(input: str):
return re.sub(r'(?<!^)(?=[A-Z])', '_', input).lower()
def generate_xmi(root: EObject, new_assocs: list[AssocAndElems], im: IntermediateModel, doml_ver: DOMLVersion = DOMLVersion.V2_0):
def find_eclass(eclass_package: str, eclass_name: str):
"""`eclass_package` is like `infrastructure`
`eclass_name` is like `VirtualMachine`
"""
pkgs = [pkg for pkg
in list(get_rset(doml_ver).metamodel_registry.values())
if pkg.name == eclass_package
]
# `metamodel_registry` is a dict consisting of:
# - ecore
# - doml
# |- commons
# |- application
# |- infrastructure
# |- concrete
# |- optimization
return pkgs[0].getEClassifier(eclass_name)
def find_elem(elem_name: str):
ret = []
for elem in root.eAllContents():
try:
if elem.name == elem_name:
ret.append(elem)
except:
pass
return ret[0]
print(find_eclass("infrastructure", "NetworkInterface"))
for ((e1_k, e1_v), assoc, (e2_k, e2_v)) in new_assocs:
if "unbound" not in e1_k:
# it's an existing element
e1 = im[e1_k]
e1_class = re.search("^.*_(.+?)$", e1.class_).group(1)
e1_name = e1.attributes["commons_DOMLElement::name"][0]
print(e1_class, e1_name)
# TODO: Should I handle the case where the first element is an unbound one?
e1_instance = find_elem(e1_name)
# Regex to split <package>_<class>::<name>
assoc_re = re.search("^(.+?)_(.+?)::(.+?)$", str(assoc))
assoc_package = assoc_re.group(1)
assoc_class = assoc_re.group(2)
assoc_name = assoc_re.group(3)
print(assoc_package, assoc_class, assoc_name)
if "unbound" not in e2_k:
# it's an existing element
e2 = im[e2_k]
e2_class = re.search("^.*_(.+?)$", e2.class_).group(1)
e2_name = e2.attributes["commons_DOMLElement::name"][0]
print(e2_class, e2_name)
# TODO: Add relationship between the two?
else:
e1_container = getattr(e1_instance, assoc_name)
e2_instance_type = e1_container.feature.eType
e2_instance_name = (
_convert_camelcase_to_snakecase(e1_container.feature.eType.name)
+ "_"
+ _gen_random_suffix_hash())
e2_instance = e2_instance_type(name=e2_instance_name)
e1_container.append(e2_instance)
return root
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment