1. Apache Saprk 등장 배경
빅데이터를 처리하다 보면 처리해야 하는 데이터가 보유하고 있는 머신으로 감당하기 힘든 적 또는 SQL 쿼리를 실행하였더니 밤새도록 걸린 적이 있을 것이다.
이런 경우 머신을 업그레이드하거나, 인내심을 길러 어쨌든 문제를 해결할 수 있었지만, 처리해야 할
데이터가 계속해서 증가하고 데이터베이스에 수백만 개의 행이 추가되면 지속적으로 문제가 생길 것이다.
이러한 경우에 Apache Spark를 사용하면 문제를 해결할 수 있다!
2. Apache Spark란?
Spark는 현재 오픈 소스 프로젝트를 운영하는 단체인 Apache에 의해 관리되고 있는 프로젝트이다. Apache에 따르면 Spark는 대규모의 데이터 처리를 위한 고속처리엔진 또는 대규모 데이터 처리를 위한 빠른 범용 클러스터 프레임워크를 제공하는 기술이라고 한다.
단순하게 말하면 그냥 빅데이터를 빨리 처리하는 기술이다.
기존에 Spark의 역할을 하던 것이 Hadoop의 MapReduce다. 그러나 기존의 MapReduce는 disk에서 수행되었기 때문에 속도적인 부분에서 좋지 못한 모습을 보였고 이러한 한계를 극복하기 위해 개발된 것이 in-memory에서 수행되는 Spark이다. 극단적인 예시이긴 하지만, Apache에 의하면 in-memory에서는 Saprk가 MapReduce보다 100배가량 빠르고, disk에서는 MapReduce가 Spark보다 10배 가량 빠르다고 한다. 또한 Spark를 사용하는 이유로는 8000개 이상의 노드를 추가 가능한 확장성과 hdfs, s3 등 다양한 데이터의 활용 같은 여러 이유가 있다.
3. 대화형 프로그램
Spark는 Scala, Pyspark를 이용해 대화형 프로그램으로 실행할 수 있다. 대화형 프로그램이란 사용자의 입력에 대하여 바로 결과를 출력하는 프로그램이다.
4. 실행과정
그림에 나와있는 Driver Program(Master Node), Cluster Manager, Worker Node(Slave Node)를 모두 합쳐
Spark Application이라고 한다.
Driver Program은 Master Node에서 스파크 전체의 main() 함수를 실행하고 Spark Application 내 정보의 유지 관리, Executor의 실행 및 실행 분석, 배포 등의 역할을 수행한다. 또한 사용자가 구성한 사용자 프로그램(Job)을 task 단위로 변환하여, Worker Node의 Executor로 전달한다.
Executor은 여러 개의 Worker Node에서 실행되는 프로세스로, Spark Driver가 할당한 작업(task)을 수행하여 결과를 반환한다.
Cluster Mananger는 Spark Application의 리소스를 효율적으로 분배하는 역할을 하고, 작업이 실행되는 Node들을 관리한다. 이때 Spark는 Cluster Mananger의 상세 동작을 알지 못하며 단지 Cluster Mananger와 통신을 하여 할당 가능한 Executor를 전달받는다.
Spark 3.0 기준으로 사용가능한 Cluster Manager는 아래와 같다.
- StandAlone: 스파크에서 자체적으로 제공하는 클러스터 매니저. 각 노드에서 하나의 익스큐터만 실행 가능
- Apache Mesos: 아파치 클러스터 매니저. 동적 리소스 공유 및 격리로 여러 소스의 워크로드를 처리
- Hadoop YARN: 하둡 클러스터 매니저 (리소스 매니저, 노드 매니저)
한 개의 Spark Application은 한 개의 Master Node와 다수의 Worker Node로 이루어져 있다.
이때, Executor는 Cluster Manager에 의하여 해당 Spark Application에 할당되며, 해당 Spark Application이
종료된 후 할당에서 해방된다. 그렇기 때문에 서로 다른 Spark Application 간의 직접적인 데이터 공유는 불가능하다.
Spark Application 실행 과정
사용자가 Spark를 실행할 때의 대략적인 실행 흐름은 다음과 같다.
1. 사용자가 Spark-submit을 통해 어플리케이션을 제출한다.
2. Spark Driver가 main()을 실행하며, SparkContext를 생성한다.
3. SparkContext가 Cluster Manager와 연결된다.
4. Spark Driver가 Cluster Manager로부터 Executor 실행을 위한 리소스를 요청한다.
5. Spark Context는 작업 내용을 task 단위로 분할하여 Excutor에 보낸다.
6. 각 Executor는 작업을 수행하고, 결과를 저장한다.
5. RDD vs DataFrame vs DataSet
Spark Api를 보면 그림처럼 RDD, DataFrame, DataSet순으로 발전하였다. 그러나 Api 단순화를 위해
Spark 2.0에서 DataFrame과 DataSet을 통합하였으므로 RDD와 DataFrame만 보면 된다.
(1) RDD
RDD는 Resilient Distributed Dataset(탄력 있는 분산 데이터세트)의 줄임말로 Spark의 기본 데이터 구조이면서 Spark에서 사용하는 기본 데이터 단위이다. Spark = RDD라고 생각해도 될만큼 RDD는 Sprak를 이해하는데 있어서 아주 중요한 요소이다.
- Resilient(탄력 있음) : 데이터가 유실되면 다시 생성됨
- Distributed(분산시킴) : 데이터를 클러스터의 여러 worker 노드들에 분산시킴
RDD는 여러 개의 Parition이라는 단위로 나뉘고, Spark에서는 이 Partition이 분산처리의 단위가 된다. 다시 말해 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러개의 Partition으로 나뉜다. RDD를 여러 Partition으로 나누어 처리하기 때문에 한 대의 머신으로 처리할 수 있는 것보다 더 많은 데이터를 처리할 수 있게 된 것이다. Partition의 개수는 선택 가능한 부분이지만 일반적으로 CPU 코어의 개수보다 많게 설정한다.
예를 들어 현재 클러스터의 코어 개수가 3개이고, RDD를 4개의 Partition으로 나누었다고 가정하고, 총 처리해야할 데이터의 개수는 4000개라고 해보자. 이때 하나의 Partition에서 1000개의 데이터를 처리할 때 1시간이 걸린다고 가정하겠다.
이 경우 Partition이 4개이고 처리해야할 데이터가 4000개이므로 한 개의 Partition 당 1000개의 데이터를 처리해야 한다.
그렇다면 3개의 코어에서 3개의 Partiton이 처리되는 task가 진행되고, task가 끝난 후 1개의 코어에서 추가적으로 남은 1개의 Partition이 처리되는 task가 진행될 것이다. 그렇다면 걸리는 시간은 1시간 + 1시간으로 총 2시간이 소요된다.
이번에는 Partiton의 개수를 3개로 지정해보자. 그러면 이전에 처리한 데이터는 총 4000개이므로 3개의 Partition에서 처리하려면 하나의 Partition당 1333개를 처리하면 된다. 3개의 코어에서 3개의 Partition이 실행되는 task가 진행될 것이고, 추가 작업은 없을 것이다. 처음에 처리되는 task가 아까보다는 길어지겠지만 (비율로 따지면 1시간 20분) 총 작업시간은 1시간 20분으로 Partition이 4개일 때에 비해 걸리는 시간이 훨씬 적어졌다.
이렇게 코어의 개수를 고려하여 Partition의 개수를 지정하는 것이 엔지니어로서의 중요한 역량이다.
그러면 Partition의 개수를 어떻게 설정해야할까?
각 Executor는 각 Core당 하나의 task를 실행할 수 있으며, 한 Partition이 하나의 task와 연계된다.

일반적으로 Partition의 개수를 늘리는 것은 오버헤드가 너무 많아지는 수준이 되기 전까지는 성능을 높여준다. 또한 총 코어 개수보다 적은 Partition을 쓰면 일부 CPU가 쉬게 되므로 최소한 총 CPU 코어 개수 이상의 Partition을 사용해야 한다.따라서 Partition이 부족한 것보다는 차라리 조금 더 많은 것이 낫다. 또한 Partition 개수를 늘리는 것은 각 Executor에서 Spark가 한 번에 처리하는 양이 적어지므로 메모리 부족 오류를 줄이는데 도움을 준다.
그렇다고 Partition의 개수를 무작정 늘리는 것도 잘못된 방법이다.
스파크 드라이버가 모든 파티션의 메타데이터를 보관해야 하며, Driver memory errors & Driver overhead errors가 발생할 수 있다. 또한 모든 파티션을 Scheduling 하기 위해 드는 시간은 공짜가 아니고, 작은 사이즈의 파일들을 생성하기 위한 I/O가 많이 발생하며, 이 과정에서 오랜 시간이 소요될 수 있다.
Saprk RDD vs Hadoop MapReduce
위에서 말했던 Hadoop의 MapReduce와 Spark의 RDD에 무슨 차이가 있길래 속도 차이가 난다고 한 것일까? Hadoop의 MapReduce를 실행하면 기존의 프레임워크에서 사용되었던 데이터를 재사용하기 위해, 디스크 스토리지에
저장을 하게 되는데 이때 소요되는 시간이 전체 MapReduce 소요 시간의 90%를 차지한다고 한다.
반면 Spark는 메모리에 저장하기 때문에 이 부분에서 시간차이가 발생한다.
앞에서 말했듯이 두 방법의 가장 큰 차이는 MapReduce는 HDFS(디스크 스토리지)에 읽고 쓰고, RDD는 메모리에 읽고 쓰기 때문에 이 부분에서 MapReduce와 Spark의 시간차이가 발생한다.
RDD에 관한 Spark의 모든 작업은 새로운 RDD를 만들거나 이미 존재하는 RDD를 변형하는 것이다.
RDD 생성
RDD는 sparkContext로부터 생성되며 생성 방법에는 두 가지 방법이 있는데, 드라이버 프로그램의 컬렉션 객체를 이용하거나 외부에서 데이터를 가져오는 방법이다. 전자는 리스트나 시퀀스 타입의 객체를 사용하여 데이터를 설정하는 방법이다.
RDD = sc.parallelize(["a","b","c","d"])
print(", ".join(str(i) for i in RDD.collect()))
후자는 외부파일을 가져와 RDD로 만드는 방법이다. 모든 외부데이터는 디스크, HDFS에 들어가 있다. 그렇게 외부에 있는 데이터를 가져오는 과정에서 RDD가 생성된다.
RDD = sc.textFile("파일경로")
RDD 연산
RDD를 제어하는 연산에는 두 개의 연산 타입이 있다. 첫 번째는 기존의 RDD에서 새로운 RDD를 만들어내
Transformation함수, 두 번째는 RDD를 이용한 작업결과를 드라이버에 반환하거나 파일시스템에 결과를 적는 Action함수이다.
Transformation이 RDD로부터 다른 RDD를 얻는 '데이터 가공'에 해당하는 조작이라면, Action은 RDD 내용을 바탕으로 데이터를 가공하지 않고 원하는 결과를 얻는 조작이다.
Transformation함수의 예시로는 map, filter, flatmap 등이 있고, Action함수의 예시로는 reduce, collect, count 등이 있다.