
Partition by using ID, year and month in Glue

I'm trying to partition the output by merchant_id, year and month, as you can see in the data sink.

Partitioning by merchant_id alone works fine, since I already have that column in my data source.

But I don't have year and month columns. So I'm trying to take created_at, split it, and add 'year' and 'month' columns to the same table, so that I can partition by (merchant_id, year, month).

Can anyone help me with that? This is the Glue code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "bills", transformation_ctx = "DataSource0")


df0 = DataSource0.toDF()
dataframe1 = df0.withColumn("filedate", df0.created_at)

dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

def map_function(dynamicRecord):
    date = dynamicRecord["filedate"].split("-")[0][-8:]
    dynamicRecord["year"] = date[0:4]
    dynamicRecord["month"] = date[4:6]
    dynamicRecord["day"]= date[6:8]
    return dynamicRecord
    
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")

Transform2 = ApplyMapping.apply(frame = mapping3, mappings = [("op", "string", "bills_op", "string"), ("timestamp", "string", "bills_timestamp", "string"), ("id", "int", "bills_id", "int"), ("subscription_id", "int", "bills_subscription_id", "int"), ("customer_id", "int", "bills_customer_id", "int"), ("amount", "decimal", "bills_amount", "decimal"), ("created_at", "timestamp", "bills_created_at", "timestamp"), ("updated_at", "timestamp", "bills_updated_at", "timestamp"), ("status", "int", "bills_status", "int"), ("payment_method_id", "int", "bills_payment_method_id", "int"), ("due_at", "timestamp", "bills_due_at", "timestamp"), ("billing_at", "timestamp", "bills_billing_at", "timestamp"), ("installments", "int", "bills_installments", "int"), ("merchant_id", "int", "bills_merchant_id", "int"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "Transform2")

DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "clientes_ativos_enterprise", transformation_ctx = "DataSource1")

Transform0 = ApplyMapping.apply(frame = DataSource1, mappings = [("meta_id", "int", "filter_meta_id", "int"), ("meta_value", "string", "filter_meta_value", "string"), ("merc_id", "int", "filter_merc_id", "int")], transformation_ctx = "Transform0")

Transform1 = Join.apply(frame1 = Transform0, frame2 = Transform2, keys2 = ["bills_merchant_id"], keys1 = ["filter_merc_id"], transformation_ctx = "Transform1")

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/", "compression": "gzip", "partitionKeys": ["bills_merchant_id","year","month"]}, transformation_ctx = "DataSink0")
job.commit()

And this is the full error message:

 Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o151.pyWriteDynamicFrame.
: org.apache.spark.sql.AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         ;
    at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$validateSchema(DataSource.scala:733)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:523)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:535)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:522)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/bills_partition_filtered.py", line 71, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = 
{
    "path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/",
    "compression": "gzip",
    "partitionKeys": [
        "bills_merchant_id",
        "year",
        "month"
    ]
}
, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: '
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
 

Thank you all!!


1 Answer

  1. The traceback says there is a problem with the DataFrame schema. You should take a look at df.printSchema() before writing to verify that your schema is correct.

  2. You should cast your created_at column to a date/timestamp type.

  3. Use the withColumn function to parse year and month from the created_at column, instead of slicing the values positionally, which can cause inconsistencies in the future (see the sketch below).

dynamicRecord["year"] = date[0:4]

Slicing the string like this is not a good way to parse a date.

Follow the answers here to apply #3: Splitting Date into Year, Month and Day, with inconsistent delimiters
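
A minimal sketch of points 1-3 (assuming created_at in the bills table can be parsed by to_timestamp, and reusing DataSource0 and glueContext from the question) might look like this:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, year, month, dayofmonth

# Minimal sketch: reuses DataSource0 and glueContext from the question and
# assumes created_at can be parsed as a timestamp.
df0 = DataSource0.toDF()

df_with_parts = (
    df0
    .withColumn("created_ts", to_timestamp(col("created_at")))    # point 2: cast to timestamp
    .withColumn("year", year(col("created_ts")).cast("string"))   # point 3: derive partition columns
    .withColumn("month", month(col("created_ts")).cast("string"))
    .withColumn("day", dayofmonth(col("created_ts")).cast("string"))
)

# Point 1: inspect the schema before writing; if a later step (e.g. the join)
# produces an empty schema, the Parquet writer fails with the AnalysisException above.
df_with_parts.printSchema()

dynamicframe2 = DynamicFrame.fromDF(df_with_parts, glueContext, "dynamicframe2")

With the partition columns computed on the DataFrame, the Map.apply string-slicing step can be dropped, and the rest of the job (ApplyMapping, Join and the partitioned Parquet sink) can stay as it is.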

