# flake8: noqa
import os
import copy
import random
import torch
import numpy as np
[docs]
class LogisticsDepartmentOrganizationStructureModel:
"""SCSP demo model class for the logistics department
scenario.
"""
# status: 0 - optional, 1 - mandatory, 2 - replaceble
node_type_dict = {
0: {'title': 'none', 'status': 0, 'weight': 0},
1: {'title': 'Warehouse Management', 'status': 2, 'replacement': [2, 3]},
2: {'title': 'Material Stock Management', 'status': 0},
3: {'title': 'Product Stock Management', 'status': 0},
4: {'title': 'Planning', 'status': 2, 'replacement': [5, 6, 7], 'children': [8]},
5: {'title': 'Material Planning', 'status': 0},
6: {'title': 'Product Stick Planning', 'status': 0},
7: {'title': 'Logistics Planning', 'status': 0},
8: {'title': 'Analytics', 'status': 0},
9: {'title': 'Audit', 'status': 0},
10: {'title': 'Transportation', 'status': 1, 'children': [11]},
11: {'title': 'Fleet Management', 'status': 0},
}
top_level_nodes = [1, 4, 9, 10]
relations_dict = [
# 0 1 2 3 4 5 6 7 8 9 10 11
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 0
[ 0, 0, 1, 1, 2, 2, 2, 0, 2, 2, 2, 2], # 1
[ 0, 0, 0, 0, 2, 2, 0, 2, 2, 2, 2, 2], # 2
[ 0, 0, 0, 0, 2, 0, 2, 2, 2, 2, 2, 0], # 3
[ 0, 2, 2, 2, 0, 1, 1, 1, 1, 0, 2, 2], # 4
[ 0, 2, 2, 0, 0, 0, 0, 2, 2, 0, 0, 2], # 5
[ 0, 2, 0, 2, 0, 0, 0, 2, 2, 0, 0, 0], # 6
[ 0, 0, 2, 2, 0, 2, 2, 0, 2, 0, 2, 2], # 7
[ 0, 2, 2, 2, 0, 2, 2, 2, 0, 0, 2, 2], # 8
[ 0, 2, 2, 2, 0, 0, 0, 0, 0, 0, 2, 2], # 9
[ 0, 2, 2, 2, 2, 0, 0, 2, 2, 2, 0, 1], # 10
[ 0, 2, 2, 0, 2, 2, 0, 2, 2, 2, 0, 0], # 11
]
# Number of node types
NODE_N_TYPES = len(node_type_dict)
# Number of edge types
EDGE_N_TYPES = 3
# Max number of vertices per graph
MAX_NODES_PER_GRAPH = 12
# Parametrization constants
upper_limit = 25
min_person = 0.7
max_person = 1.4
min_nonzero_value = 0.1
min_orgunit = 2
req_orgunit = 5
[docs]
def check_org_unit_feasibility(self, nodes, load, unit_id,
min_person, max_person, min_orgunit, req_orgunit,
logging=False):
"""Checking the feasibility of the staff quantity of an organisational unit.
Parameters
----------
nodes : list
Model's nodes.
load : numeric
Expected load of the node to be checked.
unit_id : int
The node to be checked.
min_person : numeric
Allowed minimum load for a person.
max_person : numeric
Allowed maximum load for a person.
min_orgunit : numeric
Allowed minimum load for a dedicated organisational unit.
req_orgunit : numeric
Minimum load for that requaires a dedicated organisational unit.
logging : bool
Enable/disable logging.
Returns
-------
bool
True if the validations successful, False otherwise.
"""
if ((nodes[unit_id]==0 and load[unit_id]>max_person*req_orgunit) or
(nodes[unit_id]>0 and load[unit_id]<min_person*min_orgunit)):
#error
if logging:
print(f"Node {unit_id} {self.node_type_dict[unit_id]['title']}"
" doesn't meet the requirements: load = {load[unit_id]},"
" node = {nodes[unit_id]}.")
return False
return True
[docs]
def generate_key_values(self, nodes, logging=False):
"""Generation of random key values for model parameters.
Parameters
----------
nodes : list
List of model nodes.
logging : bool
Enable/disable logging..
Returns
-------
list
list of generated key values.
"""
v = np.zeros(12)
if nodes[2] > 0:
v[2] = np.random.uniform(self.min_person*self.min_orgunit, self.upper_limit)
else:
v[2] = np.random.uniform(self.min_nonzero_value, self.max_person*self.req_orgunit)
if nodes[3] > 0:
v[3] = np.random.uniform(self.min_person*self.min_orgunit, self.upper_limit)
else:
v[3] = np.random.uniform(self.min_nonzero_value, self.max_person*self.req_orgunit)
if logging:
print("input v=", v)
return v
[docs]
def generate_values(self, nodes, v, logging=False):
"""Generation of augmented model parameters.
Parameters
----------
nodes : list
List of model nodes.
v : list
Key values for parameter generation.
logging : bool
Enable/disable logging.
Returns
-------
list
list of generated model parameter values.
"""
v2_k = 0.2 if nodes[2] > 0 else 1
v3_k = 0.2 if nodes[3] > 0 else 1
v[1] = v2_k * v[2] + v3_k * v[3]
if not self.check_org_unit_feasibility(nodes, v, 1,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v[10] = v[2] * 1 + v[3] * 1
v[11] = v[10] * 0.2
if not self.check_org_unit_feasibility(nodes, v, 11,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v12_k = v[2] * 0.4 if nodes[2] > 0 else v[1] * 0.2
v11_k = v[11] * 0.2 if nodes[11] > 0 else 0
v[5] = v12_k + v11_k
if not self.check_org_unit_feasibility(nodes, v, 5,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v[6] = v[3] * 0.4 if nodes[3] > 0 else v[1] * 0.2
if not self.check_org_unit_feasibility(nodes, v, 6, self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v[7] = v[10] * 0.2
if not self.check_org_unit_feasibility(nodes, v, 7, self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v[8] = v[5] * 0.2 + v[6] * 0.2 + v[7] * 0.2
if not self.check_org_unit_feasibility(nodes, v, 8,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v5_k = 0.2 if nodes[5] > 0 else 1
v6_k = 0.2 if nodes[6] > 0 else 1
v7_k = 0.2 if nodes[7] > 0 else 1
v8_k = 0.2 if nodes[8] > 0 else 1
v[4] = v5_k * v[5] + v6_k * v[6] + v7_k * v[7] + v8_k * v[8]
if not self.check_org_unit_feasibility(nodes, v, 4,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
v[9] = v[1] * 0.05 + v[2] * 0.05 + v[3] * 0.05 + v[10] * 0.05 + v[11] * 0.05
if not self.check_org_unit_feasibility(nodes, v, 9,
self.min_person, self.max_person,
self.min_orgunit, self.req_orgunit,
logging=logging):
return np.empty(0)
return v
[docs]
def convert_values2persons(self, nodes, load, min_person, max_person):
"""Convert load values to staff quantity.
Parameters
----------
nodes : list
List of model nodes.
load : list
Load per unit.
min_person : numeric
Allowed minimum load for a person.
max_person : numeric
Allowed maximum load for a person.
Returns
-------
list
list of staff quantities.
"""
staff = np.zeros(12)
for i in range(len(nodes)):
#print(i)
if nodes[i] > 0:
#print(load[i], load[i]/max_person, load[i]/min_person)
staff[i] = np.random.randint(np.ceil(load[i]/max_person),
np.ceil(load[i]/min_person),
1)[0]
return staff
[docs]
def pack_to_ctx(self, v):
"""Pack list of load values for all nodes to context.
Parameters
----------
v : list
List of load values for all nodes.
Returns
-------
list
context (key parameters).
"""
return [v[2], v[3]]
[docs]
def unpack_ctx(self, ctx):
"""Unpack context to list of load values for all nodes.
Parameters
----------
ctx : list
context.
Returns
-------
list
list of load values for all nodes.
"""
v = np.zeros(12)
v[2] = ctx[0]
v[3] = ctx[1]
return v
[docs]
def check_children(self, top_level_nodes, node_list, force_nodes = False):
"""Recursive function for checking correctness of model's node structure.
Parameters
----------
top_level_nodes : list
List of top level nodes.
node_list : list
List of child nodes.
force_nodes : bool
Flag indicating that child models are mandatory.
Returns
-------
tuple
Boolean flag of the model structure correctness, txtual description
of the problem (empty the model structure is valid).
"""
for key in top_level_nodes:
if self.node_type_dict[key]['status'] == 1 or force_nodes:
if node_list[key] == 0:
return False, f'Mandatory element {key} is missing.'
if 'replacement' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['replacement'], node_list)
if not result:
return False, explanation
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
if self.node_type_dict[key]['status'] == 2 or force_nodes:
if node_list[key] == 0:
if 'replacement' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['replacement'], node_list, True)
if not result:
return False, f'Replacable element {key} is missing. {explanation}'
else:
return False, f'Replacable element {key} is missing.'
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
elif self.node_type_dict[key]['status'] == 0:
if 'replacement' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['replacement'], node_list)
if not result:
return False, explanation
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
return True, ''
[docs]
def check_relations(self, nodes, relations):
"""Checks relations validity.
Parameters
----------
nodes : List, numpy.array
The list of node types for each vertex (length must be
==`self.MAX_NODES_PER_GRAPH`).
relations : numpy.ndarray (n, n)
Relation type matrix.
Returns
-------
bool
Returns `True` if all the set of edges is valid and consistent
with the nodes.
diff
Boolean matrix of edge validness (`True` for valid edges).
"""
target_relations = np.array(copy.deepcopy(self.relations_dict))
for node_key in self.node_type_dict:
if node_key not in nodes:
target_relations[node_key, :] = 0
target_relations[:, node_key] = 0
relations_diff = np.array([relations == target_relations])
result = relations_diff.all()
return result, relations_diff
[docs]
def check_nodes(self, nodes) -> bool:
"""Checks node types validity.
Parameters
----------
nodes : List, numpy.array
The list of node types for each vertex (length must be
==`self.MAX_NODES_PER_GRAPH`).
Returns
-------
bool
Returns `True` if the structure contains valid set of nodes.
"""
return self.check_children(self.top_level_nodes, nodes)[0]
[docs]
def overlap(self, first, last, another_first, another_last)->bool:
"""Checks if two intervals intersect.
Parameters
----------
first : numeric
Lower bound of the first interval
last : numeric
Upper bound of the first interval
another_first : numeric
Lower bound of the second interval
another_last : numeric
Upper bound of the second interval
Returns
-------
bool
Returns `True` if the intervals intersect.
"""
return min(last, another_last) - max(first, another_first) >= 0
[docs]
def check_paramater_feasibility(self, nodes, staff, logging=False, ctx=None):
"""Checks parameter validity.
Parameters
----------
nodes : List, numpy.array
The list of node.
staff : List, numpy.array
The list of parameters.
logging : bool
Enable/disable logging.
ctx : list
context.
Returns
-------
bool
Returns `True` if the parameters are valid.
"""
def log_capacity_problem(unit_id):
log = f"Capacity of node {unit_id} " \
f"{self.node_type_dict[unit_id]['title']} " \
"does not meet requirements: "
log += ' '.join([str(load_min[unit_id]),
str(load_max[unit_id]),
str(v_min[unit_id]),
str(v_max[unit_id])])
return log
log = None
for unit_id in self.node_type_dict:
# Optional node (department) has too few staff members
if (not self.node_type_dict[unit_id]['status'] == 1 and
nodes[unit_id] > 0 and staff[unit_id] < self.min_orgunit):
#error
if logging:
log = f"Node {unit_id} " \
f"{self.node_type_dict[unit_id]['title']} " \
f"doesn't meet the requirements: " \
f"staff = {staff[unit_id]}"
return False, log
# Empty node (department) has some staff
if nodes[unit_id] == 0 and staff[unit_id] > 0:
#error
if logging:
log = f"Non-existing Node {unit_id} " \
f"{self.node_type_dict[unit_id]['title']} " \
f"has staff: " \
f"staff = {staff[unit_id]}"
return False, log
load_min = staff * self.min_person
load_max = staff * self.max_person + self.min_person
max_no_unit = self.req_orgunit * self.max_person
for unit_id in self.node_type_dict:
if nodes[unit_id] == 0:
load_max[unit_id] = max_no_unit
v_min = np.zeros(12)
v_max = np.zeros(12)
unit_id = 1
v_min[unit_id] = ((0.2 if nodes[2] > 0 else 1) * load_min[2] +
(0.2 if nodes[3] > 0 else 1) * load_min[3])
v_max[unit_id] = ((0.2 if nodes[2] > 0 else 1) * load_max[2] +
(0.2 if nodes[3] > 0 else 1) * load_max[3])
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 10
v_min[10] = load_min[2] * 1 + load_min[3] * 1
v_max[10] = load_max[2] * 1 + load_max[3] * 1
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 11
v_min[11] = load_min[10] * 0.2
v_max[11] = load_max[10] * 0.2
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 5
v_min[5] = ((load_min[2] * 0.4 if nodes[2] > 0 else load_min[1] * 0.2) +
(load_min[11] * 0.2 if nodes[11] > 0 else 0))
v_max[5] = ((load_max[2] * 0.4 if nodes[2] > 0 else load_max[1] * 0.2) +
(load_max[11] * 0.2 if nodes[11] > 0 else 0))
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 6
v_min[6] = load_min[3] * 0.4 if nodes[3] > 0 else load_min[1] * 0.2
v_max[6] = load_max[3] * 0.4 if nodes[3] > 0 else load_max[1] * 0.2
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 7
v_min[7] = load_min[10] * 0.2
v_max[7] = load_max[10] * 0.2
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 8
v_min[8] = load_min[5] * 0.2 + load_min[6] * 0.2 + load_min[7] * 0.2
v_max[8] = load_max[5] * 0.2 + load_max[6] * 0.2 + load_max[7] * 0.2
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 4
v_min[4] = (0.2 if nodes[5] > 0 else 1) * load_min[5] + \
(0.2 if nodes[6] > 0 else 1) * load_min[6] + \
(0.2 if nodes[7] > 0 else 1) * load_min[7] + \
(0.2 if nodes[8] > 0 else 1) * load_min[8]
v_max[4] = (0.2 if nodes[5] > 0 else 1) * load_max[5] + \
(0.2 if nodes[6] > 0 else 1) * load_max[6] + \
(0.2 if nodes[7] > 0 else 1) * load_max[7] + \
(0.2 if nodes[8] > 0 else 1) * load_max[8]
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
unit_id = 9
v_min[9] = (load_min[1] * 0.05 +
load_min[2] * 0.05 +
load_min[3] * 0.05 +
load_min[10] * 0.05 +
load_min[11] * 0.05)
v_max[9] = (load_max[1] * 0.05 +
load_max[2] * 0.05 +
load_max[3] * 0.05 +
load_max[10] * 0.05 +
load_max[11] * 0.05)
if nodes[unit_id] > 0 and not self.overlap(load_min[unit_id],
load_max[unit_id],
v_min[unit_id],
v_max[unit_id]):
#error
if logging:
log = log_capacity_problem(unit_id)
return False, log
if ctx is not None:
tmp_v = self.generate_values(nodes, self.unpack_ctx(ctx), logging=logging)
for idx, val in enumerate(tmp_v):
if idx == 0 or idx == 2 or idx == 3:
continue
if nodes[idx]>0 and (val > v_max[idx] or val < v_min[idx]):
if logging:
log = f"Value of node {idx} {self.node_type_dict[idx]['title']} does not meet context. "
log += str(v_min[idx]) + ' '
log += str(val) + ' '
log += str(v_max[idx])
return False, log
return True, None
[docs]
def generate_augmentation(self,
base_nodes,
base_edges,
base_staff,
logging=False,
max_iterations=100):
"""Generate augmentation.
Parameters
----------
base_nodes : List, numpy.array
Nodes of the source configuration.
base_nodes : List, numpy.array
Edges of the source configuration.
base_staff : List, numpy.array
The staff quantiities of the source configuration.
logging : bool
Enable/disable logging.
max_iterations : int
maximum number of iterations until a valid
augmented configuration is generated.
Returns
-------
tuple
aug_nodes - augmented nodes,
aug_edges - augmented edges,
aug_staff - augmented staff,
self.pack_to_ctx(v) - augmented context.
"""
# Augment structure
iterations = 0
valid_structure = False
while iterations < max_iterations:
iterations += 1
aug_nodes = base_nodes.copy()
for i in range(1,len(self.node_type_dict)):
if random.choices([True, False], k=1, weights=[1, 8])[0]:
if aug_nodes[i] == 0:
aug_nodes[i] = i
else:
aug_nodes[i] = 0
if self.check_children(self.top_level_nodes, aug_nodes)[0]:
valid_structure = True
break
if not valid_structure:
return [], [], [], []
aug_edges = np.array(copy.deepcopy(self.relations_dict))
for node_key in self.node_type_dict:
if node_key not in aug_nodes:
aug_edges[node_key, :] = 0
aug_edges[:, node_key] = 0
# Augment parameters
iterations = 0
valid_params = False
while iterations < max_iterations:
iterations += 1
v = self.generate_key_values(aug_nodes, logging)
load = self.generate_values(aug_nodes, v, logging)
if len(load) > 0:
valid_params = True
if logging:
print(load)
aug_staff = self.convert_values2persons(aug_nodes,
load,
self.min_person,
self.max_person)
break
if not valid_params:
return [], [], [], []
return aug_nodes, aug_edges, aug_staff, self.pack_to_ctx(v)
[docs]
def check_uniqueness(self,
ground_truth_nodes,
ground_truth_edges,
ground_truth_staff,
ground_truth_ctx,
nodes,
edges,
staff,
ctx):
"""Checks structure uniqueness compared to the training set.
Parameters
----------
ground_truth_nodes : List, numpy.array
Nodes of the training set configurations.
ground_truth_edges : List, numpy.array
Edges of the training set configurations.
ground_truth_staff : List, numpy.array
Staff quantities of the training set configurations.
ground_truth_ctx : List, numpy.array
Contexts of the training set configurations.
nodes : List, numpy.array
Nodes of the checked configuration.
edges : List, numpy.array
Edges of the checked configuration.
staff : List, numpy.array
Staff quantiities of the checked configuration.
ctx: List, numpy.array
Context of the checked configuration.
Returns
-------
bool
True if the configuration is unique,
False otherwise.
"""
for i in range(ground_truth_nodes.shape[0]):
if ((ground_truth_nodes[i]==nodes).all() and
(ground_truth_edges[i]==edges).all() and
(ground_truth_staff[i]==staff).all() and
(ground_truth_ctx[i]==ctx).all()):
return False
return True
[docs]
class ManagementStructureModel:
"""SCSP demo model class, describing administration and sales scenario.
"""
# status: 0 - optional, 1 - mandatory, 2 - replaceble
node_type_dict = {
0: {'title': 'none', 'status': 0, 'weight': 0},
1: {'title': 'Administration', 'status': 2, 'replacement': [2, 3], 'children': [4]},
2: {'title': 'Headquarters', 'status': 0},
3: {'title': 'Management', 'status': 0},
4: {'title': 'Marketing department', 'status': 0},
5: {'title': 'Warehouse', 'status': 0},
6: {'title': 'Logistics', 'status': 0},
7: {'title': 'ERP System', 'status': 1},
8: {'title': 'Shop', 'status': 1},
}
top_level_nodes = [1, 5, 6, 7, 8]
relations_dict = [
# 0 1 2 3 4 5 6 7 8
[ 0, 0, 0, 0, 0, 0, 0, 0, 0], # 0
[ 0, 0, 1, 1, 1, 0, 0, 2, 0], # 1
[ 0, 0, 0, 0, 0, 0, 0, 2, 0], # 2
[ 0, 0, 0, 0, 0, 0, 0, 2, 6], # 3
[ 0, 0, 0, 0, 0, 0, 0, 2, 0], # 4
[ 0, 0, 0, 0, 0, 0, 4, 3, 0], # 5
[ 0, 0, 0, 0, 0, 0, 0, 0, 0], # 6
[ 0, 0, 0, 0, 0, 0, 0, 0, 0], # 7
[ 0, 0, 0, 0, 0, 0, 5, 3, 0], # 8
]
# Number of node types
NODE_N_TYPES = len(node_type_dict)
# Number of edge types
EDGE_N_TYPES = 7
# Max number of vertices per graph
MAX_NODES_PER_GRAPH = 9
# Parametrization constants
node_param_0_min_allowed = [0.0, 0.0, 1000.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
node_param_0_min_required = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1200.0, 0.0, 0.0]
param_0_max = 2000.0
def __init__(self):
pass
[docs]
def generate_key_values(self, nodes, logging=False):
"""Generation of random key values for model parameters.
Parameters
----------
nodes : list
List of model nodes.
logging : bool
Enable/disable logging..
Returns
-------
list
list of generated key values.
"""
v = np.zeros(2)
tmp_param_min = 0
tmp_param_max = self.param_0_max
for i, node in enumerate(nodes):
if node > 0:
tmp_param_min = max(self.node_param_0_min_allowed[i], tmp_param_min)
elif self.node_param_0_min_required[i] > 0:
tmp_param_max = min(self.node_param_0_min_required[i], tmp_param_max)
v[0] = np.random.uniform(tmp_param_min, tmp_param_max)
v[1] = int(nodes[4] > 0)
if logging:
print("input v=", v)
return v
[docs]
def generate_values(self, nodes, v, logging=False):
"""Generation of augmented model parameters.
Parameters
----------
nodes : list
List of model nodes.
v : list
Key values for parameter generation.
logging : bool
Enable/disable logging.
Returns
-------
list
list of generated model parameter values.
"""
params = np.zeros([9, 2])
params[8, 0] = v[0]
pm = max(int(nodes[1]>0) * params[8, 0] / 200, int(nodes[1]>0)) + \
max(int(nodes[2]>0) * params[8, 0] / 200, int(nodes[2]>0)) + \
max(int(nodes[3]>0) * params[8, 0] / 200, int(nodes[3]>0)) + \
int(nodes[4]>0) * 5 + \
max(int(nodes[5]>0) * params[8, 0] / 100, int(nodes[5]>0)) + \
max(int(nodes[6]>0) * params[8, 0] / 500, int(nodes[6]>0)) + \
max(int(nodes[8]>0) * params[8, 0] / 100, int(nodes[3]>0))
params[7, 1] = round(pm)
return params
[docs]
def check_children(self, top_level_nodes, node_list, force_nodes = False):
"""Recursive function for checking correctness of model's node structure.
Parameters
----------
top_level_nodes : list
List of top level nodes.
node_list : list
List of child nodes.
force_nodes : bool
Flag indicating that child models are mandatory.
Returns
-------
tuple
Boolean flag of the model structure correctness, txtual description
of the problem (empty the model structure is valid).
"""
for key in top_level_nodes:
if self.node_type_dict[key]['status'] == 1 or force_nodes:
if node_list[key] == 0:
return False, f'Mandatory element {key} is missing.'
if 'replacement' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['replacement'], node_list)
if not result:
return False, explanation
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
if self.node_type_dict[key]['status'] == 2 or force_nodes:
if node_list[key] == 0:
if 'replacement' in self.node_type_dict[key]:
flag = False
for child_node in self.node_type_dict[key]['replacement']:
result, explanation = self.check_children(
[child_node], node_list, True)
if result:
flag = True
if not flag:
return False, f'No replacements for {key}.'
else:
return False, f'Replacable element {key} is missing.'
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
elif self.node_type_dict[key]['status'] == 0:
if 'replacement' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['replacement'], node_list)
if not result:
return False, explanation
if 'children' in self.node_type_dict[key]:
result, explanation = self.check_children(
self.node_type_dict[key]['children'], node_list)
if not result:
return False, explanation
return True, ''
[docs]
def check_relations(self, nodes, relations):
"""Checks relations validity.
Parameters
----------
nodes : List, numpy.array
The list of node types for each vertex (length must be
==`self.MAX_NODES_PER_GRAPH`).
relations : numpy.ndarray (n, n)
Relation type matrix.
Returns
-------
bool
Returns `True` if all the set of edges is valid and consistent
with the nodes.
diff
Boolean matrix of edge validness (`True` for valid edges).
"""
target_relations = np.array(copy.deepcopy(self.relations_dict))
for node_key in self.node_type_dict:
if node_key not in nodes:
target_relations[node_key, :] = 0
target_relations[:, node_key] = 0
if nodes[1] == 1:
target_relations[2, 7] = 0
target_relations[3, 7] = 0
target_relations[4, 7] = 0
relations_diff = np.array([relations == target_relations])
result = relations_diff.all()
return result, relations_diff
[docs]
def check_nodes(self, nodes) -> bool:
"""Checks node types validity.
Parameters
----------
nodes : List, numpy.array
The list of node types for each vertex (length must be
==`self.MAX_NODES_PER_GRAPH`).
Returns
-------
bool
Returns `True` if the structure contains valid set of nodes.
"""
return self.check_children(self.top_level_nodes, nodes)[0]
[docs]
def check_paramater_feasibility(self, nodes, staff, ctx):
"""Checks parameter validity.
Parameters
----------
nodes : List, numpy.array
The list of node.
staff : List, numpy.array
The list of parameters.
logging : bool
Enable/disable logging.
ctx : list
context.
Returns
-------
bool
Returns `True` if the parameters are valid.
"""
log = "Valid configuration"
if ctx[1] != int(nodes[4] > 0):
log = f"Structure does not meet the context. "
log += f"{self.node_type_dict[4]['title']} = {nodes[4]}. "
log += f"Context[1] = {ctx[1]}"
pm = max(int(nodes[1]>0) * staff[8, 0] / 200, int(nodes[1]>0)) + \
max(int(nodes[2]>0) * staff[8, 0] / 200, int(nodes[2]>0)) + \
max(int(nodes[3]>0) * staff[8, 0] / 200, int(nodes[3]>0)) + \
int(nodes[4]>0) * 5 + \
max(int(nodes[5]>0) * staff[8, 0] / 100, int(nodes[5]>0)) + \
max(int(nodes[6]>0) * staff[8, 0] / 500, int(nodes[6]>0)) + \
max(int(nodes[8]>0) * staff[8, 0] / 100, int(nodes[3]>0))
if round(staff[7, 1]) != round(pm):
log = f"Value 1 of node 7 {self.node_type_dict[7]['title']} does not meet the configuration. "
log += f"Value = {staff[7, 1]} "
log += f"Required value = {pm} "
return False, log
return True, None
[docs]
def generate_augmentation(self,
base_nodes,
base_edges,
base_staff,
logging=False,
max_iterations=100):
"""Generate augmentation.
Parameters
----------
base_nodes : List, numpy.array
Nodes of the source configuration.
base_nodes : List, numpy.array
Edges of the source configuration.
base_staff : List, numpy.array
The staff quantiities of the source configuration.
logging : bool
Enable/disable logging.
max_iterations : int
maximum number of iterations until a valid
augmented configuration is generated.
Returns
-------
tuple
aug_nodes - augmented nodes,
aug_edges - augmented edges,
aug_staff - augmented staff,
ctx - augmented context.
"""
# Augment structure
iterations = 0
valid_structure = False
while iterations < max_iterations:
iterations += 1
aug_nodes = base_nodes.copy()
for i in range(1,len(self.node_type_dict)):
if random.choices([True, False], k=1, weights=[1, 8])[0]:
if aug_nodes[i] == 0:
aug_nodes[i] = i
else:
aug_nodes[i] = 0
if self.check_children(self.top_level_nodes, aug_nodes)[0]:
valid_structure = True
break
if not valid_structure:
return [], [], [], []
aug_edges = np.array(copy.deepcopy(self.relations_dict))
for node_key in self.node_type_dict:
if node_key not in aug_nodes:
aug_edges[node_key, :] = 0
aug_edges[:, node_key] = 0
if aug_nodes[1] == 1:
aug_edges[2, 7] = 0
aug_edges[3, 7] = 0
aug_edges[4, 7] = 0
# Augment parameters
ctx = self.generate_key_values(aug_nodes, logging)
aug_params = self.generate_values(aug_nodes, ctx, logging)
return aug_nodes, aug_edges, aug_params, ctx
[docs]
def check_uniqueness(self,
ground_truth_nodes,
ground_truth_edges,
ground_truth_staff,
ground_truth_ctx,
nodes,
edges,
staff,
ctx):
"""Checks structure uniqueness compared to the training set.
Parameters
----------
ground_truth_nodes : List, numpy.array
Nodes of the training set configurations.
ground_truth_edges : List, numpy.array
Edges of the training set configurations.
ground_truth_staff : List, numpy.array
Staff quantities of the training set configurations.
ground_truth_ctx : List, numpy.array
Contexts of the training set configurations.
nodes : List, numpy.array
Nodes of the checked configuration.
edges : List, numpy.array
Edges of the checked configuration.
staff : List, numpy.array
Staff quantiities of the checked configuration.
ctx: List, numpy.array
Context of the checked configuration.
Returns
-------
bool
True if the configuration is unique,
False otherwise.
"""
for i in range(ground_truth_nodes.shape[0]):
if ((ground_truth_nodes[i]==nodes).all() and
(ground_truth_edges[i]==edges).all() and
(ground_truth_staff[i]==staff).all() and
(ground_truth_ctx[i]==ctx).all()):
return False
return True
[docs]
class LogisticsDepartmentModel(
LogisticsDepartmentOrganizationStructureModel):
"""An adapter, defining methods to use organization structure
model for logistics scenario during OrGAN training."""
def __init__(self):
super(LogisticsDepartmentModel, self).__init__()
[docs]
def validness(self, org) -> bool:
"""Checks structure validness.
Parameters
----------
org : Configuration
Organization structure configuration.
Returns
-------
bool
True if the configuration is valid,
False otherwise.
"""
return self.check_nodes(org.nodes) and \
self.check_relations(org.nodes, org.edges)[0] and \
self.check_paramater_feasibility(org.nodes,
org.node_features.ravel(),
ctx=org.condition)[0]
[docs]
def metrics(self, org) -> dict:
"""Returns a dict with relevant metric values.
Parameters
----------
org : Configuration
Organization structure configuration.
Returns
-------
dict
metrics and functions for their evaluation.
"""
return {
'node score': self.check_nodes(org.nodes),
'edge score': self.check_relations(org.nodes, org.edges)[0],
'staff score': self.check_paramater_feasibility(org.nodes,
org.node_features.ravel(),
ctx=org.condition)[0]
}
[docs]
class ManagementModel(ManagementStructureModel):
"""An adapter, defining methods to use organization structure
model for the administration and sales scenario during OrGAN
training.
"""
def __init__(self):
super(ManagementStructureModel, self).__init__()
[docs]
def validness(self, org) -> bool:
"""Checks structure validness.
Parameters
----------
org : Configuration
Organization structure configuration.
Returns
-------
bool
True if the configuration is valid,
False otherwise.
"""
return self.check_nodes(org.nodes) and \
self.check_relations(org.nodes, org.edges)[0] and \
self.check_paramater_feasibility(org.nodes,
org.node_features,
ctx=org.condition)[0]
[docs]
def metrics(self, org) -> dict:
"""Returns a dict with relevant metric values.
Parameters
----------
org : Configuration
Organization structure configuration.
Returns
-------
dict
metrics and functions for their evaluation.
"""
return {
'node score': self.check_nodes(org.nodes),
'edge score': self.check_relations(org.nodes, org.edges)[0],
'staff score': self.check_paramater_feasibility(org.nodes,
org.node_features,
ctx=org.condition)[0]
}
[docs]
def soft_constraints(self, nodes, edges, params, ctx):
"""Soft constraints for this scenario.
The function describes some relationships between
node parameters, context, and organization structure
to simplify the training of a generator.
Parameters
----------
nodes : torch.tensor
Nodes description in an 'internal' format:
(batch, nodes, node_types). Value is the probability that
a node of the specific type is located in a certain position.
Zero-type corresponds to the absense of a node. Non-zero
values can be only on the matrix diagonal or zeroth column.
edges : torch.tensor
Edges representation in an 'internal' format:
(batch, nodes, nodes, edge_types).
params : torch.tensor
Node features: (batch, nodes, features_per_node).
ctx : torch.tensor
Generation context: (batch, context_features).
Returns
-------
torch.tensor
Value tensor (0-dimensional). Non-negative loss for violation
of the constraints.
"""
marketing_cnt = torch.mean(torch.abs(nodes[:, 4, 4] - ctx[:, 1]))
node_8_cnt = torch.mean(torch.abs(params[:, 8, 0] - ctx[:, 0])) / self.param_0_max
node_existence = torch.sum(nodes[:, :, 1:], axis=-1)
pm_nodes = torch.stack([
ctx[:, 0] * 0,
ctx[:, 0] / 200,
ctx[:, 0] / 200,
ctx[:, 0] / 200,
ctx[:, 0] / ctx[:, 0] * 5,
ctx[:, 0] / 100,
ctx[:, 0] / 500,
ctx[:, 0] * 0,
ctx[:, 0] / 100], dim=-1)
pm = torch.sum(node_existence * pm_nodes, axis=-1)
node_7_cnt = torch.mean(torch.abs(pm-params[:, 7, 1]))
node_2_cnt = torch.mean(torch.nn.functional.relu((1000 - ctx[:, 0]) / 1000) *
nodes[:, 2, 2])
node_6_cnt = torch.mean(torch.nn.functional.relu((ctx[:, 0] - 1200) / 1200) *
(1-nodes[:, 6, 6]))
return torch.mean(torch.stack([marketing_cnt,
node_8_cnt,
node_7_cnt,
node_2_cnt,
node_6_cnt]))