error on rawDf.show() #1723

Open · angelosnm opened this issue Dec 15, 2024 · 6 comments

@angelosnm

I have set up a standalone Spark cluster to which PySpark jobs are submitted. The jobs use the configuration below, with S3/MinIO serving as the HDFS-compatible storage layer (via the S3A connector) for reading raster files:

import socket

from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    .master(spark_endpoint)
    .appName("RasterProcessingWithSedona")
    .config("spark.driver.host", socket.gethostbyname(socket.gethostname()))
    .config("spark.driver.port", "2222")
    .config("spark.blockManager.port", "36859")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "10g")
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_access_key)
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.6.1,"
        "org.datasyslab:geotools-wrapper:1.6.1-28.2",
    )
    .getOrCreate()
)
sedona = SedonaContext.create(config)  # creates the `sedona` session used below
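
One note, as an assumption about this environment rather than something stated in the report: the S3A filesystem is implemented by Hadoop's hadoop-aws module, so if the cluster image does not already bundle it, the connector has to be on the classpath alongside the Sedona packages. A minimal sketch (the 3.3.4 version is a placeholder that should match the cluster's Hadoop build):

packages = ",".join([
    "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.6.1",
    "org.datasyslab:geotools-wrapper:1.6.1-28.2",
    # Assumed addition: S3A connector, pinned to the cluster's Hadoop version.
    "org.apache.hadoop:hadoop-aws:3.3.4",
])
# then pass it as: .config("spark.jars.packages", packages)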

Then the raster (.tif) files are read as shown below:

raster_path = "s3a://data/BFA"

rawDf = (
    sedona.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.tif*")
    .load(raster_path)
)
rawDf.createOrReplaceTempView("rawdf")
rawDf.show()

This code returns the error shown under "Actual behavior" below.

If the same code runs in local mode, it completes normally.
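
For context, the rawdf temp view above would typically feed a raster query like the sketch below, which uses Sedona's documented RS_FromGeoTiff constructor (illustrative only, not part of the original report):

# Parse each file's bytes into an in-memory raster object.
rasterDf = sedona.sql("SELECT path, RS_FromGeoTiff(content) AS raster FROM rawdf")
rasterDf.show()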

Expected behavior

[screenshot: the expected rawDf.show() output table]

Actual behavior

Py4JJavaError: An error occurred while calling o66.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 5) (192.168.18.112 executor 1): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4334)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4324)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4322)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4322)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

Steps to reproduce the problem

Settings

Sedona version = 1.6.1

Apache Spark version = 3.5.2

Apache Flink version = N/A

API type = Python

Scala version = 2.12

JRE version = 1.8.0_432

Python version = 3.11.10

Environment = Standalone


Thank you for your interest in Apache Sedona! We appreciate you opening your first issue. Contributions like yours help make Apache Sedona better.

@jiayuasu (Member) commented Dec 16, 2024

@angelosnm The stack trace above is incomplete. Can you find the part where it shows `Caused by:`?
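
One way to surface the full Java-side cause chain from PySpark is to walk the wrapped exception (a sketch using py4j's Py4JJavaError, which exposes the underlying Java exception object; rawDf is the DataFrame from the snippet above):

from py4j.protocol import Py4JJavaError

try:
    rawDf.show()
except Py4JJavaError as e:
    # Walk the Java exception chain so any "Caused by:" entries that the
    # default PySpark traceback truncates are printed as well.
    cause = e.java_exception
    while cause is not None:
        print(cause.toString())
        cause = cause.getCause()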

@Jaeggi99

Hello, I have a similar problem when trying to load "larger" raster files (around 65 MB), although my error message is different, so I am not opening a new issue.

Setup:
I use Docker Desktop and pulled the Sedona image with:

docker pull apache/sedona:latest

Then I run the container with:

docker run -e DRIVER_MEM=6g -e EXECUTOR_MEM=8g -p 8888:8888 -p 8080:8080 -p 8081:8081 -p 4040:4040 apache/sedona:latest

In JupyterLab, I run the code in the example notebook ApacheSedonaRaster.ipynb and everything works fine.

When I copy my orthophoto swissimage-dop10_2021_2637-1223_01_2056.tif (linked below) into the examples data folder and try to load and show it, within the same example notebook or any other notebook, I get the following error message.

Notebook Code:

from sedona.spark import *
from IPython.display import display, HTML
config = (
    SedonaContext.builder()
    .getOrCreate()
)
sedona = SedonaContext.create(config)
sc = sedona.sparkContext
swissimage_df = sedona.read.format("binaryFile").load("data/raster/swissimage-dop10_2021_2637-1223_01_2056.tif")
swissimage_df.show(2)

Expected behavior:
A table showing information about the raster, like:

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/opt/workspa...|2024-12-15 00:21:05|209199|[4D 4D 00 2A 00 0...|
+--------------------+-------------------+------+--------------------+

Actual behavior:
Unfortunately, I am also not able to capture more of the error log via stdout and stderr.
I tried the following snippet to capture the whole error message:

import sys
import traceback

logfile = "cell_output.log"
with open(logfile, "w") as f:
    # Redirect stdout and stderr to the log file
    original_stdout = sys.stdout
    original_stderr = sys.stderr
    sys.stdout = f
    sys.stderr = f
    try:
        print("This will be logged into the file.")
        swissimage_df = sedona.read.format("binaryFile").load("data/raster/swissimage-dop10_2021_2637-1223_01_2056.tif")
        swissimage_df.show(2)
    except Exception:
        traceback.print_exc()
    finally:
        # Restore the original streams
        sys.stdout = original_stdout
        sys.stderr = original_stderr

Resulting error message:

Traceback (most recent call last):
  File "/tmp/ipykernel_45/2611185478.py", line 16, in <module>
    swissimage_df.show(2)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py", line 899, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.10/dist-packages/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.10/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o45.showString.
: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Formatter.parse(Formatter.java:2807)
	at java.base/java.util.Formatter.format(Formatter.java:2763)
	at java.base/java.util.Formatter.format(Formatter.java:2717)
	at java.base/java.lang.String.format(String.java:4150)
	at scala.collection.immutable.StringLike.format(StringLike.scala:354)
	at scala.collection.immutable.StringLike.format$(StringLike.scala:353)
	at scala.collection.immutable.StringOps.format(StringOps.scala:33)
	at org.apache.spark.sql.Dataset.$anonfun$getRows$5(Dataset.scala:293)
	at org.apache.spark.sql.Dataset.$anonfun$getRows$5$adapted(Dataset.scala:293)
	at org.apache.spark.sql.Dataset$$Lambda$3379/0x0000000802066638.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$177/0x0000000801203f00.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofByte.foreach(ArrayOps.scala:210)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofByte.map(ArrayOps.scala:210)
	at org.apache.spark.sql.Dataset.$anonfun$getRows$4(Dataset.scala:293)
	at org.apache.spark.sql.Dataset$$Lambda$3378/0x0000000802066278.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$177/0x0000000801203f00.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.Dataset.$anonfun$getRows$3(Dataset.scala:290)
	at org.apache.spark.sql.Dataset$$Lambda$3377/0x0000000802065eb8.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$177/0x0000000801203f00.apply(Unknown Source)

With the provided Docker image and the orthophoto TIFF, you should be able to reproduce the error.

Since I am using the latest Docker image, I am not listing individual package versions. The orthophoto is too large (65 MB) to attach here, but it can be downloaded directly from the Swiss Federal Office of Topography:
https://data.geo.admin.ch/ch.swisstopo.swissimage-dop10/swissimage-dop10_2021_2637-1223/swissimage-dop10_2021_2637-1223_0.1_2056.tif

If there are any further questions, I am happy to answer them.

@angelosnm (Author)

@jiayuasu I was monitoring the Spark Web UI and nothing specific was logged...

My guess is that it was a network-related issue. I am using a Jupyter-based container image inside a k8s cluster, from which I send PySpark jobs to a Spark cluster that sits on the same local network as the k8s cluster but is referenced externally, with the master endpoint exposed to the public internet.

This setup requires a lot of network configuration on both clusters (such as routing the k8s pod CIDR from the Spark nodes, setting up network policies on k8s, etc.).

I just deployed Spark inside the k8s cluster (using the official apache/spark image) and the code ran successfully!
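
For anyone hitting the same symptom: the driver-side settings that usually matter in this kind of NAT'd layout are spark.driver.bindAddress (the local interface the driver binds to inside the pod) and spark.driver.host (the address executors use to call back). A sketch with a hypothetical reachable address; whether this resolves any given setup is an assumption:

config = (
    SedonaContext.builder()
    .config("spark.driver.bindAddress", "0.0.0.0")           # bind on all interfaces inside the pod
    .config("spark.driver.host", "driver.example.internal")  # hypothetical address reachable from executors
    .config("spark.driver.port", "2222")                     # must be reachable/forwarded from executors
    .config("spark.blockManager.port", "36859")
    .getOrCreate()
)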

@jiayuasu (Member)

@Jaeggi99 Your error message is pretty clear: java.lang.OutOfMemoryError: Java heap space

This is a separate issue, unrelated to this ticket.

In your case the image is large (65 MB), so it blows up the Spark driver's memory. I suggest you use the out-db raster mode of WherobotsDB to load it: https://docs.wherobots.com/latest/tutorials/wherobotsdb/raster-data/raster-load/#create-an-out-db-raster-type-column . WherobotsDB has a free tier, so you can experiment with it.
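
For completeness, a minimal workaround sketch (an addition, not from the comment above): the String.format frames in the trace come from rendering the full 65 MB content byte array on the driver inside showString(), so selecting only the lightweight columns sidesteps the heap blow-up:

# Inspect only the small columns; the binary `content` payload is never
# formatted on the driver, which is where the heap-space error occurred.
swissimage_df.select("path", "modificationTime", "length").show(2)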

@Jaeggi99

Thank you for clarifying. Although the OutOfMemory symptom was clear, I thought Sedona or Spark might be the root cause: I was able to load and show a 2.3 GB GeoPackage of vector data, so as a newbie I was confused that Sedona/Spark could not handle a 65 MB TIFF.

Now I know, and I will try out your suggestion.
