UPDATE BELOW.....
Have automated csv data dumping into our backend and it looks like there are some malformed items buried in the data. There is a job family title that errantly has a
in between two words. Which is wrecking our data, so that's the problem.
I want to read in the csv as wholetext, regexp_replace the title with the correction, then load this fixed wholetext into a new dataframe as if I loaded up a correct csv to start with.. Here's the madness of where I'm at right now: Lol.
# Import in the functions I need
# from pyspark.sql.functions import col
# Looks like there is a job family title with an issue. There's a carriage return / line feed between two words messing up the csv
# This needs to be patched before we actually pull the data into the dataframes to begin work
data_requisitions_patch0 = spark.read.text('abfss://[email protected]/Data/brokencsv.csv', wholetext=True)
data_requisitions_patch0.collect()
data_requisitions_new = data_requisitions_patch0
# print(data_requisitions_patch0)
# data_requisitions_patch0.printSchema()
# data_requisitions_patch0.show()
data_requisitions_patch1 = data_requisitions_patch0
.withColumn("value", regexp_replace(col('value'), 'Job - Starting
', 'Job - Starting'))
data_requisitions_patch1.collect()
print('patch0')
data_requisitions_new.count()
print('patch1')
data_requisitions_patch1.count()
# print('Patch0 dataframe: ' + data_requisitions_patch0.count())
# print('Patch0 dataframe: ' + data_requisitions_patch1.count())
# data_requisitions_test0 = spark(data_requisitions_patch1, header=True)
# data_requisitions_test1 = spark.read.csv('abfss://[email protected]/Data/brokencsv.csv', header=True)
# data_requisitions_test0.count()
# data_requisitions_test0.printSchema()
# data_requisitions_test1.count()
# data_requisitions_test1.printSchema()
It's obviously a mess right now, I'm trying to troubleshoot is the regexp_replace is working, but not having much luck. Then it occurred to me that I have a single row single column dataframe. Now I'm attempting to try to figure how how to take the dataframe post the 'patch' and turn that back into a normal csv'ed dataframe like everything was ok to begin with.
I left in all my testing nonsense, thought was that you might see where my head is... Unsure if that was helpful or not. Links have been faked, obviously.
First off: Am I going in the right direction? No part of this is really working.. I can't even get the counts to work. test1.count() does return... but test0.count() doesn't? I don't even really care about the counts, that's me just trying to figure out why it's not working.
Secondly: Malformed csv -> wholetext dataframe -> regexp fix the problem -> fixed dataframe with correct headers, rows, like normal.
How off am I?
=======
UPDATE
Made some great progress, I ended up splitting the wholetext dataframe on
line feeds and exploded that into rows. That works great. Now the dataframe has exactly how many rows it's supposed to have. Now working on trying to figure out how to re-map the columns to get those created correctly.
Thoughts are to take in the header row and try to use that as a map? I don't know, still researching.
question from:
https://stackoverflow.com/questions/65929870/correct-malformed-csv-and-pull-corrected-data-back-into-a-dataframe 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…