스파크의 데이터프레임과 실행 계획에 대해 정리한 글입니다.
1. 들어가며
스파크의 동작 원리를 이해하기 위해 유튜브의 PySpark - Zero to Hero 시리즈와 여러 자료를 보면서 내용을 정리하고 있습니다. 이번 글에서는 스파크의 데이터프레임(DataFrame)과 실행 계획에 대해 다뤄보겠습니다. 내용 중 부정확하거나 애매한 부분이 있다면, 편하게 알려주세요!
2. 데이터프레임이란?
데이터프레임은 스파크에서 데이터를 다룰 때 가장 많이 사용하는 구조적 API(Structured API)입니다. 여기서 "구조적"이라는 말은 데이터가 스키마, 즉, 컬럼 이름과 데이터 타입이 명시된 정형 데이터 형태로 표현된다는 뜻입니다.
데이터프레임은 행(row)과 열(column)로 구성된 표 형태의 데이터 구조로, 엑셀 시트와 비슷하게 생각하면 이해하기 쉽습니다. 파이썬의 Pandas나 R에서도 데이터프레임 개념이 존재하지만, Pandas나 R의 데이터프레임은 보통 한 대의 컴퓨터 메모리 위에서 동작하기 때문에 처리 가능한 데이터 크기에 제한이 있습니다. 반면, 스파크의 데이터프레임은 여러 대의 노드에 데이터를 분산 저장하고, 병렬로 연산할 수 있어, 훨씬 더 큰 규모의 데이터를 효율적으로 처리할 수 있습니다.

3. 스파크 코드의 실행 과정
데이터프레임을 이용해 코드를 작성하면 스파크는 이 코드를 바로 실행하지 않습니다. 대신 내부적으로 여러 단계를 거쳐 실행 계획을 세우고 최적화한 뒤, 실제로 클러스터에서 실행하게 됩니다. 이 과정에 대해서 좀 더 자세히 살펴보겠습니다.

3.1. 논리적 실행 계획
먼저, 코드를 논리적 실행 계획(logical plan)으로 바꾸는 단계를 거치는데요. 이 단계는 "무엇을 해야 하는가"에 초점을 맞추고, "어떻게 수행할 것인가"는 고려하지 않습니다. 논리적 실행 계획은 다음 세 단계를 거칩니다.
1) 검증 전 논리적 실행 계획 (Unresolved Logical Plan)
사용자가 작성한 SQL이나 데이터프레임 연산이 트리 구조로 변환됩니다. 이 트리에는 어떤 연산을 수행할지, 어떤 컬럼을 사용할지가 담기지만, 실제 테이블이나 컬럼이 존재하는지는 아직 확인되지 않는 상태입니다.
2) 검증된 논리적 실행 계획 (Analyzed Logical Plan)
스파크 분석기는 검증 전 논리적 실행 계획을 입력받아, 카탈로그, 테이블 메타데이터, 데이터프레임 스키마 정보를 참조하여 각 컬럼과 테이블을 실제 객체로 연결(binding)합니다. 즉, "이 코드가 정말 실행 가능한지?"를 검사하는 단계로, 존재하지 않는 컬럼이나 테이블을 참조했다면, 이 시점에서 오류가 발생합니다.
3) 최적화된 논리적 실행 계획 (Optimized Logical Plan)
검증이 완료되면 스파크의 Catalyst Optimizer가 개입해서 논리적 실행 계획을 더 효율적인 형태로 최적화합니다. 대표적인 최적화 기법은 다음과 같습니다.
- 조건절 푸시다운(predicate pushdown): 필터 연산을 가능한 한 앞단으로 이동시켜 처리해야 할 데이터 양 줄임
- 선택절 최적화(projection pruning): 필요한 컬럼만 선택하여 불필요한 데이터 읽기 방지
3.2. 물리적 실행 계획
논리적 실행 계획이 최적화되면, 이제 스파크는 "어떻게 실행할지"를 결정합니다. 이 단계에서 생성되는 것이 물리적 실행 계획입니다. 물리적 실행 계획에서는 다음과 같은 요소들이 정해집니다.
- 연산 방식: 정렬, 필터, 조인 등 각 연산을 어떤 알고리즘으로 수행할지
- 병렬 처리 전략: 작업을 워커 노드로 어떻게 분배할지
- 데이터 이동(shuffle): 필요한 경우 데이터를 어떻게 재분배할지
스파큰 가능한 여러 전략을 만들어보고, 각 전략의 비용을 계산해 가장 효율적인 계획을 선택합니다. 최종적으로 선택된 계획은 DAG 형태로 표현되어, 클러스터 전역에서 병렬로 실행됩니다.
4. 실습
앞서 살펴본 내용을 간단한 예제로 확인해보겠습니다.
(실습은 Docker 기반의 jupyter/pyspark-notebook:aarch64-spark-3.5.0 이미지를 사용했습니다.)
4.1. 데이터프레임 생성
먼저 간단한 데이터를 이용해 데이터프레임을 만들어보겠습니다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("Playground").getOrCreate()
data = [
(1, "원동선", "아메리카노", 4000),
(2, "성이헌", "아메리카노", 4000),
(3, "하정서", "라떼", 4500),
(4, "정민음", "율무차", 5000),
(5, "최준범", "녹차", 6000)
]
df = spark.createDataFrame(data, ["id", "name", "menu", "price"])
df.show()
다음과 같은 결과가 출력됩니다.
+---+------+----------+-----+
| id| name| menu|price|
+---+------+----------+-----+
| 1|원동선|아메리카노| 4000|
| 2|성이헌|아메리카노| 4000|
| 3|하정서| 라떼| 4500|
| 4|정민음| 율무차| 5000|
| 5|최준범| 녹차| 6000|
+---+------+----------+-----+
4.2. 실행 계획 확인
1단계에서 만든 데이터프레임에서 가격이 4,500원을 넘는 주문만 필터링하고, id를 제외한 컬럼만 선택해보겠습니다.
# 데이터 필터링
filtered_df = df.filter(col("price") > 4500).select("name", "menu", "price")
# 실행 계획 확인
filtered_df.explain(True)
explain(True)을 실행하면 스파크가 내부적으로 어떤 단계를 거쳐 쿼리를 실행하는지 트리 형태의 실행 계획으로 출력됩니다.
결과를 읽을 때는 "아래에서 위로" 읽습니다.
1) Parsed Logical Plan - 코드 초안
== Parsed Logical Plan ==
'Project ['name, 'menu, 'price]
+- Filter (price#11L > cast(4500 as bigint))
+- LogicalRDD [id#8L, name#9, menu#10, price#11L], false
- 'name처럼 앞에 따옴표(')가 붙은 것은 컬럼이 아직 실제 객체와 연결되지 않은 상태를 의미합니다.
- LogicalRDD란 메모리에서 직접 만든 데이터프레임/RDD를 뜻합니다.
- cast(4500 as bigint)는 비교 대상의 자료형에 맞춰 자동 형변환된 것을 의미합니다.
2) Analyzed Logical Plan - 스키마 검증 완료
== Analyzed Logical Plan ==
name: string, menu: string, price: bigint
Project [name#9, menu#10, price#11L]
+- Filter (price#11L > cast(4500 as bigint))
+- LogicalRDD [id#8L, name#9, menu#10, price#11L], false
- 스파크 분석기가 df의 스키마를 참조해 각 컬럼이 실제 존재하는지, 타입이 일치하는지 확인합니다.
- 'name처럼 추상적이던 컬럼이 name#9: string처럼 식별자와 타입이 붙은 실제 객체로 바뀌었습니다.
- 존재하지 않는 컬럼을 읽으려고 하면, 이 단계에서 아래와 같이 AnalysisException이 발생합니다.
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[6], line 2
1 # 데이터 필터링
----> 2 filtered_df = df.filter(col("price") > 4500).select("name", "menu", "pe")
4 # 실행 계획 확인
5 filtered_df.explain(True)
3) Optimized Logical Plan - 불필요한 연산 제거
== Optimized Logical Plan ==
Project [name#9, menu#10, price#11L]
+- Filter (isnotnull(price#11L) AND (price#11L > 4500))
+- LogicalRDD [id#8L, name#9, menu#10, price#11L], false
- isnotnull(price) 조건이 추가됐습니다. 추가해서 null 비교로 생길 오류나 불필요한 연산을 사전에 차단합니다.
4) Physical Plan - 실제 실행 전략
== Physical Plan ==
*(1) Project [name#9, menu#10, price#11L]
+- *(1) Filter (isnotnull(price#11L) AND (price#11L > 4500))
+- *(1) Scan ExistingRDD[id#8L,name#9,menu#10,price#11L]
- Scan -> Filter -> Project 순서로 데이터를 처리합니다.
- 여기서 *(1)이 세 번 적혀있는 것은, 하나의 Stage에서 실행이 이뤄졌다는 것을 나타냅니다. 만약 셔플이 발생했다면, *(1) -> *(2)처럼 Stage 번호가 바뀝니다.
5. 나가며
이번 글에서는 스파크의 데이터프레임과 실행 계획에 대해 살펴봤습니다. 다음 글에서는 스파크의 기본 연산을 정리해보겠습니다.
참고자료
'Data > Spark' 카테고리의 다른 글
| [Spark] 변환과 액션 (0) | 2025.10.31 |
|---|---|
| [Spark] 스파크란 무엇인가? (0) | 2025.10.10 |
