
Serialization Issue with Sedona and Iceberg (Kryo serializer) #1724

Open
HelloJowet opened this issue Dec 15, 2024 · 3 comments

HelloJowet commented Dec 15, 2024

Expected behavior

Data should be inserted into the Iceberg table without serialization errors when using Sedona together with Iceberg.

Actual behavior

The INSERT INTO operation fails with a Kryo serialization exception. The stack trace shows an IndexOutOfBoundsException thrown by the Kryo serializer while handling Iceberg's GenericDataFile and SparkWrite.TaskCommit objects.

Error message:

py4j.protocol.Py4JJavaError: An error occurred while calling o55.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result:
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 44 out of bounds for length 14
Serialization trace:
partitionType (org.apache.iceberg.GenericDataFile)
taskFiles (org.apache.iceberg.spark.source.SparkWrite$TaskCommit)
writerCommitMessage (org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult)
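
(As a quick sanity check that Kryo really is the active serializer when this happens, the setting can be read back from the running session. This is only a diagnostic sketch; it assumes `sedona` is the session created in the repro below.)

# Diagnostic: print the serializer the session was started with.
# Assumes `sedona` is the SparkSession returned by SedonaContext.create(config) below.
print(sedona.sparkContext.getConf().get('spark.serializer'))
# Expected output when the failure occurs: org.apache.spark.serializer.KryoSerializer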

Steps to reproduce the problem

  1. Configure Sedona and Iceberg with the following settings:
from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    .master('spark://localhost:5581')
    .config(
        'spark.jars.packages',
        # sedona
        'org.apache.sedona:sedona-spark-3.5_2.12:1.7.0,'
        'org.datasyslab:geotools-wrapper:1.7.0-28.5,'
        # iceberg
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,'
        'org.apache.iceberg:iceberg-aws-bundle:1.7.1,'
        'org.postgresql:postgresql:42.7.4',
    )
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.kryo.registrator', 'org.apache.sedona.core.serde.SedonaKryoRegistrator')
    .config('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.my_catalog.type', 'jdbc')
    .config('spark.sql.catalog.my_catalog.uri', 'jdbc:postgresql://localhost:5500/data_catalog_apache_iceberg')
    .config('spark.sql.catalog.my_catalog.jdbc.user', 'postgres')
    .config('spark.sql.catalog.my_catalog.jdbc.password', 'postgres')
    .config('spark.sql.catalog.my_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .config('spark.sql.catalog.my_catalog.warehouse', 's3a://data-lakehouse')
    .config('spark.sql.catalog.my_catalog.s3.endpoint', 'http://localhost:5561')
    .config('spark.sql.catalog.my_catalog.s3.access-key-id', 'admin')
    .config('spark.sql.catalog.my_catalog.s3.secret-access-key', 'password')
    .getOrCreate()
)
sedona = SedonaContext.create(config)
  2. Execute the following queries:
sedona.sql('CREATE TABLE my_catalog.table2 (name string) USING iceberg;')
sedona.sql("INSERT INTO my_catalog.table2 VALUES ('Alex'), ('Dipankar'), ('Jason')")

Additional information

If I perform the same operations using Spark without Sedona, everything works seamlessly:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master('spark://localhost:5581')
    .config(
        'spark.jars.packages',
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,' 'org.apache.iceberg:iceberg-aws-bundle:1.7.1,' 'org.postgresql:postgresql:42.7.4',
    )
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.my_catalog.type', 'jdbc')
    .config('spark.sql.catalog.my_catalog.uri', 'jdbc:postgresql://localhost:5500/data_catalog_apache_iceberg')
    .config('spark.sql.catalog.my_catalog.jdbc.user', 'postgres')
    .config('spark.sql.catalog.my_catalog.jdbc.password', 'postgres')
    .config('spark.sql.catalog.my_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .config('spark.sql.catalog.my_catalog.warehouse', 's3a://data-lakehouse')
    .config('spark.sql.catalog.my_catalog.s3.endpoint', 'http://localhost:5561')
    .config('spark.sql.catalog.my_catalog.s3.access-key-id', 'admin')
    .config('spark.sql.catalog.my_catalog.s3.secret-access-key', 'password')
    .getOrCreate()
)

spark.sql('CREATE TABLE my_catalog.table8 (name string) USING iceberg;')
spark.sql("INSERT INTO my_catalog.table8 VALUES ('Alex'), ('Dipankar'), ('Jason')")

If I use the JavaSerializer (.config('spark.serializer', 'org.apache.spark.serializer.JavaSerializer')) in the Sedona example instead, it works (see the sketch below).
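
A minimal sketch of that workaround; the catalog settings are unchanged from the failing configuration above, and it is an assumption here that setting spark.serializer explicitly overrides the Kryo default that SedonaContext.builder() applies:

from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    .master('spark://localhost:5581')
    .config(
        'spark.jars.packages',
        'org.apache.sedona:sedona-spark-3.5_2.12:1.7.0,'
        'org.datasyslab:geotools-wrapper:1.7.0-28.5,'
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,'
        'org.apache.iceberg:iceberg-aws-bundle:1.7.1,'
        'org.postgresql:postgresql:42.7.4',
    )
    # Workaround: use the JavaSerializer instead of Kryo.
    .config('spark.serializer', 'org.apache.spark.serializer.JavaSerializer')
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    # ... same spark.sql.catalog.my_catalog.* settings as in the failing example above ...
    .getOrCreate()
)
sedona = SedonaContext.create(config)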

Settings

Sedona version = 1.7.1

Apache Spark version = 3.5

API type = Python

Scala version = 2.12

JRE version = 11.0.25

Python version = 3.12.0

Environment = Standalone


Thank you for your interest in Apache Sedona! We appreciate you opening your first issue. Contributions like yours help make Apache Sedona better.

Kontinuation (Member) commented Dec 18, 2024

I have not reproduced this using similar configurations. Can you try running the repro without Sedona but with Kryo serialization enabled?

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master('spark://localhost:5581')
    .config(
        'spark.jars.packages',
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,' 'org.apache.iceberg:iceberg-aws-bundle:1.7.1,' 'org.postgresql:postgresql:42.7.4',
    )
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.my_catalog.type', 'jdbc')
    .config('spark.sql.catalog.my_catalog.uri', 'jdbc:postgresql://localhost:5500/data_catalog_apache_iceberg')
    .config('spark.sql.catalog.my_catalog.jdbc.user', 'postgres')
    .config('spark.sql.catalog.my_catalog.jdbc.password', 'postgres')
    .config('spark.sql.catalog.my_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .config('spark.sql.catalog.my_catalog.warehouse', 's3a://data-lakehouse')
    .config('spark.sql.catalog.my_catalog.s3.endpoint', 'http://localhost:5561')
    .config('spark.sql.catalog.my_catalog.s3.access-key-id', 'admin')
    .config('spark.sql.catalog.my_catalog.s3.secret-access-key', 'password')
    .getOrCreate()
)

spark.sql('CREATE TABLE my_catalog.table8 (name string) USING iceberg;')
spark.sql("INSERT INTO my_catalog.table8 VALUES ('Alex'), ('Dipankar'), ('Jason')")

HelloJowet (Author) commented Dec 18, 2024

Thanks for looking into it.

Can you try running the repro without Sedona but with Kryo serialization enabled?

You're right, the error appears in that case too.

I have not reproduced this using similar configurations.

Here are some instructions to reproduce the error:

  1. Create docker-compose.yaml file:

    services:  
      spark_master:
        restart: always
        image: bitnami/spark:3.5
        ports:
          - 8080:8080
          - 7077:7077
        hostname: spark-master
        environment:
          - SPARK_MODE=master
          - SPARK_RPC_AUTHENTICATION_ENABLED=no
          - SPARK_RPC_ENCRYPTION_ENABLED=no
          - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
          - SPARK_SSL_ENABLED=no
          - SPARK_WORKER_WEBUI_PORT=8081
      
      spark_worker:
        restart: always
        image: bitnami/spark:3.5
        ports:
          - 8081:8081
        environment:
          - SPARK_MODE=worker
          - SPARK_MASTER_URL=spark://spark-master:7077
          - SPARK_WORKER_MEMORY=8G
          - SPARK_WORKER_CORES=4
          - AWS_ACCESS_KEY_ID=user
          - AWS_SECRET_ACCESS_KEY=password
          - AWS_REGION=us-east-1
        depends_on:
          - spark_master
      
      storage_s3:
        restart: always
        image: quay.io/minio/minio:RELEASE.2024-10-29T16-01-48Z
        ports:
          - 5560:5560
          - 5561:5561
        hostname: storage-s3
        environment:
          MINIO_ROOT_USER: admin
          MINIO_ROOT_PASSWORD: password
        command: server /data --console-address ":5560" --address=":5561"
        healthcheck:
          test: ["CMD", "curl", "-f", "http://localhost:5560/minio/health/live"]
          interval: 5s
          timeout: 5s
          retries: 5
      
      storage_s3_initial_setup:
        image: minio/mc:RELEASE.2024-10-29T15-34-59Z
        depends_on:
          storage_s3:
            condition: service_healthy
        volumes:
          - ./minio_docker_entrypoint.sh:/docker_entrypoint.sh:z
        entrypoint:
          - /docker_entrypoint.sh
      
      database_postgres:
        restart: always
        image: postgis/postgis:16-3.4
        ports:
          - 5500:5432
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
  2. Create minio_docker_entrypoint.sh file:

    #!/bin/bash
    
    # Set up alias for MinIO
    mc alias set minio http://storage-s3:5561 admin password;
    
    # Create buckets
    mc mb minio/data-lakehouse;
  3. Start docker compose: docker compose up

  4. Example that works (replace YOUR_IP_ADDRESS with your IP address):

    from pyspark.sql import SparkSession
    
    spark = (
        SparkSession.builder.master('spark://localhost:7077')
        .config(
            'spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,' 'org.apache.iceberg:iceberg-aws-bundle:1.7.1,' 'org.postgresql:postgresql:42.7.4',
        )
        # .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .config('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .config('spark.sql.catalog.my_catalog.type', 'jdbc')
        .config('spark.sql.catalog.my_catalog.uri', 'jdbc:postgresql://localhost:5500/postgres')
        .config('spark.sql.catalog.my_catalog.jdbc.user', 'postgres')
        .config('spark.sql.catalog.my_catalog.jdbc.password', 'postgres')
        .config('spark.sql.catalog.my_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .config('spark.sql.catalog.my_catalog.warehouse', 's3://data-lakehouse')
        .config('spark.sql.catalog.my_catalog.s3.region', 'us-east-1')
        .config('spark.sql.catalog.my_catalog.s3.endpoint', 'http://YOUR_IP_ADDRESS:5561')
        .config('spark.sql.catalog.my_catalog.s3.access-key-id', 'admin')
        .config('spark.sql.catalog.my_catalog.s3.secret-access-key', 'password')
        .getOrCreate()
    )
    
    spark.sql('CREATE TABLE my_catalog.table10 (name string) USING iceberg;')
    spark.sql("INSERT INTO my_catalog.table10 VALUES ('Alex'), ('Dipankar'), ('Jason')")
  5. Example that doesn't work (see the note after the code):

    from sedona.spark import SedonaContext
    
    spark = (
        SedonaContext.builder()
        .master('spark://localhost:7077')
        .config(
            'spark.jars.packages',
            # sedona
            'org.apache.sedona:sedona-spark-3.5_2.12:1.7.0,'
            'org.datasyslab:geotools-wrapper:1.7.0-28.5,'
            # iceberg
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,'
            'org.apache.iceberg:iceberg-aws-bundle:1.7.1,'
            'org.postgresql:postgresql:42.7.4',
        )
        # .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .config('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .config('spark.sql.catalog.my_catalog.type', 'jdbc')
        .config('spark.sql.catalog.my_catalog.uri', 'jdbc:postgresql://localhost:5500/postgres')
        .config('spark.sql.catalog.my_catalog.jdbc.user', 'postgres')
        .config('spark.sql.catalog.my_catalog.jdbc.password', 'postgres')
        .config('spark.sql.catalog.my_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .config('spark.sql.catalog.my_catalog.warehouse', 's3://data-lakehouse')
        .config('spark.sql.catalog.my_catalog.s3.region', 'us-east-1')
        .config('spark.sql.catalog.my_catalog.s3.endpoint', 'http://YOUR_IP_ADDRESS:5561')
        .config('spark.sql.catalog.my_catalog.s3.access-key-id', 'admin')
        .config('spark.sql.catalog.my_catalog.s3.secret-access-key', 'password')
        .config('spark.sql.catalog.my_catalog.s3.path-style-access', 'true')
        .getOrCreate()
    )
    
    spark.sql('CREATE TABLE my_catalog.table8 (name string) USING iceberg;')
    spark.sql("INSERT INTO my_catalog.table8 VALUES ('Alex'), ('Dipankar'), ('Jason')")

I'm using the following pyspark and sedona versions:

"apache-sedona[spark]>=1.7.0",
"pyspark>=3.5.3",
