Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Google Cloud Storage - Iceberg is not working when writing Avro from Spark

  1. We encounter the following error when appending Avro files from GCS to an Iceberg table. The Avro files are valid, but they are deflate-compressed. Is that a concern?

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
    at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
    at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
    at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:343)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275)
    at com.snapchat.transformer.TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java:162)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2$adapted(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 38 more

  2. The log shows that the Iceberg table already exists, but I cannot see its metadata file in GCS. I am running the Spark job on a Dataproc cluster. Where can I find the metadata file?

Iceberg version: 0.11, Spark version: 3.0

public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
    TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
    // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
    HadoopTables tables = new HadoopTables(new Configuration());

   
    PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
            .build();

    Table table = null;
    if (tables.exists(tableIdentifier.name())) {
        table = tables.load(tableIdentifier.name());
    } else {
        table = tables.create(
                icebergSchema,
                partitionSpec,
                tableIdentifier.name());
    }
    AppendFiles appendFiles = table.newAppend();
    for (FileMetadata fileMetadata : publishedFiles) {

        appendFiles.appendFile(DataFiles.builder(partitionSpec)
                .withPath(fileMetadata.getFilename())
                .withFileSizeInBytes(fileMetadata.getFileSize())
                .withRecordCount(fileMetadata.getRowCount())
                .withFormat(FileFormat.AVRO)
                .build());
    }
    appendFiles.commit();
}
Question from: https://stackoverflow.com/questions/65932414/iceberg-is-not-working-when-writing-avro-from-spark


1 Answer


The following two changes fixed my problem:

  • I made sure to provide the correct path for the Iceberg table (with the gs:// prefix, in my case).

  • I resolved the Apache Avro (org.apache.avro) dependency version conflict.
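
Both fixes can be sanity-checked with a small standalone helper. What follows is only a sketch, not the original poster's code: the class name IcebergFixSketch, the bucket gs://my-bucket/warehouse and the stream name my_stream are hypothetical. The first method builds a fully qualified table location, since HadoopTables treats the string it receives as a filesystem path rather than a catalog name. The second reports which jar a class was loaded from, which can help show whether an older Avro on the Spark classpath is shadowing the version Iceberg expects.

```java
import java.net.URI;
import java.security.CodeSource;

public class IcebergFixSketch {

    // Build a fully qualified table location under a warehouse root.
    // Rejects roots without a scheme (for example a bare "transformer").
    static String tableLocation(String warehouse, String namespace, String table) {
        URI root = URI.create(warehouse);
        if (root.getScheme() == null) {
            throw new IllegalArgumentException(
                    "warehouse needs a scheme such as gs://, got: " + warehouse);
        }
        String base = warehouse.endsWith("/")
                ? warehouse.substring(0, warehouse.length() - 1)
                : warehouse;
        return base + "/" + namespace + "/" + table;
    }

    // Report where a class was loaded from, or "not found" if it is missing.
    static String whereIs(String className) {
        try {
            Class<?> c = Class.forName(className);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes have no code source
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Hypothetical bucket and stream name, for illustration only.
        System.out.println(tableLocation("gs://my-bucket/warehouse", "transformer", "my_stream"));
        // Run inside the Spark driver to see which Avro jar actually wins.
        System.out.println("Avro Schema: " + whereIs("org.apache.avro.Schema"));
        System.out.println("InvalidAvroMagicException: "
                + whereIs("org.apache.avro.InvalidAvroMagicException"));
    }
}
```

In appendData, the resulting location string would replace tableIdentifier.name() in the tables.exists(...), tables.load(...) and tables.create(...) calls. A Hadoop table keeps its metadata files in a metadata/ directory under that location, so that is where they should appear in GCS. If whereIs reports "not found" for InvalidAvroMagicException while org.apache.avro.Schema resolves to a jar, the Avro on the classpath is probably older than the one Iceberg was built against, and the build needs to align or shade the Avro version.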

