Recently at eBay we migrated our 20 petabyte analytics platform from a data warehousing vendor to our own open-source-based in-house system built on Hadoop. Making that transition gave eBay the flexibility to innovate at scale and to quickly source the data necessary to fuel our tech-led reimagination. However, the third and final key component of the migration hinged on optimizing our data processing to meet or beat the performance of the legacy vendor system.

Background

eBay has a set of critical production tables (listed below) that must be refreshed daily within a set time window. This timing is crucial, since the data feeds various high-priority reports distributed to eBay leaders across the globe. It was therefore imperative for the Hadoop system to refresh these critical tables with the same speed and efficiency as the vendor system, so that the downstream reports could all be generated and distributed without any impact to the business.

There were other factors that added to the need for Hadoop job optimization. For example, Hadoop cluster resources would be wasted if extract, transform, load (ETL) batch jobs did not run efficiently. Inefficient job processing would also demand more support effort from the Hadoop platform team as well as the data services organization.

The initial assessments identified opportunities to streamline the design of the system, as well as the batch jobs, tables, SQL scripts and other parameters. All of these efforts to optimize Hadoop jobs yielded the results below.

Results

The SLA table below shows how completion times after tuning in Hadoop compare with those of our former vendor system.

Table             | Before Tuning | After Tuning in Hadoop | Former Vendor System
dw_users_info     | 02:01 am      | 01:30 am               | 01:32 am
dw_bid            | 04:00 am      | 02:37 am               | 02:24 am
dw_checkout_trans | 04:38 am      | 02:19 am               | 02:37 am
dw_lstg_item      | 03:12 am      | 02:37 am               | 02:34 am
dw_accounts       | 05:30 am      | 02:35 am               | 02:47 am

Approach

Multiple levels of tuning strategies were designed and incorporated to achieve the results above.

Level     | Key Tuning Strategies
System    | Queue control; parallel commit files; version upgrade
Process   | Dependency enhancements; reducing intermediate data volume/columns; logic/flow optimization
Table     | Bucket table; partition table; indexing; table retention
SQL       | Avoiding skew data joins; reducing small files; reducing logic with multiple calculations; reducing shuffle; caching
Parameter | Executor; shuffle; timeout; broadcast join; dynamic executor

System Level

Queue control is the key contributor to system-level performance optimization, and it does two main things. First, it routes jobs to the correct queue based on their priority and the capacity available in the various queues. Second, it steers MapReduce jobs toward low-memory queues, saving the high-memory queues for Spark jobs.

Queues are set up with two thresholds: maximum capacity and guaranteed capacity. Maximum capacity is a ceiling on how high a queue's utilization can go, whereas guaranteed capacity is the minimum amount of resources that the queue will receive at any point in time. If a particular queue lacks resources and is below its guaranteed capacity while the root queue is at 100% capacity, jobs running on other queues that are using more than their guaranteed capacity will be killed to free resources for the queue that is under its guaranteed capacity.
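
Queue selection is the job-side half of this control. As a minimal sketch (the queue name and application name below are hypothetical; the guaranteed and maximum capacities live in the cluster's CapacityScheduler configuration, not in the job), a Spark job can be pinned to a particular YARN queue via spark.yarn.queue:

    import org.apache.spark.sql.SparkSession

    // Route a light batch job to a hypothetical low-memory queue; the queue's
    // guaranteed and maximum capacities are configured on the cluster side.
    val spark = SparkSession.builder()
      .appName("dw_users_info_refresh")                 // illustrative job name
      .config("spark.yarn.queue", "hdlq-batch-low-mem") // hypothetical queue name
      .getOrCreate()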

Process Level

With a concrete approach to optimizing jobs at the system level, the next step was to tackle the gaps at the process level. Below is a breakdown of the approach:

  1. Manage job dependencies to identify long-running portions of the job, and split that logic so it can run independently without holding up the rest of the execution tasks or waiting for all the source tables/data to be ready.
  2. Extend the previous approach to split jobs' logic wherever possible, thereby increasing the number of job sections that can execute in parallel.
  3. Tighten the process logic by combining redundant calculations.
  4. Reduce the number of joins for each table (especially the target table, which is typically much larger than an incremental table) and combine them into a single join wherever possible.
  5. When columns of a single large table are used multiple times in the process, create an intermediate table with just the frequently used columns so the dependent processes can work with this intermediate table rather than the entire large table, which might demand a lot of resources for operations.
  6. Change the logic to perform filter operations before join operations so the data size is smaller and therefore less demanding in terms of resources (see the sketch below).
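
As a minimal sketch of point 6 (apart from dw_lstg_item, the table and all column names are illustrative rather than taken from the actual jobs), filtering and pruning columns before the join shrinks the data that has to be shuffled:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // A large table and a hypothetical daily increment.
    val items     = spark.table("dw_lstg_item")
    val dailyIncr = spark.table("lstg_item_incr_daily")

    // Filter and prune columns first, then join, so far less data is shuffled
    // than when joining the full tables and filtering afterwards.
    val slimItems = items.filter("site_id = 0").select("item_id", "leaf_categ_id")
    val slimIncr  = dailyIncr.select("item_id", "gmv_usd")
    val joined    = slimItems.join(slimIncr, "item_id")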

Example: The original logic includes 16 minutes for the first operation, another 16 minutes for the join operation and then 32 minutes of wait time, all of which can be eliminated per the updated logic in the second figure below.

Process flow before the change:


Process flow after the change:

Table Level

The key optimization approaches at the table level include:

  1. Building Spark tables (using the Parquet format) instead of Hive tables (stored as Parquet).
  2. Building bucket tables for the most-used attributes and non-skew columns (see the sketch after this list).
  3. Building indexes on frequently used filter columns.
  4. Ensuring bucket numbers are optimal (e.g., it's better to have bucket sizes that are multiples of one block and less than 2GB).
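
As a hedged illustration of points 2 and 4 (the staging view, bucket column and target table name are hypothetical; only the 40,000-bucket count mirrors the DW_ACCOUNTS example that follows), a bucketed and sorted Spark table can be written like this:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    spark.table("dw_accounts_stage")  // hypothetical staging view
      .write
      .format("parquet")
      .bucketBy(40000, "acct_id")     // bucket on a frequently joined, non-skew column
      .sortBy("acct_id")              // 'sort by' on the bucket column
      .saveAsTable("dw_accounts_w")   // hypothetical target table name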

Example: The DW_ACCOUNTS table was changed from 20,000 to 40,000 buckets, and a 'sort by' on the bucket columns was added to the table DDL. Job performance improved dramatically:

Bucket Number | Memory per Executor | Memory Overhead | Max Executors | Cores per Executor | Running Time
20000         | 26GB                | 18GB            | 20000         | 2                  | 31 mins
40000         | 16GB                | 4GB             | 10000         | 2                  | 16 mins

SQL Level

The next category of performance optimizations was performed at the SQL level:

  1. Reduce the shuffle volume when using joins and filter out unnecessary records and columns.
  2. If the tables being joined have skew data, filter out the skew portion if it is not necessary, and do a union after the join operation is complete. If the skew data is necessary for the join, split the skew data using a broadcast join (if the volume is not very large), or manually split the skew data into different nodes.
  3. If two large tables are joined and the join output is very small, or if the logic is insert-only (not overwritten in the daily process), reset the shuffle partition count and add a 'distribute by' on the partition column to reduce the number of small files (see the sketch after this list).
  4. If a temp view's logic would otherwise be calculated several times, either cache it or change the logic so it is calculated only once.
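
A minimal sketch of point 3 (all table and column names here are hypothetical): lower the shuffle partition count and redistribute the output by its partition column so each partition is written as a few larger files instead of thousands of tiny ones.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    spark.conf.set("spark.sql.shuffle.partitions", "200") // reset from the job's default
    spark.sql("""
      INSERT INTO small_output_table
      SELECT f.item_id, f.gmv_usd, f.dt
      FROM   big_fact f
      JOIN   big_dim  d ON f.item_id = d.item_id
      WHERE  d.active_flag = 'Y'
      DISTRIBUTE BY dt                -- dt is the (hypothetical) partition column
    """)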

Example 1: The original code had more than 200 steps and ran for an hour. After tuning the code, the same task was reduced to 50 steps and ran in 12 minutes.

Example 2:

Example 3: The first join in the original code (byr_id=-999 is a highly skewed key) finishes quickly as long as the byr_id=-999 data is not joined. However, the results of the first join are shuffled onto one node, which forces the second join to shuffle the most-skewed data from that node, making the query run for four to six hours.

After tuning (using 'union all' to avoid shuffling byr_id=-999 data, so there is no skew shuffle for the entire query), the query finished in 15 minutes.
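
A minimal sketch of that 'union all' rewrite (every table and column name other than byr_id is hypothetical): the skewed key is carried through separately so it never participates in the join shuffle.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    val result = spark.sql("""
      SELECT t.byr_id, b.byr_cntry_id, t.gmv_usd
      FROM   checkout_trans t
      JOIN   buyer_dim b ON t.byr_id = b.byr_id
      WHERE  t.byr_id <> -999          -- normal keys go through the join

      UNION ALL

      SELECT t.byr_id, CAST(NULL AS INT) AS byr_cntry_id, t.gmv_usd
      FROM   checkout_trans t
      WHERE  t.byr_id = -999           -- the skewed key bypasses the join entirely
    """)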

Parameter Level

The final category of optimizations were at the parameter level:

  1. Set the optimal memory for each executor based on the size of the data. (Typically the size should not be larger than 10GB per task.) Also, make garbage collection time less than 10% of the task period.
  2. Set a reasonable overhead memory size for each executor by considering the shuffle data size for each executor.
  3. Set the optimal number of executor cores: too many cause heavy input/output on the same node, and too few require more executors.
  4. Keep the initial dynamic executor size small, and use the dynamic executor min/max to control the number of executors when the job is running. Parallel runs with too many executors can cause the drivers to fail.
  5. Set the shuffle partition size according to the shuffle data volume, and reset it to a small value if the output volume is small to reduce the number of small files.
  6. Set a timeout; the default is five minutes, but best practice is to set it to two minutes.
  7. Split the file size according to the block size. If the table is a bucket table and the SQL does not use bucket joins, disable bucket reading and split the table as two blocks.
  8. Use a broadcast join when one join table is very large or has skew data and the other is very small (see the sketch below).
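
A minimal sketch of point 8 (the dimension table and join key are hypothetical; dw_checkout_trans stands in for the large side):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().getOrCreate()

    // Broadcast the small table so the large (and possibly skewed) table is never
    // shuffled for this join.
    val trans  = spark.table("dw_checkout_trans")
    val sites  = spark.table("site_dim")                 // hypothetical small dimension
    val joined = trans.join(broadcast(sites), "site_id") // illustrative join key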

Example: A merge job for dw_accounts, which is a 26+ terabyte table after compression:

Original:

  • Runtime - 1 hour
  • spark.executor.cores=2
  • spark.executor.memory=40g
  • spark.yarn.executor.memoryOverhead=18g
  • spark.sql.shuffle.partitions=20000
  • spark.dynamicAllocation.maxExecutors=2000
  • spark.network.timeout=3000

After tuning:

  • Runtime - 16 mins (with a bucket join enabled)
  • spark.executor.cores=3
  • spark.executor.memory=27g
  • spark.yarn.executor.memoryOverhead=2g
  • spark.sql.shuffle.partitions=40000
  • spark.dynamicAllocation.maxExecutors=10000
  • spark.network.timeout=120s
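
For reference, a minimal sketch of the tuned values above applied at session build time (on a real cluster these are typically passed as --conf flags to spark-submit; the application name is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("dw_accounts_merge")
      .config("spark.executor.cores", "3")
      .config("spark.executor.memory", "27g")
      .config("spark.yarn.executor.memoryOverhead", "2g")
      .config("spark.sql.shuffle.partitions", "40000")
      .config("spark.dynamicAllocation.maxExecutors", "10000")
      .config("spark.network.timeout", "120s")
      .getOrCreate()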

These five levels of optimization of Hadoop jobs yield performance results that are consistently better than what was provided by the vendor system. By providing teams with a tool that quickly processes the data they need for their innovations, we are powering the next level of eBay's tech-led reimagination, one report at a time.
