python-mylib/mylib/mapping.py

139 lines
4.3 KiB
Python
Raw Permalink Normal View History

2023-01-16 12:23:50 +01:00
"""
My hash mapping library
Mapping configuration
{
'[dst key 1]': { # Key name in the result
'order': [int], # Processing order between destination keys
# Source values
'other_key': [key], # Other key of the destination to use as source of values
'key' : '[src key]', # Key of source hash to get source values
'keys' : ['[sk1]', '[sk2]', ...], # List of source hash's keys to get source values
# Clean / convert values
'cleanRegex': '[regex]', # Regex used to remove unwanted characters. Ex : [^0-9+]
'convert': [function], # Function used to convert each value : the original value is passed
# as argument and the returned value replaces the source value in
# the result
# Ex :
# lambda x: x.strip()
# lambda x: "myformat : %s" % x
# Deduplicate / check values
'deduplicate': [bool], # If True, source values will be deduplicated
'check': [function], # Function used to check each source value : the value is passed
# as argument and is preserved only if the function returns True
# Ex :
# lambda x: x in my_global_hash
# Join values
'join': '[glue]', # If present, source values will be joined using the "glue"
# Alternative mapping
'or': { [map configuration] } # If this mapping case does not retrieve any value, try to
# get value(s) with this other mapping configuration
},
'[dst key 2]': {
[...]
}
}
Return format :
{
'[dst key 1]': ['v1','v2', ...],
'[dst key 2]': [ ... ],
[...]
}
"""
import logging
import re
log = logging.getLogger(__name__)
def clean_value(value):
    """Return integer values as strings, and any other value unchanged."""
    return str(value) if isinstance(value, int) else value
def get_values(dst, dst_key, src, m):
    """Extract the values of one destination key.

    Values are collected from the sources named in the mapping configuration
    ("other_key" first, then "key", then "keys"). They are then successively
    cleaned ("cleanRegex"), converted ("convert"), deduplicated
    ("deduplicate"), checked ("check") and joined ("join"). If no value is
    left and the configuration has an "or" entry, that alternative mapping
    configuration is tried instead.

    :param dst: Destination hash built so far (read when "other_key" is used);
                its values are expected to be lists
    :param dst_key: Name of the destination key (only used in log messages)
    :param src: Source hash
    :param m: Mapping configuration of the destination key
    :return: List of retrieved values (possibly empty)
    """
    values = []

    if "other_key" in m and m["other_key"] in dst:
        # Work on a copy: the list is extended below and handed back to the
        # caller, so sharing it with dst would silently alter the values of
        # the other destination key.
        values = list(dst[m["other_key"]])

    if "key" in m and m["key"] in src and src[m["key"]] != "":
        values.append(clean_value(src[m["key"]]))

    for key in m.get("keys", ()):
        if key in src and src[key] != "":
            values.append(clean_value(src[key]))

    # Clean and convert values (values left empty by a step are dropped)
    if "cleanRegex" in m:
        cleaned = (re.sub(m["cleanRegex"], "", v) for v in values)
        values = [v for v in cleaned if v != ""]

    if "convert" in m:
        converted = (m["convert"](v) for v in values)
        values = [v for v in converted if v != ""]

    # Deduplicate values, keeping the first occurrence. A membership test on
    # a list is used (rather than a set) so unhashable values are supported.
    if m.get("deduplicate"):
        unique = []
        for v in values:
            if v not in unique:
                unique.append(v)
        values = unique

    # Check values
    if "check" in m:
        checked = []
        for v in values:
            if m["check"](v):
                checked.append(v)
            else:
                log.debug("Invalid value %s for key %s", v, dst_key)
        values = checked

    # Join values (a single value is left untouched)
    if "join" in m and len(values) > 1:
        values = [m["join"].join(values)]

    # Manage alternative mapping case
    if not values and "or" in m:
        values = get_values(dst, dst_key, src, m["or"])

    return values
def map_hash(mapping, src, dst=None):
    """Map a source hash to a destination hash using a mapping configuration.

    Destination keys are processed by ascending "order". A destination key
    for which no value could be retrieved is left out of the result, unless
    its mapping configuration has a true "required" entry: in that case the
    mapping is aborted (keys already processed remain stored in the
    destination hash).

    :param mapping: Mapping configuration, indexed by destination key
    :param src: Source hash
    :param dst: Optional destination hash, filled in place; a new one is
                created if omitted
    :return: The destination hash, or False if a required destination key
             could not be filled
    """
    if not dst and not isinstance(dst, dict):
        # None (or any other falsy placeholder) means "start from scratch".
        # A dict provided by the caller is kept even when it is empty, so
        # that it is really filled in place.
        dst = {}
    assert isinstance(dst, dict)

    for dst_key in sorted(mapping, key=lambda k: mapping[k]["order"]):
        conf = mapping[dst_key]
        values = get_values(dst, dst_key, src, conf)
        if values:
            dst[dst_key] = values
            continue
        if conf.get("required"):
            log.debug(
                "Destination key %s could not be filled from source but is required", dst_key
            )
            return False

    return dst