Moving from Pandas to Spark

Most data science workflows start with Pandas. Pandas is an awesome library that lets you do a variety of transformations and can handle different kinds of data such as CSVs, JSON, and so on. I love Pandas — I made a podcast on it called “Why Pandas is the New Excel”. I still think Pandas is an awesome library in a data scientist’s arsenal. However, there comes a point where the datasets you are working on get too big and Pandas starts running out of memory. It is here that Spark comes into the picture.

I am writing this blog post in a Q&A format, covering questions you might have — and that I also had — when I was getting started.

Q1. What is Spark?

Spark is a framework for handling massive datasets. It lets you process big data files in a distributed manner. It spins up several workers which handle and process chunks of your large dataset, all orchestrated by a driver node. The distributed nature of this framework lets you scale to TBs of data. You are no longer limited by the RAM of any single machine. The Spark ecosystem has evolved so much now that you don’t need to worry about any of the worker orchestration, and can use it out of the box with great performance.

Q2. When should I move out of Pandas and seriously consider using Spark?

It depends on the RAM of your machine. I’d say datasets larger than 10GB start to get too big for Pandas, and Spark becomes really beneficial here. Say you have 10 columns in your dataset, with each cell holding a short ASCII string (on the order of 100 bytes, since most ASCII characters encode to 1 byte each) — then around 10M rows is about the place where you should start thinking of Spark.
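As a rough back-of-the-envelope check (the numbers here are only illustrative, not a precise rule):

# Rough estimate of dataset size in memory
n_rows = 10_000_000          # 10M rows
n_cols = 10                  # 10 columns
bytes_per_cell = 100         # short ASCII strings, ~1 byte per character

approx_gb = n_rows * n_cols * bytes_per_cell / 1e9
print(f"~{approx_gb:.0f} GB")  # ~10 GB -- around where Pandas starts to struggle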

Q3. Does Spark do everything better than Pandas?

No! For beginners Pandas is definitely much easier to learn. Spark can be harder but with recent APIs, you can work with large data with dataframes which are almost as easy to work with as Pandas dataframes.

Also, until recently, Spark did not have great support for visualizations; you could only visualize subsets of the data. That changed recently when Databricks announced native support for visualizations in Spark (I am still waiting to see this). But until that matures, Spark will not fully replace Pandas, at least in visualization land. You can always convert a Spark dataframe to Pandas via .toPandas() and then run visualizations or Pandas code.
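For example, a common pattern (a sketch — df and the column name are hypothetical) is to aggregate in Spark and only convert the small result to Pandas for plotting:

# Aggregate the big data in Spark, then bring only the small result to Pandas
summary_pdf = (
    df.groupBy("category")
      .count()
      .toPandas()          # small result, safe to collect to the driver
)
summary_pdf.plot(kind="bar", x="category", y="count")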

Q4. Spark is scary to get set up. What should I do?

Spark can be interacted with in Python via PySpark, or in Scala (or R or SQL). I have written a blog on getting started with PySpark locally or on custom servers — and people have commented on how tough it can be to get started. I think you can skip that and go straight to managed cloud solutions for running Spark.

I’d recommend 2 ways to get started with Spark:
1. Databricks — it is a fully managed service that runs Spark clusters on AWS / Azure / GCP for you. They have notebooks that you can run which are very similar to Jupyter notebooks.
2. Amazon EMR and Zeppelin notebooks — it is a semi-managed service by AWS. You need to host a Spark EMR endpoint and then run Zeppelin notebooks to interact with it. Other cloud providers have similar services, and I am not including them here.

Q5. Between Databricks and EMR which one is better?

Having spent several hours on both to weigh the pros and cons of each, here are some considerations:
a) EMR is fully managed by Amazon and you don’t need to leave the AWS ecosystem.
b) If you have DevOps expertise or have a DevOps person helping you, EMR can be a cheaper option — you need to know how to spin instances up and down and shut them down when you are done with them. That being said, EMR can be unstable and you might need to spend several hours debugging. Databricks Spark is much more stable.
c) It is easy to schedule jobs with Databricks — you can schedule notebooks to be run on specific times of the day or week very easily. They also provide an interface to metrics in Ganglia UI.
d) Databricks jobs can be 30–40% more expensive than EMR for Spark jobs. But given the flexibility, stability and great customer support, I think they are worth it. Databricks does charge 6–7x more for running notebooks interactively in Spark — so please be mindful of that. Given that instances can be set to shut down automatically after a period of inactivity (e.g. 30 or 60 minutes), saving costs, I still think they can be cheaper overall.

Given these points, I’d say that you should go for Databricks if this is your first Spark project but if you have a good amount of DevOps expertise you can try EMR or running Spark on your own machines. If you don’t mind sharing your work publicly, you can try Databricks community edition for free or do a 14 day trial with their enterprise edition.

Q6. How similar or different is PySpark compared to Pandas?

I feel this deserves its own blog post. This talk by Andrew Ray, who is a Spark contributor, should answer some of your questions.
Key similarities are:
a) Spark dataframes are very similar to Pandas dataframes.
b) PySpark’s groupby, aggregations, selection and other transformations are all very similar to Pandas. PySpark is a little harder and has a bit of a learning curve compared to Pandas — but they feel similar.

Key differences are:
a) Spark lets you query dataframes with both SQL and Python — which I feel is really great. Sometimes logic is easier to write in SQL than to remember the exact Pandas/PySpark API, and you can work interchangeably between the two.
b) Spark dataframes are immutable. No slicing, overwriting of data, etc. is allowed.
c) Spark does lazy evaluation. It builds a graph of all the transformations and then lazily evaluates them only when you provide an action such as count, collect or show. Transformations can be either wide (they look at data across all nodes, e.g. groupBy or join) or narrow (they look at individual rows within each node, e.g. filter or map). Doing several wide transformations can be slow compared to narrow transformations, so compared to Pandas you need to be much more mindful of the wide transformations you are using!
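Here is a small sketch of that behaviour (the dataframe and column names are hypothetical):

from pyspark.sql import functions as F

narrow = df.filter(F.col("amount") > 0)          # narrow transformation, nothing runs yet
wide = narrow.groupBy("country").sum("amount")   # wide transformation (shuffle), still nothing runs

wide.show(5)          # action: Spark now builds and executes the whole plan
print(wide.count())   # another action triggers another execution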

Q7. Is there any other advantage to Spark?

Spark provides not just dataframes (which are higher level abstractions over RDDs) but also great APIs for streaming data and distributed machine learning via MLLib. So if you want to do transformations on streaming data or want to do machine learning on large datasets — Spark can be useful for you.

Q8. Any examples of data pipeline architectures with Spark?

Yes — here is one: an ETL pipeline where raw data from a data lake (S3) is transformed in Spark, loaded back into S3, and then loaded into a data warehouse like Snowflake or Redshift, which then powers a BI tool like Tableau or Looker.

Source: https://towardsdatascience.com/moving-from-pandas-to-spark-7b0b7dadb

Python has gained a lot of popularity and interest as a language for data analysis due to its active community and its vast selection of libraries and resources. It has the potential to become a common language for data science and the production of web-based analytics products. I am Anchal Gupta, a Machine Learning Engineer at Indellient. In this blog post, I will be discussing the shift from Pandas DataFrames to Spark DataFrames.

In data science, machine learning is one of the significant elements used to maximize value from data. Therefore, it becomes essential to study the distribution and statistics of the data to get useful insights.

With the availability of a wide variety of packages and libraries like NumPy, Pandas, Scikit-learn, Keras, TensorFlow, etc., the math and calculations get easier and therefore, Python has become the most preferred data analysis and machine learning tool. 

From a broader perspective, data can be either in a structured or unstructured format. 

Structured data has a clearly defined schema and form, while semi-structured and unstructured data includes formats like JSON, audio, video, and social media postings.

An application such as a cognitive pricing model has most of its data in a well-defined structure. One of the most commonly used packages to analyze and manipulate this type of data is Pandas.

What is Python Pandas? 

Pandas stands for “Python Data Analysis Library”. Who knew that?

It is an open-source Python package that provides highly efficient and easy-to-use data structures for data analysis.

When to Use Python Pandas? 

Pandas is an ideal tool for data wrangling. Python Pandas is intended for quick and easy data manipulation tasks such as:

  • Reading
  • Visualization
  • Aggregation

Pandas can read data from most common file formats, such as CSV, JSON, and Excel.

Its extended features also allow us to connect to a SQL database.
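For instance, a few of the common readers (the file names here are just placeholders):

import pandas as pd
import sqlite3

df_csv = pd.read_csv("data.csv")        # CSV file
df_json = pd.read_json("data.json")     # JSON file
df_xlsx = pd.read_excel("data.xlsx")    # Excel spreadsheet

# The SQL integration works through a database connection
conn = sqlite3.connect("example.db")
df_sql = pd.read_sql("SELECT * FROM my_table", conn)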

After reading the data, Pandas creates a Python object in a row-and-column format, known as a data frame. A data frame is very similar to a table in an Excel spreadsheet.

What can we do using Pandas Dataframe? 

  • Data manipulation tasks such as indexing, renaming, sorting, and merging data frames.
  • Modifying the definition by updating, adding, and deleting columns from a data frame.
  • Cleaning and preparing data by imputing missing values or NaNs (a small sketch of these operations follows).
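A minimal sketch of these operations (the toy data is made up for illustration):

import numpy as np
import pandas as pd

df = pd.DataFrame({"name": ["a", "b", "c"], "score": [1.0, np.nan, 3.0]})

df = df.rename(columns={"score": "points"})   # renaming a column
df["points"] = df["points"].fillna(0)         # imputing missing data / NaNs
df = df.sort_values("points")                 # sorting
df["passed"] = df["points"] > 1               # adding a new column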

With all the above-listed advantages, it has some drawbacks too.

Why Did Pandas Fail for Us?

With exponentially growing data, where we have billions of rows and many columns, complex operations like merging or grouping require parallelization and distributed computing. These operations become very slow, quite expensive, and difficult to handle with a Pandas dataframe, which does not support parallelization.

Therefore, to build scalable applications, we need packages or software that can be faster and support parallelization for large datasets. 

At Indellient we faced a similar problem while working for one of our clients. We had to merge two data frames with millions of rows, and the resulting data frame was going to be around 39 billion rows.

While exploring options to overcome the above problem, we found Apache Spark to be the best alternative. 

What is Apache Spark? 

Apache Spark is an open-source cluster-computing framework. Its in-memory primitives can provide dramatically faster performance for some applications.

Spark is suitable for machine learning algorithms, as it allows programs to load and query data repeatedly.  

  • It runs on memory (RAM), which makes the processing faster than on disk drives. It can be used for creating data pipelines, running machine learning algorithms, and much more.   
  • Operations on Spark Dataframe run in parallel on different nodes in a cluster, which is not possible with Pandas as it does not support parallel processing. 
  • Apache Spark has easy-to-use APIs for operating on large datasets and across languages: Python, R, Scala and Java. 

Apache Spark also supports different types of data structures:

  • Resilient Distributed Datasets (RDD)
  • Data frames
  • Datasets

Spark data frames are more suitable for structured data where you have a well-defined schema, whereas RDDs are used for semi-structured and unstructured data.

Computation times comparison Pandas vs. Apache Spark 

While running multiple merge queries on a million-row data frame, Pandas ran out of memory. An Apache Spark data frame, on the other hand, completed the same operation within 10 seconds. Since a Pandas dataframe is not distributed, processing it will be slower for large amounts of data.
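As an illustration (the paths and column names are made up, not our client's actual data, and a SparkSession named spark is assumed), the same kind of merge in Spark is just a distributed join:

# Distributed join in PySpark: each partition is processed in parallel on the cluster
orders = spark.read.parquet("s3://bucket/orders/")       # hypothetical input paths
customers = spark.read.parquet("s3://bucket/customers/")

joined = orders.join(customers, on="customer_id", how="inner")
joined.write.parquet("s3://bucket/joined/")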

Deciding Between Pandas and Spark

Pandas dataframes are in-memory and single-server, so their size is limited by your server memory and you will process them with the power of a single server.

The advantages of using Pandas instead of Apache Spark are clear:

  • no need for a cluster
  • more straightforward
  • more flexible
  • more libraries
  • easier to implement
  • better performance when scalability is not an issue.

Spark dataframes are excellent for building scalable applications, as they are distributed across your Spark cluster. If you need to handle more data, you only need to add more nodes to your cluster.


P.S. Koalas by Databricks 

After reading this blog you might be concerned that you have to learn one more technology and memorize some more syntax to use Spark. Let me solve this problem too, by introducing you to “Koalas”.

“Koalas: Easy Transition from Pandas to Apache Spark”

Pandas is a great tool to analyze small datasets on a single machine. When the need for bigger datasets arises, users often choose PySpark. However, converting code from Pandas to PySpark is not easy, as PySpark APIs are considerably different from Pandas APIs. Koalas makes the learning curve significantly easier by providing pandas-like APIs on top of PySpark. With Koalas, users can take advantage of the benefits of PySpark with minimal effort, and thus get to value much faster. [1]
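A small sketch of what that looks like (the column names are hypothetical; Koalas has since been folded into PySpark as the pandas API on Spark, but the idea is the same):

import databricks.koalas as ks

kdf = ks.read_csv("data.csv")                  # pandas-like API, Spark execution underneath
kdf["total"] = kdf["price"] * kdf["quantity"]  # hypothetical columns
print(kdf.groupby("category")["total"].sum().head())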

[1] https://databricks.com/blog//04/24/koalas-easy-transition-from-pandas-to-apache-spark.html 



Source: https://www.indellient.com/blog/a-journey-from-pandas-to-spark-data-frames/

Stop using Pandas and start using Spark with Scala

Moving from Pandas to Spark with Scala isn’t as challenging as you might think, and as a result your code will run faster and you’ll probably end up writing better code.

In my experience as a Data Engineer, I’ve found building data pipelines in Pandas often requires us to regularly increase resources to keep up with the increasing memory usage. In addition, we often see many runtime errors due to unexpected data types or nulls. As a result of using Spark with Scala instead, solutions feel more robust and easier to refactor and extend.

In this article we’ll run through the following:

  1. Why you should use Spark with Scala over Pandas
  2. How the Scala Spark API really isn’t too different from the Pandas API
  3. How to get started using either a Jupyter notebook or your favourite IDE
  • Spark is an Apache open-source framework
  • It can be used as a library and run on a “local” cluster, or run on a Spark cluster
  • On a Spark cluster the code can be executed in a distributed way, with a single master node and multiple worker nodes that share the load
  • Even on a local cluster you will still see performance improvements over Pandas, and we’ll go through why below

Spark has become popular due to its ability to process large data sets at speed

  • By default, Spark is multi-threaded whereas Pandas is single-threaded
  • Spark code can be executed in a distributed way, on a Spark Cluster, whereas Pandas runs on a single machine
  • Spark is lazy, which means it will only execute when you collect (ie. when you actually need to return something), and in the meantime it builds up an execution plan and finds the optimal way to execute your code
  • This differs to Pandas, which is eager, and executes each step as it reaches it
  • Spark is also less likely to run out of memory as it will start using disk when it reaches its memory limit

For a visual comparison of run time see the below chart from Databricks, where we can see that Spark is significantly faster than Pandas, and also that Pandas runs out of memory at a lower threshold.

Spark has a rich ecosystem

  • Data science libraries such as Spark ML, which is built in, or Graph X for graph algorithms
  • Spark Streaming for real time data processing
  • Interoperability with other systems and file types (orc, parquet, etc.)

Spark provides a familiar API, so using Scala instead of Python won’t feel like a huge learning curve. Here are few reasons why you might want to use Scala:

  • Scala is a statically typed language, which means you’ll find your code will likely have fewer runtime errors than with Python
  • Scala also allows you to create immutable objects, which means when referencing an object you can be confident its state hasn’t been mutated in between creating it and calling it
  • Spark is written in Scala, so new features are available in Scala before Python
  • For Data Scientists and Data Engineers working together, using Scala can help with collaboration, because of the type safety and immutability of Scala code
  • DataFrame: a spark DataFrame is a data structure that is very similar to a Pandas DataFrame
  • Dataset: a Dataset is a typed DataFrame, which can be very useful for ensuring your data conforms to your expected schema
  • RDD: this is the core data structure in Spark, upon which DataFrames and Datasets are built

In general, we’ll use Datasets where we can, because they’re type safe, more efficient, and improve readability as it’s clear what data we can expect in the Dataset.

To create our Dataset we first need to create a case class, which is similar to a data class in Python, and is really just a way to specify a data structure.

For example, let’s create a case class called FootballTeam, with a few fields:

case class FootballTeam(
  name: String,
  league: String,
  matches_played: Int,
  goals_this_season: Int,
  top_goal_scorer: String,
  wins: Int
)

Now, let’s create an instance of this case class:

val brighton: FootballTeam =
  FootballTeam(
    "Brighton and Hove Albion",
    "Premier League",
    matches_played = 29,
    goals_this_season = 32,
    top_goal_scorer = "Neil Maupay",
    wins = 6
  )

Let’s create another instance called manCity and now we’ll create a Dataset with these two FootballTeams:

val teams: Dataset[FootballTeam] = spark.createDataset(Seq(brighton,
manCity))

Another way to do this is:

val teams: Dataset[FootballTeam] =
spark.createDataFrame(Seq(brighton, manCity)).as[FootballTeam]

The second way can be useful when reading from an external data source that returns a DataFrame, as you can then cast it to your Dataset, so that you have a typed collection.

Most (if not all) of the data transformations you can apply to Pandas DataFrames, are available in Spark. There are of course differences in syntax, and sometimes additional things to be aware of, some of which we’ll go through now.

In general, I’ve found Spark more consistent in notation compared with Pandas, and because Scala is statically typed, you can often just type the dataset name followed by a dot and wait for your compiler (or IDE) to tell you what methods are available!

Let’s start with a simple transformation, where we just want to add a new column to our Dataset, and assign it constant value. In Pandas this looks like:

Pandas:
df_teams['sport'] = 'football'

There’s a small difference in Spark, besides syntax, and that’s that adding a constant value to this new field requires us to import a spark function called lit.

Spark:
import org.apache.spark.sql.functions.lit
val newTeams = teams.withColumn("sport", lit("football"))

Note that we’ve created a new object, as our original teams dataset is a val, which means it’s immutable. This is a good thing, as we know that whenever we use our teams Dataset, we always get the same object.

Now let’s add a column based on a function. In Pandas this will look like:

Pandas:
def is_prem(league):
    if league == 'Premier League':
        return True
    else:
        return False

df_teams['premier_league'] = df_teams['league'].apply(lambda x: is_prem(x))

To do the same in Spark, we need to serialise the function so that Spark can apply it. This is done using something called UserDefinedFunctions. We’ve also used a case match, as this is a nicer implementation in Scala than the if-else, but either will work.

We will also need to import another useful spark function, col, which is used to refer to a column.

Spark:
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.expressions.UserDefinedFunction

def isPrem(league: String): Boolean =
  league match {
    case "Premier League" => true
    case _ => false
  }

val isPremUDF: UserDefinedFunction = udf[Boolean, String](isPrem)
val teamsWithLeague: DataFrame = teams.withColumn("premier_league", isPremUDF(col("league")))

Now that we’ve added a new column that isn’t in our case class, this will convert it back to a DataFrame. So we either need to add another field to our original case class (and allow it to be nullable, using Options), or create a new case class.

An Option in Scala just means the field is nullable. If the value is null we use None, and if it is populated we use Some(value). An example of an optional string:

val optionalString: Option[String] = Some("something")

To get the string from this we can call optionalString.get, and this will just return "something". Note that if we’re not sure whether it will be null or not, we can use getOrElse (e.g. optionalString.getOrElse("default")), which will return the fallback value when the Option is None.

Filtering a Dataset is another common requirement, which is a good example of where Spark is more consistent than Pandas, as it follows the same pattern as other transformations, where we do dataset “dot” transformation (i.e. teams.filter(...)).

Pandas:
df_teams = df_teams[df_teams['goals_this_season'] > 50]

Spark:
val filteredTeams = teams.filter(col("goals_this_season") > 50)

We are likely to need to perform some aggregations on our dataset, which is very similar in Pandas and Spark.

Pandas:
df_teams.groupby(['league']).count()

Spark:
teams.groupBy("league").count()

For multiple aggregations, we can again do something similar to Pandas, with a map of field to aggregation. If we want to do our own aggregations we can use UserDefinedAggregations.

teams.agg(Map(
"matches_played" -> "avg",
"goals_this_season" -> "count"))

Often we also want to combine multiple Datasets, which may be with union:

Pandas:
pd.concat([teams, another_teams], ignore_index=True)

Spark:
teams.unionByName(anotherTeams)

… or with a join:

val players: Dataset[Player] = spark
  .createDataset(Seq(neilMaupey, sergioAguero))

teams.join(players,
    teams.col("top_goal_scorer") === players.col("player_name"),
    "left"
  ).drop("player_name")

In this example we have also created a new Dataset, this time using a case class called Player. Note that this case class has a field injury, which can be null.

case class Player(player_name: String, goals_scored: Int, injury: Option[String])

Notice that we’ve dropped the player_name column as this will be a duplicate of top_goal_scorer.

We may also want parts of our code to just use Scala-native data structures such as Arrays, Lists, etc. To get one of our columns as an Array, we need to map to our value and call .collect().

val teamNames: Array[String] = teams.map(team => team.name)
.collect()

Note that we’re able to use the case class’ inbuilt getters to return the name field, and this won’t compile if name is not a field in our class FootballTeam.

As an aside, we can add functions to our case classes too, and both values and functions will come up as options for autocompletion when using an IDE such as IntelliJ or vs code with Metals plugin.

To filter our Dataset based on whether it exists in this Array we need to treat it as a sequence of args, by calling _*.

val filteredPlayers: Dataset[Player] = players
.filter(col("team").isin(teamNames: _*))

At this point hopefully you’re keen to have a go at writing some Spark code, even if just to see whether my claim that it’s not too different from Pandas stands up.

To get started, we have a couple of options. We can use a notebook, which is a quick way to get some data and start playing around. Alternatively, we can set up a simple project. Either way you’ll need Java 8 installed.

For this example we’re going to use a spylon kernel in a Jupyter notebook. https://pypi.org/project/spylon-kernel/. First run the following commands to set up your notebook, which should open up your notebook in a browser. Then select the spylon-kernel from your available kernels.

pip install spylon-kernel
python -m spylon_kernel install
jupyter notebook

Let’s check we have the correct Java version by adding the following to a cell:

!java -version

The output should be:

java version "1.8.0_xxx"
Java(TM) SE Runtime Environment (build 1.8.0_xxx-bxx)
Java HotSpot(TM) 64-Bit Server VM (build xx.xxx-bxx, mixed mode)

If not, check JAVA_HOME in your bash profile, and ensure it’s pointing to Java 8.

The next step is to install some dependencies. To do this we can add the following code snippet to a new cell. This sets up some spark config and also allows you to add dependencies. Here I’ve added a visualisation library called vegas.

%%init_spark
launcher.num_executors = 4
launcher.executor_cores = 2
launcher.driver_memory = '4g'
launcher.conf.set("spark.sql.catalogImplementation", "hive")
# fill in the Scala suffix and vegas version that match your setup
launcher.packages = ["org.vegas-viz:vegas_<scala-version>:<vegas-version>",
                     "org.vegas-viz:vegas-spark_<scala-version>:<vegas-version>"]

To connect to our data source we can define a function, perhaps something like this:

def getData(file: String): DataFrame =
  spark.read
    .format("csv")
    .option("header", "true")
    .load(file)

This is a connection to a csv file but there are lots of other data sources we can connect to. This function returns a DataFrame, which we may want to convert to a Dataset:

val footballTeams: Dataset[FootballTeam] =
getData("footballs_teams.csv").as[FootballTeam]

We can then start working with this data and have a go at some of the data transformations we discussed, and many more.

Now that you’ve had a go at playing around with some data, you might want to set up a project.

The two main things to include:

  • build.sbt - where previously we added our dependencies in one of our notebook cells, now we need to add them to our build.sbt file
  • SparkSession - in the notebook we already had a spark session, which meant we were able to reference spark directly. In our project we need to create this spark session ourselves

Example build.sbt:

name := "spark-template"
version := "0.1"
scalaVersion := "<scala-version>"

val sparkVersion = "<spark-version>"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

Example SparkSession:

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  val spark: SparkSession = SparkSession
    .builder()
    .master("local")
    .appName("spark-example")
    .getOrCreate()
}

We can then extend objects with this wrapper, which gives us a spark session.

object RunMyCode extends SparkSessionWrapper {
  // your code here
}

You can then start writing your spark code!

To conclude, Spark is a great tool for fast data processing and is growing ever more popular in the data world. As a result, Scala is also becoming a more popular language, and due to its type safety it can be a good choice for data engineers and data scientists, who may be more familiar with Python and Pandas. Spark is a great introduction to the language, because we can use familiar concepts such as DataFrames, so it doesn’t feel like a huge learning curve.

Hopefully this has given you a quick overview, and perhaps enabled you to start exploring Spark, either within your notebook, or within your new project. Good luck!

Source: https://towardsdatascience.com/stop-using-pandas-and-start-using-spark-with-scala-fc2e0

Optimize conversion between PySpark and pandas DataFrames

Apache Arrow is an in-memory columnar data format used in Apache Spark to efficiently transfer data between JVM and Python processes. This is beneficial to Python developers that work with pandas and NumPy data. However, its usage is not automatic and requires some minor changes to configuration or code to take full advantage and ensure compatibility.

PyArrow versions

PyArrow is installed in Databricks Runtime. For information on the version of PyArrow available in each Databricks Runtime version, see the Databricks runtime release notes.

Supported SQL types

All Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType. StructType is represented as a pandas.DataFrame instead of pandas.Series. BinaryType is supported only when PyArrow is equal to or higher than 0.10.0.

Convert PySpark DataFrames to and from pandas DataFrames

Arrow is available as an optimization when converting a PySpark DataFrame to a pandas DataFrame with toPandas() and when creating a PySpark DataFrame from a pandas DataFrame with createDataFrame(pandas_df). To use Arrow for these methods, set the Spark configuration spark.sql.execution.arrow.enabled to true. This configuration is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.enabled could fall back to a non-Arrow implementation if an error occurs before the computation within Spark. You can control this behavior using the Spark configuration spark.sql.execution.arrow.fallback.enabled.

Example

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

Using the Arrow optimizations produces the same results as when Arrow is not enabled. Even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data.

In addition, not all Spark data types are supported and an error can be raised if a column has an unsupported type. If an error occurs during createDataFrame(), Spark falls back to creating the DataFrame without Arrow.


Source: https://docs.databricks.com/spark/latest/spark-sql/spark-pandas.html


Bryan Cutler

If you are a Pandas or NumPy user and have ever tried to create a Spark DataFrame from local data, you might have noticed that it is an unbearably slow process. In fact, the time it takes to do so usually prohibits this for any data set that is at all interesting. Starting from Spark 2.3, an Arrow-based optimization enables creating a DataFrame from Pandas, making this process much more efficient. You can now transfer large data sets to Spark from your local Pandas session almost instantly and also be sure that your data types are preserved. This post will demonstrate a simple example of how to do this and walk through the Spark internals of how it is accomplished.

A simple example to create a DataFrame from Pandas

For this example, we will generate a 2D array of random doubles from NumPy with 1,000,000 rows and a handful of columns. We will then wrap this NumPy data with Pandas, applying a label for each column name, and use this as our input into Spark.

To input this data into Spark with Arrow, we first need to enable it with the config below (spark.sql.execution.arrow.enabled set to true). This could also be added to your Spark configuration defaults so it is enabled for all sessions. Spark simply takes the Pandas DataFrame as input and converts it into a Spark DataFrame, which is distributed across the cluster. Using Arrow, the schema is automatically transferred to Spark and data type information will be retained, but you can also manually specify the schema to override it if desired.

Assuming an existing Spark session
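A minimal sketch of that flow (the array size and column labels here are illustrative):

import numpy as np
import pandas as pd

# Enable Arrow-based transfer for createDataFrame / toPandas
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Wrap the NumPy data with Pandas, labelling each column
pdf = pd.DataFrame(np.random.rand(1_000_000, 10),
                   columns=[f"c{i}" for i in range(10)])

# Spark slices the Pandas DataFrame and distributes it across the cluster
df = spark.createDataFrame(pdf)
df.printSchema()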

That’s all there is to it! The Pandas DataFrame will be sliced up according to the default parallelism, which can be set by the conf “spark.default.parallelism” for the default scheduler. Depending on the size of the data you are importing to Spark, you might need to tweak this setting.

The above can be found as a notebook gist here to try out for yourself.

How it Works Behind the Scenes

The code path for this is pretty straightforward and boils down to just a few key steps. All the work is done in a helper in session.py, which is invoked from createDataFrame after the input is found to be a Pandas DataFrame and Arrow is enabled.

  1. Slice the Pandas DataFrame into chunks according to the number for default parallelism

  2. Convert each chunk of Pandas data into an Arrow RecordBatch

  3. Convert the schema from Arrow to Spark

  4. Send the RecordBatches to the JVM, where they become a JavaRDD

  5. Wrap the JavaRDD with the Spark schema to create a DataFrame

Let’s look at these steps in a bit more detail to examine performance. First, slicing the Pandas DataFrame is a cheap operation because it only uses references to the original data and does not make copies. Converting the slices to Arrow record batches will end up copying the data since it came from slices, but it is efficiently copied as chunks. Arrow can perform zero-copy conversions to/from Pandas data and will do so automatically when it is able to safely reference the data.

Step 3 will create a Spark schema from the Arrow schema, which is a simple mapping. Arrow has detailed type definitions and supports all types available in Spark; however, Spark only supports a subset of Arrow types, so you might need to be careful what you are importing. For example, a union type is supported in Arrow, but not in Spark. See the Spark documentation for which types are fully supported at the time of writing.

Step 4 is where the Arrow data is sent to the JVM. This is necessary in actualizing the DataFrame and will allow Spark to perform SQL operations completely within the JVM. Here the Arrow record batches are written to a temporary file, where they are read back in chunks by the JVM and then parallelized to an RDD. Writing to a temporary file was done to meld with existing code and is definitely much better than transferring the data over a Py4J call. In practice, this works pretty well and doesn’t seem to be much of a bottleneck; I’m not sure if setting up a local socket to send the data would do better, but it could be an area to check out in the future.

With all the above complete, the final step maps the partitions of the JavaRDD containing the Arrow record batches to an iterator and uses that along with the schema to construct the DataFrame.

Performance Comparison with Arrow Disabled

Here are a few benchmarks comparing the wall-clock time of calling createDataFrame with and without Arrow enabled. The data used is random doubles similar to the example above; the Size column below is the total number of double values transferred. The runs were done on a laptop in Spark local mode with default Spark settings, and each timing is the best of 3 consecutive iterations.

(Benchmark table: Size | With Arrow | Without Arrow. The sizes ranged from 50,000 up to 10,000,000 double values. With Arrow enabled, every conversion completed in milliseconds, while without Arrow the time grew with the data, reaching roughly a minute (63 s) at the largest size.)

I won’t get into the details of the code path when Arrow is disabled, but there are a few reasons that make it inefficient. First, Spark does not look at the Pandas DataFrame to get data type information; it tries to infer it itself. It can not make use of NumPy data chunks and must iterate over each record and read each value as a Python object. When it prepares the data to send to the JVM, it must serialize each scalar value in the pickle format. Finally, once on the JVM, it goes through another set of conversions to apply the proper Scala type.

Download this notebook to try out the above examples or here for the gist

Written on January 6,

Source: https://bryancutler.github.io/createDataFrame/

PySpark Usage Guide for Pandas with Apache Arrow

Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility. This guide will give a high-level description of how to use Arrow in Spark and highlight any differences when working with Arrow-enabled data.

Ensure PyArrow Installed

To use Apache Arrow in PySpark, the recommended version of PyArrow should be installed. If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. You can install it using pip, or using conda from the conda-forge channel. See the PyArrow installation instructions for details.

Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the toPandas() call and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df). To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true. This is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.pyspark.enabled could fall back automatically to a non-Arrow optimization implementation if an error occurs before the actual computation within Spark. This can be controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. Not all Spark data types are currently supported and an error can be raised if a column has an unsupported type; see Supported SQL Types. If an error occurs during createDataFrame(), Spark will fall back to creating the DataFrame without Arrow.

Pandas UDFs (a.k.a. Vectorized UDFs)

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.

Before Spark 3.0, Pandas UDFs used to be defined with pyspark.sql.functions.PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred, and pyspark.sql.functions.PandasUDFType will be deprecated in a future release.

Note that the type hint should use pandas.Series in all cases, but there is one variant: pandas.DataFrame should be used for the input or output type hint instead when the input or output column is of StructType. The following example shows a Pandas UDF which takes a long column, a string column and a struct column, and outputs a struct column. It requires the function to specify the type hints of pandas.Series and pandas.DataFrame as below:
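A sketch in the spirit of that example (assuming an active SparkSession named spark):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # s3 is the struct column, passed in as a pandas.DataFrame
    s3['col2'] = s1 + s2.str.len()
    return s3

# A DataFrame with a long column, a string column and a struct column
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.select(func("long_col", "string_col", "struct_col")).show()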

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

The following sections describe the combinations of supported type hints. For simplicity, the pandas.DataFrame variant is omitted.

Series to Series

The type hint can be expressed as pandas.Series, … -> pandas.Series.

By using pandas_udf with a function having such type hints, it creates a Pandas UDF where the given function takes one or more pandas.Series and outputs one pandas.Series. The output of the function should always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, then concatenating the results together.

The following example shows how to create this Pandas UDF that computes the product of 2 columns.
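A minimal sketch of such a UDF (assuming an active SparkSession named spark):

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

# Execute the function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()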

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf.

Iterator of Series to Iterator of Series

The type hint can be expressed as Iterator[pandas.Series] -> Iterator[pandas.Series].

By using pandas_udf with a function having such type hints, it creates a Pandas UDF where the given function takes an iterator of pandas.Series and outputs an iterator of pandas.Series. The length of the entire output from the function should be the same as the length of the entire input; therefore, it can prefetch the data from the input iterator as long as the lengths are the same. In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator of Series.

It is also useful when the UDF execution requires initializing some state (for example, loading an expensive resource once and reusing it across the whole iterator), although internally it works identically to the Series to Series case.

The following example shows how to create this Pandas UDF:
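A sketch of this variant (assuming an active SparkSession named spark):

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each element of the iterator is a batch of the input column
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()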

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf.

Iterator of Multiple Series to Iterator of Series

The type hint can be expressed as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].

By using pandas_udf with a function having such type hints, it creates a Pandas UDF where the given function takes an iterator of a tuple of multiple pandas.Series and outputs an iterator of pandas.Series. In this case, the created Pandas UDF requires as many input columns as there are series in the tuple when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series to Iterator of Series case.

The following example shows how to create this Pandas UDF:
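A sketch with two input columns (assuming an active SparkSession named spark):

from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each element of the iterator is a tuple of column batches
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()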

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf.

Series to Scalar

The type hint can be expressed as pandas.Series, … -> Any.

By using pandas_udf with a function having such type hints, it creates a Pandas UDF similar to PySpark's aggregate functions. The given function takes pandas.Series and returns a scalar value. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, e.g., int or float, or a NumPy data type, e.g., numpy.int64 or numpy.float64. Any should ideally be a specific scalar type accordingly.

This UDF can also be used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window.

Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also, only unbounded windows are supported with grouped aggregate Pandas UDFs currently. The following example shows how to use this type of UDF to compute the mean with group-by and window operations:
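A sketch of a grouped aggregate mean computed this way (assuming an active SparkSession named spark):

import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()              # aggregate over the whole column
df.groupby("id").agg(mean_udf(df['v'])).show()   # aggregate per group

w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding,
                                         Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # unbounded window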

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.functions.pandas_udf.

Pandas Function APIs

Pandas Function APIs can directly apply a Python native function against the whole DataFrame by using Pandas instances. Internally they work similarly to Pandas UDFs, using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function API behaves as a regular API under PySpark DataFrame rather than under Column, and Python type hints in Pandas Function APIs are optional and do not affect how it works internally at this moment, although they might be required in the future.

From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API, DataFrame.groupby().applyInPandas(). It is still possible to use it with pyspark.sql.functions.PandasUDFType and DataFrame.groupby().apply() as before; however, it is preferred to use DataFrame.groupby().applyInPandas() directly. Using pyspark.sql.functions.PandasUDFType will be deprecated in the future.

Grouped Map

Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas(), which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame. It maps each group to a pandas.DataFrame in the Python function.

This API implements the “split-apply-combine” pattern, which consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.
  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
  • Combine the results into a new PySpark DataFrame.

To use DataFrame.groupBy().applyInPandas(), the user needs to define the following:

  • A Python function that defines the computation for each group.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.
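A sketch of that pattern (assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding all rows and columns of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()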

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.GroupedData.applyInPandas.

Map

Map operations with Pandas instances are supported by DataFrame.mapInPandas(), which maps an iterator of pandas.DataFrames to another iterator of pandas.DataFrames that represents the current PySpark DataFrame, and returns the result as a PySpark DataFrame. The function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.

The following example shows how to use mapInPandas():
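A sketch of a simple filter implemented this way (assuming an active SparkSession named spark):

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    # Each pdf is a pandas.DataFrame for one batch of the input
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()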

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.DataFrame.mapInPandas.

Co-grouped Map

Co-grouped map operations with Pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas(), which allows two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. It consists of the following steps:

  • Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
  • Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), the user needs to define the following:

  • A Python function that defines the computation for each cogroup.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.
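A sketch of such an asof join (assuming an active SparkSession named spark):

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2"))

def asof_join(left, right):
    # left and right are the pandas.DataFrames for one cogroup
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()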

Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

For detailed usage, please see pyspark.sql.PandasCogroupedOps.applyInPandas.

Usage Notes

Supported SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType.

Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into one or more record batches for processing.

Timestamp with Time Zone Semantics

Spark internally stores timestamps as UTC values, and timestamp data that is brought in without a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the configuration “spark.sql.session.timeZone” and will default to the JVM system local time zone if not set. Pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with an optional time zone on a per-column basis.

When timestamp data is transferred from Spark to Pandas, it will be converted to nanoseconds, and each column will be converted to the Spark session time zone and then localized to that time zone, which removes the time zone and displays values as local time. This will occur when calling toPandas() or pandas_udf with timestamp columns.

When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This occurs when calling createDataFrame with a Pandas DataFrame or when returning a timestamp from a pandas_udf. These conversions are done automatically to ensure Spark will have data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values will be truncated.

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is different from a Pandas timestamp. It is recommended to use Pandas time series functionality when working with timestamps in pandas_udfs to get the best performance; see the Pandas time series documentation for details.

Recommended Pandas and PyArrow Versions

For usage with pyspark.sql, minimum supported versions of Pandas and PyArrow are required (see the Spark release notes for the exact versions). Higher versions may be used; however, compatibility and data correctness cannot be guaranteed and should be verified by the user.

Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x

Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable for compatibility with previous versions of Arrow (<= 0.14.x). This is only necessary for PySpark users on Spark 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0 or later. The following can be added to conf/spark-env.sh to use the legacy Arrow IPC format:

ARROW_PRE_0_15_IPC_FORMAT=1

This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to an error (as described in the related SPARK JIRA) when running pandas_udfs or DataFrame.toPandas() with Arrow enabled. More information about the Arrow IPC change can be read on the Arrow release blog.

Source: https://spark.apache.org/docs//sql-pyspark-pandas-with-arrow.html


When working with a huge dataset, a Python Pandas DataFrame is not good enough to perform complex transformation operations, so if you have a Spark cluster it’s better to convert the Pandas DataFrame to a PySpark DataFrame, apply the complex transformations on the Spark cluster, and convert it back.

In this article, I will explain the steps for converting a Pandas DataFrame to a PySpark DataFrame, and how to optimize the conversion by enabling Apache Arrow.

1. Create Pandas DataFrame

In order to convert Pandas to PySpark DataFrame, let’s first create a Pandas DataFrame with some test data. In order to use pandas you have to import it first, using import pandas as pd.
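A minimal sketch of such a test DataFrame (the columns and values are made up for illustration):

import pandas as pd

data = [
    ("James", "Smith", "USA", 30),
    ("Anna", "Rose", "UK", 41),
    ("Robert", "Williams", "USA", 62),
]
columns = ["first_name", "last_name", "country", "age"]

pandasDF = pd.DataFrame(data=data, columns=columns)
print(pandasDF)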

Operations in PySpark run faster than in Python Pandas due to its distributed nature and parallel execution on multiple cores and machines. In other words, Pandas runs operations on a single node whereas PySpark runs on multiple machines. If you are working on a machine learning application where you are dealing with larger datasets, it’s a good option to consider PySpark. PySpark processes operations many times faster than Pandas.

2. Convert Pandas to PySpark (Spark) DataFrame

Spark provides the createDataFrame() method to convert a Pandas DataFrame to a Spark DataFrame; by default, Spark infers the PySpark schema from the Pandas data types.
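Continuing the hypothetical DataFrame from the sketch above (and assuming an existing SparkSession named spark), the conversion itself is one call:

# Convert the Pandas DataFrame to a PySpark DataFrame; the schema is inferred
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()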

If you want all data types to be String, convert the Pandas DataFrame to strings first, e.g. spark.createDataFrame(pandasDF.astype(str)).

3. Change Column Names & DataTypes while Converting

If you want to change the schema (column names & data types) while converting Pandas to a PySpark DataFrame, create a PySpark schema using StructType and pass it as the schema argument.
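A sketch of supplying an explicit schema, reusing the hypothetical columns from above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

mySchema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("age", IntegerType(), True),
])

sparkDF2 = spark.createDataFrame(pandasDF, schema=mySchema)
sparkDF2.printSchema()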

4. Use Apache Arrow to Convert Pandas to Spark DataFrame

Apache Spark uses Apache Arrow, an in-memory columnar format, to transfer the data between Python and the JVM. You need to enable it with spark.sql.execution.arrow.enabled (it is disabled by default) and have Apache Arrow (PyArrow) installed on all Spark cluster nodes, either via pip install pyspark[sql] or by downloading it directly from Apache Arrow for Python.

You need to have a Spark-compatible version of Apache Arrow installed to use the above setting; if Apache Arrow is not installed, you will get an error.

When an error occurs, Spark automatically falls back to the non-Arrow implementation; this can be controlled by the Arrow fallback configuration (spark.sql.execution.arrow.fallback.enabled).

Note: Apache Arrow currently supports all Spark SQL data types except MapType, ArrayType of TimestampType, and nested StructType.

5. Complete Example of Convert Pandas to Spark Dataframe

Below is a complete example of converting a Pandas DataFrame to a PySpark DataFrame.
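Here is a self-contained sketch that stitches the steps above together (column names and values are made up for illustration):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[*]").appName("pandas-to-pyspark").getOrCreate()

# 1. Create a Pandas DataFrame
pandasDF = pd.DataFrame({"first_name": ["James", "Anna"],
                         "country": ["USA", "UK"],
                         "age": [30, 41]})

# 2. Convert with an inferred schema
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.printSchema()

# 3. Convert with an explicit schema
mySchema = StructType([
    StructField("first_name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("age", IntegerType(), True),
])
sparkDF2 = spark.createDataFrame(pandasDF, schema=mySchema)

# 4. Enable Arrow to speed up the conversion (requires PyArrow on all nodes)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sparkDF3 = spark.createDataFrame(pandasDF)
sparkDF3.show()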

Conclusion

In this article, you have learned how easy it is to convert a Pandas DataFrame to a Spark DataFrame, and how to optimize the conversion using Apache Arrow (an in-memory columnar format).

Happy Learning!!

NNK
Source: https://sparkbyexamples.com/pyspark/convert-pandas-to-pyspark-dataframe/


786 787 788 789 790