Purumir's Blog

Machine Learning, SW architect, Management, favorites

Kafka와 SparkStreaming을 통한 시계열 데이터 저장(on kubernetes)

다음과 같은 상황에서 시계열 데이터를 저장하는 과정을 기술하고자 합니다. 3개 지역의 온도 데이터를 kafka를 통해 수집하고, spark streaming으로 처리한후 이를 elasticsearch에 저장하는 과정입니다. 데이터의 수집 현황을 kibana를 통해서 실시간 모니터링합니다.


아키텍처 설명)

복잡한 처리를 중간에 하지는 않고 수집된 데이터를 바로 저장하는 단순한 패턴입니다. 추후 spark를 통한 시계열 분석은 진행할 예정입니다. 다만 Spark, ElasticSearch는 kubernetes상에서 배포합니다. Kafka는 istio환경에서 helm chart(https://github.com/helm/charts/tree/master/incubator/kafka) 를 이용해 배포시 kafka Pod가 zookeeper Pod (zookeeper-headless.es.svc.cluster.local로 access가 안됨) 에 access하지 못하는 현상이 있어 kafka는 kubernetes외부에 deploy한다는 가정입니다. 따라서, 아키텍처 컨셉은 다음과 같습니다.

Spark Client Mode로 Spark Job Submit을 kubernetes API Server를 통해서 수행하게 됩니다. 이를 위한 serviceaccount/clusterrolebinding 내용은 다음 글을 참조하시기 바랍니다. (https://purumir.github.io/2020/05/21/kubernetes로-spark-submit하기-spark-submit-to-kubernetes/)

spark submit을 하기 위한 python 코드는 다음과 같습니다. 기본 내용은 kafka에 저장된 데이터를 DStream을 통해서 각 row를 추출하며 이를 ECS(Elastic Common Schema)로 formatting하여 저장하는 것입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
temperature.py

import os
import json
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
############################################################
# spark configuration of spark #
############################################################
# If there is already spark running, you sould use getOrCreate() method.
sc = SparkContext().getOrCreate()
sc.setLogLevel("DEBUG")

# batch interval 1 second
ssc = StreamingContext(sc, 1)

kafkaStream = KafkaUtils.createStream(ssc, '<kafka host ip/kubernetes outside>:2181', 'spark-streaming', {'temperatureTopic':1})

lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

############################################################
# code here elastic json format with ECS #
############################################################
def process_streaming(series_data):
import datetime
import json

# \ 오류 처리
series_data = series_data.replace("\"", "")
series_data = series_data.replace("\'", "\"")

series_data_json = json.loads(series_data)
date_time_obj = datetime.datetime.strptime(series_data_json.get("Record time"), "%Y-%m-%d %H:%M:%S.%f")

region = series_data_json.get("Region")

if region == 'Seoul':
region_temperature_degress = {
'seoul' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_seoul_01'
id = 'sensor_seoul_01'
elif region == 'Busan':
region_temperature_degress = {
'busan' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_busan_01'
id = 'sensor_busan_01'
elif region == 'Jeju':
region_temperature_degress = {
'jeju' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_jeju_01'
id = 'sensor_jeju_01'

elastic_data_json_with_ecs ={
'@timestamp' : date_time_obj.strftime("%Y-%m-%dT%H:%M:%S.%f"),
'ecs' : {
'version' : '1.0.0'
},
'host' : {
'name' : 'sensor',
'hostname' : hostname,
'id' : id
},
'event' : {
'dataset' : 'thermometer.air_temperature',
'module' : 'thermometer',
'duration' : 10,
'original': series_data
},
'metricset' : {
'name' : 'air_temperature'
},
'service' : {
'type' : 'thermometer'
},
'thermometer' : {
'air_temperature' :
region_temperature_degress

}
}
return json.dumps(elastic_data_json_with_ecs)

# lines map 처리, 개별 timeseries데이터의 row를 elasticsearch에 적재
# ECS(Elastic Common Schema)로 formatting하여 저장함.
lines = lines.map(lambda item: process_streaming(item)).map(lambda x: ('key', x))
lines.pprint()

# RDD마다 elastic search 저장을 위한 function(saveAsNewAPIHadoopFile)를 호출합니다.
lines.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile( \
path='-', \
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
keyClass="org.apache.hadoop.io.NullWritable", \
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
conf={"es.nodes" : "<elastic search IP>", "es.port" : "9200", "es.resource" : "testindex", "es.input.json": "yes" }))


ssc.start()
ssc.awaitTermination()

Spark Job sumit 수행을 위한 command는 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
./bin/spark-submit \
--master k8s://https://<k8s master server ip>:6443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=<Spark Pod가 생성되는 kubernetes namespace> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=<kubernetes service account> \
--conf spark.kubernetes.container.image=<spark 사이트 가이드 따라 빌드한 컨테이너 이미지> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.options.claimName=nfs-pvc-spark-master \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.mount.path=<NFS Mount Path> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.mount.readOnly=false \
--conf spark.kubernetes.authenticate.oauthTokenFile=<Kubernetes Service Account oauth token file> \
--conf spark.kubernetes.authenticate.caCertFile=<Kubernetes Service Account ca crt file> \
--conf spark.kubernetes.executor.request.cores=500m \
--conf spark.kubernetes.driverEnv.memory=1g \
--conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \ # side-car injection disable
--conf spark.executor.cores=1 \
--conf spark.executor.memory=700m \
--conf spark.worker.cores=1 \
--conf spark.driver.host=<spark job을 submit 하는 host ip> \
--conf spark.driver.port=7077 \
--jars elasticsearch-hadoop-7.6.1.jar \
--driver-memory 1g \
--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.5 \
temperature.py