진행하고 있는 프로젝트에서 큰 용량의 데이터셋을 Spark로 연산 시 OOM 에러가 뜨는것을 발견하였다.
프로젝트는 Spark 자체를 내장하고 local에서 4개의 executor를 띄워 연산하기에, JVM이 데이터의 용량을 버티지 못하게 된 것이다.
이 때문에 프로젝트와 Spark 엔진을 분리하고 Spark 자체가 K8s 에서 자원을 할당받아 연산을 수행하도록 구조를 수정하였다.
또한 Spark 클러스터에 간편하게 작업을 제출할 수 있도록 Apace Livy를 통해 작업을 관리할 수 있도록 구성 하였다.
Livy의 동작 구조는 다음과 같다.
Livy Client는 REST API 또는 Programmatic API를 통해 Livy Server에게 Spark 작업 수행 요청을 제출할 수 있다.
Livy Server는 YARN, Mesos, K8s 클러스터에서 클러스터에서 자원을 할당받아 Spark Driver를 생성하고, SparkContext를 관리한다.
마지막으로, Executor를 통해 연산된 결과값을 Livy server가 client에게 전달한다.
K8s 상의 Spark-Livy 환경 구성을 위해 Livy Server를 구성해보았다.
0. Prerequisite
Spark가 K8s에 자원을 요청할 때의 로직은 Spark의 버전마다 상이하다.
K8s가 상시로 업데이트 될 뿐만 아니라 지원되는 API자체가 다른 경우도 있기 때문이다.
사용한 버전은 다음과 같다.
- K8s 1.27
- Spark 3.5.1
- Livy 0.8.1-incubating
- Helm 3.13.2
다른 버전을 사용한다면 이미지를 직접 빌드해서 사용할 수 있다.
해당 레포를 참고하여 버전에 알맞는 도커 이미지를 생성하였다.
GitHub - JahstreetOrg/spark-on-kubernetes-docker: Spark on Kubernetes infrastructure Docker images repo
Spark on Kubernetes infrastructure Docker images repo - GitHub - JahstreetOrg/spark-on-kubernetes-docker: Spark on Kubernetes infrastructure Docker images repo
github.com
1. Livy server Helm chart
해당 레포를 참고하여 Livy Server를 배포하였다.
GitHub - JahstreetOrg/spark-on-kubernetes-helm: Spark on Kubernetes infrastructure Helm charts repo
GitHub - JahstreetOrg/spark-on-kubernetes-helm: Spark on Kubernetes infrastructure Helm charts repo
Spark on Kubernetes infrastructure Helm charts repo - GitHub - JahstreetOrg/spark-on-kubernetes-helm: Spark on Kubernetes infrastructure Helm charts repo
github.com
이 Helm 차트는 K8s 1.19 이하 버전에서 지원되기 때문에 자원 명세서를 조금 수정해준다.
또한 새로 빌드한 이미지 사용, Livy Server가 실행시킨 session들의 정보를 저장할 수 있는 PVC 생성, Livy on K8s에서 지원하지 않는 Livy Server Recovery 옵션 제거 등 몇가지를 수정한다.
예로, v1beta와 같은 deprecated 자원 API 경로 등이 있었다.
HDFS와 같은 원격 FS를 사용하지 않기 때문에 K8s의 PVC를 통해 Spark에 제출할 작업인 Jar 파일을 관리하기로 했다.
NFS를 통해 PVC를 생성하고, Livy의 세션 데이터들이 저장되는 /root/.livy-sessions 디렉토리를 Driver/Executer Pod와 공유하기로 하였다.
livy-jars-pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: livy-jars
labels:
app.kubernetes.io/name: {{ include "livy.name" . }}
helm.sh/chart: {{ include "livy.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20G
storageClassName: "nfs-client"
driver-pod-template.yaml
apiVersion: v1
kind: Pod
spec:
containers:
- volumeMounts:
- mountPath: /root/.livy-sessions
name: livy-external-jars-share
volumes:
- name: livy-external-jars-share
persistentVolumeClaim:
claimName: livy-jars
친절하게도, JahStreetOrg에서 제공하는 이미지에는 Livy와 Spark 설정을 entrypoint.sh를 통해 간편하게 구성할 수 있도록 제공하였다.
0dash1uppercase 파싱을 사용하여 spark.kubernetes.driver.podTemplateFile 변수를 설정해주었다.
values.yaml(Livy Server의 Helm chart)
...
LIVY_SPARK_KUBERNETES_DRIVER_POD1TEMPLATE1FILE: {value: "/root/.livy-sessions/driver.yaml"}
LIVY_SPARK_KUBERNETES_EXECUTOR_POD1TEMPLATE1FILE: {value: "/root/.livy-sessions/driver.yaml"}
...
2. Job 코드 생성 및 JAR 빌드
Livy는 Java에서 LivyClient와 Job 등의 클래스를 통해 Programmatic API를 지원한다.
LivyClientBuilder를 통해 인스턴스를 생성하면 Livy Server에서 Spark Driver를 생성해주는 방식이다.
public static LivyClient livyClient() throws IOException, URISyntaxException {
return new LivyClientBuilder()
.setURI(URI.create("http://<livy server>:<port(default 80)>"))
.setConf("livy.spark.master", "k8s:https://<k8s control-plane>:<port>")
.setConf("spark.app.name", "livy-test")
.setConf("spark.executor.instances", "2")
.setConf("spark.kubernetes.file.upload.path", "/root/.livy-sessions")
.setConf("spark.driver.extraClassPath", "/root/.livy-sessions")
.setConf("spark.executor.extraclasspath", "/root/.livy-sessions")
.build();
}
livyClient 인스턴스의 uploadJar() API를 통해 Spark 작업 Jar 파일을 업로드할 수 있다. 이후, submit() API를 통해 spark-submit을 수행할 수 있다.
public Row[] process()
throws ExecutionException, InterruptedException, IOException, URISyntaxException {
LivyClient client = LivyConfig.livyClient();
File jarFile = new File("/path/to/jar-which-includes-SQLJob-class/livy.jar");
client.uploadJar(jarFile).get();
Row[] rows = client.submit(new SQLJob("SELECT * FROM dummy", 2)).get();
for (Row row : rows) {
System.out.println(row.get(0));
System.out.println(row.get(1));
System.out.println(row.get(2));
}
client.stop(true);
return new ResponseEntity<>(rows, HttpStatus.OK);
}
Spark에 submit한 작업은 미리 정의한 SQLJob 클래스인데, 이는 Livy의 Job 인터페이스를 구현한 작업 클래스이다.
다음과 같이 간단하게 Spark SQL을 사용할 수 있도록 구성해두었다.
public class SQLJob implements Job<Row[]> {
private final String sql;
private final int number;
public SQLJob(String sql, int number) {
this.sql = sql;
this.number = number;
}
public Row[] call(JobContext ctx) throws Exception {
SparkSession sparkSession = ctx.sqlctx().sparkSession();
Dataset<Row> dummyDataframe = sparkSession
.createDataFrame(
this.getDummyData(),
getDummySchema()
);
dummyDataframe.createTempView("dummy");
Dataset<Row> df = ctx.sqlctx().sql(this.sql);
requireNonNull(df, "dataset");
Row[] rows = df.takeAsList(number).toArray(new Row[0]);
return rows;
}
private List<Row> getDummyData() {
Row row1 = RowFactory.create(1, "Alice", 25);
Row row2 = RowFactory.create(2, "Bob", 30);
Row row3 = RowFactory.create(3, "Charlie", 22);
return Arrays.asList(row1, row2, row3);
}
private static StructType getDummySchema() {
return DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false)
});
}
}
이를 Jar 파일로 빌드한 후, Livy server의 공유 디렉토리에 업로드하여 driver와 executor가 Jar 파일을 공유받을 수 있도록 하였다.
3. Livy submit 테스트
Livy server를 통해 위의 작업을 제출해보았다.
작업을 제출하자 driver와 executor pod가 실행되며
작업이 종료되면 모든 pod이 제거되고 driver의 로그만 남게 된다.
'개발 > 각종이슈' 카테고리의 다른 글
Spark Standalone, Spark on K8s with Livy (feat. Ozone) (0) | 2024.01.16 |
---|---|
Kubernetes 클러스터 서버 이전 (0) | 2023.09.14 |