@rjurney
Last active January 11, 2025 07:49
GraphFrames Connected Components OutOfMemoryError in Java 11 on TINY Graph...
I can't figure out why this unit test is failing with this error:

> [error] Uncaught exception when running org.graphframes.lib.ConnectedComponentsSuite: java.lang.OutOfMemoryError: Java heap space
> sbt.ForkMain$ForkError: java.lang.OutOfMemoryError: Java heap space

The test builds an 8-node, 6-edge graph with two components and two dangling vertices. How does that exhaust the heap? I cleaned up the `Dockerfile` below because it was pinned to wonky versions and ran the same commands inside the container... no go. Same exception. The weird thing is that CI does pass these tests, so I can't tell what is going wrong.
HOW YOU CAN HELP: Please run this command and tell me if the tests pass:
> build/sbt clean compile package test
Thanks!
package org.graphframes.lib

import java.io.IOException

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.storage.StorageLevel

import org.graphframes._
import org.graphframes.GraphFrame._
import org.graphframes.examples.Graphs

class ConnectedComponentsSuite extends SparkFunSuite with GraphFrameTestSparkContext {

  ...

  test("two components and two dangling vertices") {
    val vertices = spark.range(8L).toDF(ID)
    val edges = spark.createDataFrame(Seq(
      (0L, 1L), (1L, 2L), (2L, 0L),
      (3L, 4L), (4L, 5L), (5L, 3L)
    )).toDF(SRC, DST)
    val g = GraphFrame(vertices, edges)
    val components = g.connectedComponents.run()
    val expected = Set(Set(0L, 1L, 2L), Set(3L, 4L, 5L), Set(6L), Set(7L))
    assertComponents(components, expected)
  }
}
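The `assertComponents` helper is elided above. For readers without the repo handy, here is a minimal sketch of what such a helper might look like, assuming the suite's existing imports and that the result DataFrame uses GraphFrames' standard `id` and `component` columns (the suite's real helper may differ):

// Hypothetical reconstruction, not the suite's actual helper: group the
// result by component id and compare the vertex sets to the expectation.
def assertComponents(components: DataFrame, expected: Set[Set[Long]]): Unit = {
  val actual: Set[Set[Long]] = components
    .select(col("id").cast("long"), col("component").cast("long"))
    .collect()
    .groupBy(_.getLong(1))           // component id -> rows in that component
    .values
    .map(_.map(_.getLong(0)).toSet)  // rows -> set of vertex ids
    .toSet
  assert(actual === expected)
}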
FROM ubuntu:22.04

ARG PYTHON_VERSION=3.11
ARG DEBIAN_FRONTEND=noninteractive

RUN apt-get update && \
    apt-get install -y wget bzip2 build-essential openjdk-11-jdk ssh sudo && \
    apt-get clean

# Install Spark and update env variables.
ENV SCALA_VERSION "2.12.20"
ENV SPARK_VERSION "3.5.4"
ENV SPARK_BUILD "spark-${SPARK_VERSION}-bin-hadoop3"
ENV SPARK_BUILD_URL "https://dist.apache.org/repos/dist/release/spark/spark-${SPARK_VERSION}/${SPARK_BUILD}.tgz"

RUN wget --quiet "$SPARK_BUILD_URL" -O /tmp/spark.tgz && \
    tar -C /opt -xf /tmp/spark.tgz && \
    mv "/opt/${SPARK_BUILD}" /opt/spark && \
    rm /tmp/spark.tgz

ENV SPARK_HOME /opt/spark
ENV PATH $SPARK_HOME/bin:$PATH
# Spark 3.5.x bundles py4j 0.10.9.7, not 0.10.9.2.
ENV PYTHONPATH /opt/spark/python/lib/py4j-0.10.9.7-src.zip:/opt/spark/python/lib/pyspark.zip:$PYTHONPATH
ENV PYSPARK_PYTHON python

# The graphframes dir will be mounted here.
VOLUME /mnt/graphframes
WORKDIR /mnt/graphframes

ENTRYPOINT ["/bin/bash"]
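To reproduce inside the container, something like the following should work (the image tag is my own invention; adjust the mount path to your checkout):

docker build -t graphframes-dev .
docker run -it --rm -v "$PWD:/mnt/graphframes" graphframes-dev
# then, inside the container:
build/sbt clean compile package test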
build/sbt test
Using /usr/lib/jvm/java-11-openjdk-amd64 as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /home/rjurney/Software/graphframes/project
[info] Set current project to graphframes (in build file:/home/rjurney/Software/graphframes/)
[info] LabelPropagationSuite:
25/01/10 17:51:31 WARN Utils: Your hostname, heracles resolves to a loopback address: 127.0.0.1; using 10.1.10.3 instead (on interface eno1)
25/01/10 17:51:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/01/10 17:51:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/rjurney/.ivy2/cache/org.apache.spark/spark-unsafe_2.12/jars/spark-unsafe_2.12-3.5.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
...
[info] ConnectedComponentsSuite:
[info] - default params
[info] - empty graph
[info] - single vertex
[info] - disconnected vertices
[info] - two connected vertices
[info] - chain graph
[info] - star graph
[info] - two blobs
[info] - two components
[info] - one component, differing edge directions
[info] - two components and two dangling vertices
[info] org.graphframes.lib.ConnectedComponentsSuite *** ABORTED ***
[info] java.lang.OutOfMemoryError: Java heap space
[info] at java.base/java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:86)
[info] at java.base/java.lang.StringBuilder.<init>(StringBuilder.java:116)
[info] at org.apache.spark.sql.catalyst.util.StringConcat.toString(StringUtils.scala:62)
[info] at org.apache.spark.sql.catalyst.util.StringUtils$PlanStringConcat.toString(StringUtils.scala:152)
[info] at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:254)
[info] at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:777)
[info] at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:285)
[info] at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3652/0x0000000841541840.apply$mcVJ$sp(Unknown Source)
[info] at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
[info] at scala.Option.foreach(Option.scala:407)
[info] ...
[error] Uncaught exception when running org.graphframes.lib.ConnectedComponentsSuite: java.lang.OutOfMemoryError: Java heap space
sbt.ForkMain$ForkError: java.lang.OutOfMemoryError: Java heap space
at java.base/java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:86)
at java.base/java.lang.StringBuilder.<init>(StringBuilder.java:116)
at org.apache.spark.sql.catalyst.util.StringConcat.toString(StringUtils.scala:62)
at org.apache.spark.sql.catalyst.util.StringUtils$PlanStringConcat.toString(StringUtils.scala:152)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:254)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:777)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:285)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3652/0x0000000841541840.apply$mcVJ$sp(Unknown Source)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3621/0x0000000841526040.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$5080/0x0000000841c3b040.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3327/0x000000084143b840.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3320/0x0000000841437040.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1469/0x0000000840c29440.apply(Unknown Source)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
...
[info] Run completed in 1 minute, 25 seconds.
[info] Total number of tests run: 97
[info] Suites: completed 15, aborted 1
[info] Tests: succeeded 97, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] org.graphframes.lib.ConnectedComponentsSuite
[error] (test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 88 s, completed Jan 10, 2025, 10:03:11 PM
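One clue from the stack trace: the OOM happens while Spark is building a plan-explain string (QueryExecution.explainString called from AdaptiveSparkPlanExec.onUpdatePlan), which suggests the iterative connected-components loop has grown an enormous query plan, not that any data is large. Two Spark SQL confs worth trying as a diagnostic, a minimal sketch for the test SparkSession (both are real Spark 3.x confs, but whether they cure this particular run is unverified):

// Hypothesis drawn from the trace, not a confirmed fix: cap explain-string
// size, and/or disable AQE so AdaptiveSparkPlanExec never renders the plan.
spark.conf.set("spark.sql.maxPlanStringLength", "10000")
spark.conf.set("spark.sql.adaptive.enabled", "false")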
# Original... fails with OutOfMemoryError
export PYSPARK_SUBMIT_ARGS="--driver-memory 2g --executor-memory 2g --jars $JAR_PATH pyspark-shell "
# Even this fails with OutOfMemoryError
export PYSPARK_SUBMIT_ARGS="--driver-memory 32g --executor-memory 16g --jars $JAR_PATH pyspark-shell "
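Worth noting: `PYSPARK_SUBMIT_ARGS` only affects PySpark sessions, and the trace shows the tests running in a JVM forked by sbt (`sbt.ForkMain`), so that JVM's heap comes from sbt settings instead. A minimal, unverified sketch of how one might raise it (standard sbt keys; the graphframes build may already configure this elsewhere):

// build.sbt -- hypothetical additions, not the project's actual settings
Test / fork := true
Test / javaOptions ++= Seq("-Xmx4g", "-XX:+HeapDumpOnOutOfMemoryError")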