Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
197 views
in Technique[技术] by (71.8m points)

python - pySpark mapping multiple variables

The code below maps values and column names of my reference df with my actual dataset, finding exact matches and if an exact match is found, return the OutputValue. However, I'm trying to add the rule that when PrimaryValue = DEFAULT to also return the OutputValue.

The solution I'm trying out to tackle this is to create a new dataframe with null values - since there was no match provided by code below. Thus the next step would be to target the null values whose corresponding PrimaryValue = DEFAULT to replace null by the OutputValue.

  #create a map based on columns from reference_df
  map_key = concat_ws('', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #dataframe of concatinated mappings to get the corresponding OutputValues from reference table
  d = final_reference.agg(collect_set(array(concat_ws('','PrimaryName','PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #iterate through mapped values 
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with corresponding matched OutputValues
  dataset = datasetM.select("*",*[ mappings[concat_ws('', lit(c), col(c))].alias(c_name) for c,c_name in matched_List.items()]) 
  display(dataset)
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

From discussion in comments, I think you just need to add a default mappings from the existing one and then use coalease() function to find the first non-null value, see below:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatusx00Abandoned', 'Active'],
# ['LeaseStatusx00DEFAULT', 'Pending'],
# ['LeaseRecoveryTypex00Gross-modified', 'Modified Gross'],
# ['LeaseStatusx00Archive', 'Expired'],
# ['LeaseStatusx00Terminated', 'Terminated'],
# ['LeaseRecoveryTypex00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryTypex00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('')[0]) for i in d if i[0].upper().endswith('x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null values from mappings, mappings_defaul etc
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])

Some explanation:

(1) d is a list of lists retrieved from the reference_df, we use a list comprehension [ lit(j) for i in d for j in i ] to flatten this to a list and apply the flattened list to the create_map function:

(2) The mappings_default is similar to the above, but add a if condition to serve as a filter and keep only entries having PrimaryLookupAttributeValue (which is the first item of the inner list i[0]) ending with x00DEFAULT and then use split to strip PrimaryLookupAttributeValue(which is basically x00DEFAULT) off from the map_key.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
...