From the discussion in the comments, I think you just need to build a default mapping from the existing one and then use the coalesce() function to pick the first non-null value, see below:
from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce
# skip some old code
d
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]
# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])
# default mapping: keep only the '\x00DEFAULT' entries and strip the value part off the key
mappings_default = create_map([ lit(j.split('\x00')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>
# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\x00')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}
# use coalesce to find the first non-null value from mappings, mappings_default, etc.
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*", *[
    coalesce(
        mappings[concat_ws('\x00', lit(c), col(c))],
        mappings_default[c],
        lit("Not Specified at Source" if c in available_list else "Lookup not found")
    ).alias(c_name) for c, c_name in matchedAttributeName_List.items()])
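To see how the three branches of the coalesce fall through, here is a minimal, self-contained sketch (demo_map, demo_default and the toy DataFrame are hypothetical, just to illustrate the lookup order):

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, concat_ws, create_map, lit

spark = SparkSession.builder.getOrCreate()

# one exact-match entry and one per-attribute default
demo_map = create_map(lit('LeaseStatus\x00Active'), lit('Active'))
demo_default = create_map(lit('LeaseStatus'), lit('Pending'))

df = spark.createDataFrame([('Active',), ('Holdover',)], ['LeaseStatus'])
df.select('LeaseStatus',
          coalesce(
              demo_map[concat_ws('\x00', lit('LeaseStatus'), col('LeaseStatus'))],
              demo_default['LeaseStatus'],
              lit('Not Specified at Source')
          ).alias('resolved')).show()
# 'Active'   -> 'Active'  (exact key found in demo_map)
# 'Holdover' -> 'Pending' (missing in demo_map, falls back to demo_default)

A map lookup on a missing key returns null, which is exactly what makes the coalesce chain work.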
Some explanation:
(1) d is a list of lists retrieved from the reference_df; we use the list comprehension [ lit(j) for i in d for j in i ] to flatten it into a single list of alternating keys and values, and pass the flattened list to the create_map function.
(2) mappings_default is similar to the above, but adds an if condition to serve as a filter, keeping only entries whose map key (the first item of the inner list, i[0]) ends with '\x00DEFAULT', and then uses split('\x00') to strip the PrimaryLookupAttributeValue (which is basically '\x00DEFAULT') off the map key, leaving just the PrimaryLookupAttributeName. Both steps are illustrated in the sketch below.
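To make (1) and (2) concrete, here is a small sketch on a toy d_demo (hypothetical, modeled on the data above):

from pyspark.sql.functions import create_map, lit

d_demo = [['LeaseStatus\x00Active', 'Active'],
          ['LeaseStatus\x00DEFAULT', 'Pending']]

# (1) flatten [[k1, v1], [k2, v2]] into [k1, v1, k2, v2] so that create_map
#     can pair the elements up as alternating keys and values
mappings_demo = create_map([lit(j) for i in d_demo for j in i])
# roughly Column<'map(LeaseStatus\x00Active, Active, LeaseStatus\x00DEFAULT, Pending)'>

# (2) keep only the DEFAULT entry, then cut each string at the first '\x00':
#     'LeaseStatus\x00DEFAULT' -> 'LeaseStatus'; 'Pending' contains no '\x00',
#     so split() leaves it unchanged
flat_default = [j.split('\x00')[0] for i in d_demo
                if i[0].upper().endswith('\x00DEFAULT') for j in i]
# ['LeaseStatus', 'Pending']
mappings_default_demo = create_map([lit(j) for j in flat_default])
# roughly Column<'map(LeaseStatus, Pending)'>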