I have always worked closely with relational databases and, later, with distributed systems such as Spark. Initially I went deep into DBMSs, both to build complex queries and handle administration, and mainly to learn how to write a performant script for the engine. When I started working more with Spark, and later with Databricks, I didn't have performance problems at first for the scenarios I had to build. But as the big data area truly became big data, I started hitting performance problems in routines whose volume grew 30% every week, and that made me look into how Spark works 'under the hood'. Already knowing how a DBMS works helped me understand some of the concepts I will bring here.
Let's keep it brief, since I want this article to focus on performance analysis scenarios, techniques and best practices.
Spark Core is the foundation of Spark: it is responsible for memory management, task scheduling, failure recovery and I/O management, in other words, it is what manipulates the RDDs. It therefore has a hand in a large part of what happens in the cluster.
The executor is the real worker of the Spark ecosystem (the cluster). It is the one that receives the read and write orders (tasks), which can hit disk, memory or both (I will explain later why this matters in performance scenarios).
Workers are literally what the name says for anyone already familiar with distributed computing: they are the nodes of the cluster, so they are what 'hosts' the executors I mentioned above, and each worker can contain one or more executors. The worker is responsible for managing the resources allocated to its executors, as if the executor were an assistant and the worker were the warehouse keeper it reports to.
This is the manager. It allocates resources (memory and CPU) to the workers, decides how many executors each application will get and how much resource will be allocated to them, and manages the tasks sent by its 'boss', which I will explain further down. And, since it holds a position of greater responsibility, it also monitors the state of the cluster to recover from failures, redistributing tasks as necessary. (NOTE: there are several types of cluster manager: YARN, Mesos, Kubernetes and the simplest of them, standalone.)
Well, this is the boss, or the gateway. I say gateway because every Spark application goes through it: the driver is what allows the application to interact with the cluster, that is, with the workers and executors. It is what distributes and manages tasks among the workers, and in doing so it governs the whole application in terms of configuration, number of executors and resources such as memory. Need to know how the tasks are going? Talk to this boss here.
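Just to make this concrete, here is a minimal sketch of where those resource decisions show up in code when you create the driver's entry point, the SparkSession. The application name and the numbers are illustrative assumptions, not recommendations:

from pyspark.sql import SparkSession

# Hypothetical resource settings; tune them to your own cluster.
spark = (
    SparkSession.builder
    .appName("tuning-demo")
    .config("spark.executor.instances", "4")  # how many executors to ask the cluster manager for
    .config("spark.executor.memory", "4g")    # memory per executor
    .config("spark.executor.cores", "2")      # CPU cores per executor
    .getOrCreate()
)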
So, in an illustrative way: the driver talks to the cluster manager, the cluster manager allocates resources on the workers, and the workers host the executors that actually run the tasks.
Back when I worked on the relational database side and there were performance problems, mainly in procedures, functions or in a query coming from an application, I analyzed the following aspects:
Well, I think that's it, now what do these points have in common with Apache Spark?
To summarize what each one is (even though the names alone already give you an idea):
Logical Plan:
Represents the original query as a series of logical operations. It is the abstract form of the query, without considering how it will actually be executed. It includes information about the operations that will be performed, such as filtering, selection, joins, aggregations, and the odd 'little things' too, lol.
Physical Plan:
Details how Spark will actually execute the query. This includes the order of operations and which algorithms will be used (much like a DBMS's algorithms). It may also include details about how the data will be partitioned and distributed among the executors.
Execution Strategies:
The physical plan can show the different execution strategies Spark may use, such as "Broadcast Join" or "Shuffle Hash Join", depending on the operation and on the data size. I will also explain the main algorithms of the execution plan, hold on...
Estimated Cost:
Although not always displayed, some plans may include cost estimates for different parts of the plan, helping you understand which part of processing may be most costly.
We have the 'raw' form, which is textual: using the explain() command you get an output similar to the one below, showing a simple filter over a DataFrame:
== Physical Plan ==
*(2) Filter (Value#1 > 1)
+- *(2) Project [Name#0, Value#1]
   +- *(1) Scan ExistingRDD[Name#0, Value#1]
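For reference, here is a minimal sketch of how a plan like that could be produced; the Name and Value columns match the output above, and the rows are made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("explain-demo").getOrCreate()

# Toy DataFrame just to inspect the plan; the data is illustrative.
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Name", "Value"])

# Prints the textual physical plan, the 'raw' form mentioned above.
df.select("Name", "Value").filter("Value > 1").explain()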
And, more visually, we can analyze it through the Spark UI. In Databricks we can access it from cell executions, from the job or from the cluster; in plain Apache Spark it is served directly at the driver's IP on the default port 4040.
Spark UI is divided into several useful sections:
Jobs: Shows a list of all jobs executed in the application. Each job corresponds to an action in your code.
Stages: Displays the stages that make up each job. Stages are subdivisions of work that can be performed in parallel.
Tasks: Details the individual tasks within each stage, including information about task execution time and status.
Storage: Provides information about memory and storage usage of RDDs (Resilient Distributed Datasets).
Environment: Shows runtime environment properties, including Spark configurations and system variables.
Executors: Displays information about the executors created for the application, including memory usage, disk usage and performance statistics.
Note that I listed these in hierarchical order, okay? That is how the work is organized: jobs are broken into stages, and stages into tasks.
Now, let's put some images on the screen!!
First, I will explain the main algorithms that show up both in the Spark UI and in the execution plan, whether the logical or the physical one:
NOTE: keep in mind that here 'dataset' means the same thing as a Spark table/DataFrame ;)
1. Let's start with the most famous one, the Scan: the physical reading of the data from the source (Parquet files, a Delta table, a JDBC connection, an existing RDD, and so on).
2. Join (this one causes some trouble):
Broadcast Hash Join: used when one of the datasets is small enough to be broadcast to all the nodes of the cluster, avoiding a shuffle (I'll explain more about that thing, but in short it is a data-reshuffling operation needed to complete the join).
Shuffle Hash Join: both datasets (tables, if you prefer) are shuffled so that matching keys end up in the same partition. It is used when the datasets are large and cannot be broadcast to the other nodes.
Sort Merge Join: requires both datasets to be sorted before joining. It is efficient for large datasets that are already partitioned and sorted, that is, when the join is done on columns the data is partitioned and also ordered by (e.g. df.write.partitionBy("coluna1").sortBy("coluna2").parquet("path/to/save/partitioned")). See the sketch right after this list.
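As a hedged sketch of how you can influence these strategies in practice (the table paths, the country_id key and the bucket count are made-up assumptions; note also that in Spark the writer's sortBy must be combined with bucketBy and saveAsTable):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("join-strategies").getOrCreate()

orders = spark.read.parquet("/data/orders")        # hypothetical large fact table
countries = spark.read.parquet("/data/countries")  # hypothetical small dimension table

# Hint a Broadcast Hash Join: the small side is shipped to every executor, no shuffle of the big side.
joined = orders.join(broadcast(countries), on="country_id", how="left")
joined.explain()  # the physical plan should show BroadcastHashJoin

# Writing the data bucketed and sorted on the join key makes a Sort Merge Join much cheaper later.
(orders.write
    .bucketBy(64, "country_id")
    .sortBy("country_id")
    .mode("overwrite")
    .saveAsTable("orders_bucketed"))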
3. Aggregation (sum, count, group by etc...):
HashAggregate: uses a hash table to aggregate the data. It is efficient when the aggregated data fits in memory.
SortAggregate: aggregates the data after sorting it. It is used when the data does not fit in memory.
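A quick, illustrative way to see which of the two Spark picked (the data and column names are made up):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("agg-demo").getOrCreate()

# Toy data just to inspect which aggregation operator shows up in the plan.
df = spark.createDataFrame([("BR", 10.0), ("BR", 5.0), ("US", 7.5)], ["country", "amount"])

agg = df.groupBy("country").agg(F.sum("amount").alias("total"), F.count("*").alias("n_orders"))
agg.explain()  # for this kind of aggregation you will usually see HashAggregate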
4. Shuffle (watch out for this guy): the redistribution of data across partitions and executors, typically triggered by wide operations such as joins and aggregations. I come back to it in detail below.
5. Exchange: how that data movement shows up in the physical plan; it represents data being exchanged between partitions, either over the network (a shuffle) or as a broadcast.
6. Project: the selection of columns and computed expressions, like the SELECT list in SQL.
7. Filter: the application of row-level conditions, like a WHERE clause.
8. Sort: the ordering of the data, either for a final ORDER BY or as a step required by a Sort Merge Join / SortAggregate.
All of the operators above can be observed, as I said previously, through the explain() command.
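As a small sketch of the different levels of detail explain() can give you (the mode argument exists since Spark 3.0; the DataFrame here is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("explain-modes").getOrCreate()
df = spark.range(10).filter("id > 5")  # placeholder DataFrame

df.explain()                  # simple: physical plan only (the default)
df.explain(mode="extended")   # parsed, analyzed and optimized logical plans plus the physical plan
df.explain(mode="cost")       # includes statistics/cost estimates when available
df.explain(mode="formatted")  # physical plan as an outline plus per-operator details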
1. Join and GroupBy Operations
Operations such as join() and groupByKey() often trigger a shuffle, which redistributes data between partitions. This can result in:
High disk I/O usage: shuffle generates many intermediate files, which can saturate the executors' local disks.
High network load: the amount of data transferred between executors can be substantial, depending on the number of connections required (number of mappers multiplied by the number of reducers).
Mitigation: prefer operations that reduce the data before shuffling (reduceByKey over groupByKey, early filters and column pruning), broadcast the small side of a join, and keep spark.sql.shuffle.partitions coherent with your data volume. See the sketch right after this list.
Shuffle metrics in Spark UI: the Stages tab shows the 'Shuffle Read' and 'Shuffle Write' sizes per stage; large values, or a big skew between tasks, are the first sign that a shuffle is dominating the job.
How shuffle works and why it is costly: each map task writes intermediate shuffle files to the executor's local disk, and the tasks of the next stage then fetch those blocks over the network, which is why a shuffle pays in disk I/O, network and serialization all at once.
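A minimal sketch of the first two mitigations, assuming a simple pair-RDD word-count-style workload; the data, names and partition count are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-mitigation").getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# groupByKey ships every single value across the network before aggregating...
counts_grouped = pairs.groupByKey().mapValues(sum)

# ...while reduceByKey combines values on the map side first, shrinking the shuffle.
counts_reduced = pairs.reduceByKey(lambda x, y: x + y)

# A shuffle-partition count coherent with the data volume avoids tiny or gigantic partitions.
spark.conf.set("spark.sql.shuffle.partitions", "200")

print(counts_reduced.collect())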
The vast majority of people work with notebooks, given the popularity of Databricks, Jupyter Notebook and Google Colab. So split each query or transformation into separate cells: this makes it easier to identify which part is the performance problem. If you leave everything together, several jobs are created and it becomes hard to know which stage belongs to what.
Use MERGE instead of overwrite. I know it's more work, but it's more logical and more performant, since a MERGE uses far less I/O than 'dumbly' overwriting the entire table again in the data lake.
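As a hedged sketch of what that looks like with Delta Lake on Databricks (the paths and the customer_id key are assumptions):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("merge-demo").getOrCreate()

updates_df = spark.read.parquet("/mnt/datalake/staging/customers_increment")  # hypothetical increment
target = DeltaTable.forPath(spark, "/mnt/datalake/silver/customers")          # hypothetical Delta table

(target.alias("t")
    .merge(updates_df.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdateAll()      # update the rows that already exist
    .whenNotMatchedInsertAll()   # insert only the new ones
    .execute())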
Use cache() or persist() to store intermediate data in memory, especially if it will be reused across multiple operations. This can reduce recomputation time and improve performance.
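A small sketch of the idea (the data and the storage level are illustrative):

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("cache-demo").getOrCreate()

df = spark.range(1_000_000).withColumnRenamed("id", "value")

# Expensive intermediate result reused more than once.
filtered = df.filter("value % 7 = 0")

filtered.cache()                                   # MEMORY_AND_DISK by default for DataFrames
# filtered.persist(StorageLevel.MEMORY_AND_DISK)   # or pick the storage level explicitly

print(filtered.count())   # the first action materializes the cache
print(filtered.count())   # subsequent actions reuse it instead of recomputing

filtered.unpersist()      # release it when no longer needed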
In case you don't know, Spark runs on the JVM, so it is natively Java/Scala. When you create the famous UDF (User Defined Function) in Python, you hand Spark a kind of "black box", preventing automatic optimizations. Whenever possible, use the built-in Spark SQL functions, which are optimized for performance.
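A minimal, illustrative comparison (the transformation is trivial on purpose, and the data is made up):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("udf-vs-builtin").getOrCreate()
df = spark.createDataFrame([("ana",), ("bruno",)], ["name"])

# Python UDF: opaque to the optimizer, rows take a round-trip through the Python worker.
upper_udf = F.udf(lambda s: s.upper() if s else None, StringType())
slow = df.withColumn("name_upper", upper_udf(F.col("name")))

# Built-in function: stays inside the JVM and benefits from Catalyst/Tungsten optimizations.
fast = df.withColumn("name_upper", F.upper("name"))

slow.explain()  # the plan shows a Python evaluation step (e.g. BatchEvalPython)
fast.explain()  # plain projection, no Python round-trip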
Well, I think I've written everything I had in mind. I like writing articles because it helps me revisit some scenarios. I intend to record a video showing all of this in practice with some public data (I'll probably grab it from Kaggle), so follow me on LinkedIn to keep up with everything related to the world of data, artificial intelligence and software development.
--> https://www.linkedin.com/in/airton-lira-junior-6b81a661
Follow me on LinkedIn, give the post a boost; I like feedback and I'm also completely open to improving how I share knowledge.
If you've read this far, congratulations!!! I hope this helps you overcome all your performance issues. In the next article I will cover the advantages of Databricks, so follow me on LinkedIn to find out when it's out. Thank you!!