Purumir's Blog

Machine Learning, SW architect, Management, favorites

Kafka와 SparkStreaming을 통한 시계열 데이터 저장(on kubernetes)

다음과 같은 상황에서 시계열 데이터를 저장하는 과정을 기술하고자 합니다. 3개 지역의 온도 데이터를 kafka를 통해 수집하고, spark streaming으로 처리한후 이를 elasticsearch에 저장하는 과정입니다. 데이터의 수집 현황을 kibana를 통해서 실시간 모니터링합니다.


아키텍처 설명)

복잡한 처리를 중간에 하지는 않고 수집된 데이터를 바로 저장하는 단순한 패턴입니다. 추후 spark를 통한 시계열 분석은 진행할 예정입니다. 다만 Spark, ElasticSearch는 kubernetes상에서 배포합니다. Kafka는 istio환경에서 helm chart(https://github.com/helm/charts/tree/master/incubator/kafka) 를 이용해 배포시 kafka Pod가 zookeeper Pod (zookeeper-headless.es.svc.cluster.local로 access가 안됨) 에 access하지 못하는 현상이 있어 kafka는 kubernetes외부에 deploy한다는 가정입니다. 따라서, 아키텍처 컨셉은 다음과 같습니다.

Spark Client Mode로 Spark Job Submit을 kubernetes API Server를 통해서 수행하게 됩니다. 이를 위한 serviceaccount/clusterrolebinding 내용은 다음 글을 참조하시기 바랍니다. (https://purumir.github.io/2020/05/21/kubernetes로-spark-submit하기-spark-submit-to-kubernetes/)

spark submit을 하기 위한 python 코드는 다음과 같습니다. 기본 내용은 kafka에 저장된 데이터를 DStream을 통해서 각 row를 추출하며 이를 ECS(Elastic Common Schema)로 formatting하여 저장하는 것입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
temperature.py

import os
import json
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
############################################################
# spark configuration of spark #
############################################################
# If there is already spark running, you sould use getOrCreate() method.
sc = SparkContext().getOrCreate()
sc.setLogLevel("DEBUG")

# batch interval 1 second
ssc = StreamingContext(sc, 1)

kafkaStream = KafkaUtils.createStream(ssc, '<kafka host ip/kubernetes outside>:2181', 'spark-streaming', {'temperatureTopic':1})

lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

############################################################
# code here elastic json format with ECS #
############################################################
def process_streaming(series_data):
import datetime
import json

# \ 오류 처리
series_data = series_data.replace("\"", "")
series_data = series_data.replace("\'", "\"")

series_data_json = json.loads(series_data)
date_time_obj = datetime.datetime.strptime(series_data_json.get("Record time"), "%Y-%m-%d %H:%M:%S.%f")

region = series_data_json.get("Region")

if region == 'Seoul':
region_temperature_degress = {
'seoul' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_seoul_01'
id = 'sensor_seoul_01'
elif region == 'Busan':
region_temperature_degress = {
'busan' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_busan_01'
id = 'sensor_busan_01'
elif region == 'Jeju':
region_temperature_degress = {
'jeju' : {
'temperature' : str(series_data_json.get("Temperature"))
}
}
hostname = 'sensor_jeju_01'
id = 'sensor_jeju_01'

elastic_data_json_with_ecs ={
'@timestamp' : date_time_obj.strftime("%Y-%m-%dT%H:%M:%S.%f"),
'ecs' : {
'version' : '1.0.0'
},
'host' : {
'name' : 'sensor',
'hostname' : hostname,
'id' : id
},
'event' : {
'dataset' : 'thermometer.air_temperature',
'module' : 'thermometer',
'duration' : 10,
'original': series_data
},
'metricset' : {
'name' : 'air_temperature'
},
'service' : {
'type' : 'thermometer'
},
'thermometer' : {
'air_temperature' :
region_temperature_degress

}
}
return json.dumps(elastic_data_json_with_ecs)

# lines map 처리, 개별 timeseries데이터의 row를 elasticsearch에 적재
# ECS(Elastic Common Schema)로 formatting하여 저장함.
lines = lines.map(lambda item: process_streaming(item)).map(lambda x: ('key', x))
lines.pprint()

# RDD마다 elastic search 저장을 위한 function(saveAsNewAPIHadoopFile)를 호출합니다.
lines.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile( \
path='-', \
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
keyClass="org.apache.hadoop.io.NullWritable", \
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
conf={"es.nodes" : "<elastic search IP>", "es.port" : "9200", "es.resource" : "testindex", "es.input.json": "yes" }))


ssc.start()
ssc.awaitTermination()

Spark Job sumit 수행을 위한 command는 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
./bin/spark-submit \
--master k8s://https://<k8s master server ip>:6443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=<Spark Pod가 생성되는 kubernetes namespace> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=<kubernetes service account> \
--conf spark.kubernetes.container.image=<spark 사이트 가이드 따라 빌드한 컨테이너 이미지> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.options.claimName=nfs-pvc-spark-master \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.mount.path=<NFS Mount Path> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfs-pvc-spark-master.mount.readOnly=false \
--conf spark.kubernetes.authenticate.oauthTokenFile=<Kubernetes Service Account oauth token file> \
--conf spark.kubernetes.authenticate.caCertFile=<Kubernetes Service Account ca crt file> \
--conf spark.kubernetes.executor.request.cores=500m \
--conf spark.kubernetes.driverEnv.memory=1g \
--conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \ # side-car injection disable
--conf spark.executor.cores=1 \
--conf spark.executor.memory=700m \
--conf spark.worker.cores=1 \
--conf spark.driver.host=<spark job을 submit 하는 host ip> \
--conf spark.driver.port=7077 \
--jars elasticsearch-hadoop-7.6.1.jar \
--driver-memory 1g \
--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.5 \
temperature.py

오픈 소스 ETL 툴 비교(Apache NiFi vs StreamSets)

빅데이터, AI 시대에 데이터가 존재하는 origin으로부터 데이터를 가져오고 이를 처리후 적재하기 위한 ETL(Extract-Transformation-Load) Tools가 존재합니다. 대표적인 것이 Apache NiFi, Apache Airflow, StreamSets등이 존재합니다.

과거 분석을 위해서는 BI(Business Intelligence)와 이를 위한 저장소로 DW(DataWare House)가 사용되었습니다. 최근에는 이러한 아키텍처 보다는 데이터 orgin으로부터 데이터를 수집해 저장하고, R이나 머신러닝/딥러닝등을 이용한 Analyze가 주류를 이루고 있습니다.

구분 설명
Past Apps/RDBMS(계정계)-> ETL -> DW(Data WareHouse) -> ETL -> BI(Business Intelligence)(정보계)
Emerging
(Current)
Data Sources(Log/Device/Streams/Apps) -> Ingest -> Data Stores -> Analyze -> Data Consumers(R/ML, Search, BI, Apps)


이 글에서는 Apache NiFi와 StreamSets를 비교해 보고자 합니다.

Apache NiFi StreamSets Data Collector (SDC)
SW개발처 - donation : NSA (2014)
- current : Hortonworks
- California 기반의 startup(2014)
- open source ETL project
URLs https://nifi.apache.org/ https://github.com/streamsets/datacollector
개발에 사용된 Language Java Java
License Apache 2.0 license Apache 2.0 license
Usages Stream Data 혹은 Regular Periodic Batches를 위한 Long Running Job용도 Stream Data 혹은 Regular Periodic Batches를 위한 Long Running Job용도
데이터 FlowFile(original data + meta-information) record format
(데이터를 규격화된 포맷으로 변환 처리)
origin type CSV, Other Record-Based Data 등 CSV, Other Record-Based Data 등
Components [Data Provenance]
- dataflow내에서 거의 모든 것을 기록하는 Big Brother service
- dataflow 수행 history 기록

[Controller Service]
-processor를 위한 유용한 정보를 제공
- SSL certificates, JDBC 설정등,
schema definition
- 동일 configuration의 동시 설정을 지원

[Process Groups] (이것 자체가 dataflows가 됨)
- 개별 dataflow를 Process Group으로 묶는 것이 가능

[Processor Outputs]
- Original : FlowFile의 origin
- Failure : FlowFile이 처리 오류 발생시
- Success : FlowFilw이 정상적으로 처리시
[Origins]
- 데이터가 존재하는 external sources.
- dataflow내에서 한개의 origin만이 존재가능

[Processors]
- data transformers.

[Destinations]
- 데이터 저장 stores

[Executors]
- 다른 processor들에 의해 생성된 event처리
- Email처리 시 Error발생시 이를 처리 → EMail Executor
특징 - Processor간에 Queue가 존재하여 Flow Control
- 핸들링 하는 데이터 포맷에 맞춰 다른 버전의 Processor를 가져야 함.(CSV, XLS별도식)
- Processor간에 Queue가 존재하지 않음
- binary file을 processing가능함
- kakfa conumer가 processor에 binary를 그대로 전송 가능함
장단점 장점)
- dataflow programming concept을 효율적 구현
- binary data 처리 가능
Data Provenance
단점)
-Spartan User Interface
-record단위 모니터링, 디버깅, 통계 제공 안함.
장점)
- record단위 모니터링, 디버깅, 통계 제공
- record단위 data와 streaming을 위한 갈끔한 UI

단점)
- 단일 processor 수정을 위해 전체 dataflow의 stop필요
- No reusable configuration for processors

위 비교표에서 볼수 있듯이 큰 차이점은 StreamSets의 경우 한개의 processor의 configuration을 수정하기 위해 전체 dataflow를 정지시켜야 합니다. UI상 Stop버튼을 눌러야 수정을 할 수 있도록 구성 되어 있습니다.

StreamSets의 경우 Startup에서 지속적으로 관리하는 형태라 UI의 구성 자체가 NiFi대비 우수하다고 볼 수 있습니다. 또한, 설치도 docker 형태를 제공하고 있어 편리하게 구성후 테스트를 진행 할 수 있습니다.

Dark Knowledge (Knowledge Distillation)

이글은 Medium( https://medium.com/@mahendrakariya/dark-knowledge-in-neural-networks-467e5d699181 ) 글의 내용을 학습하면서 정리한 글입니다.


Neural Network는 그 목적이 각기 다름에도 불구하고, training과정에서 사용하는 network와 deployment과정에서 사용하는 network가 동일한 네트워크입니다. 이를 언급한 위 글에서는 애벌레와 성충이 되는 과정으로 설명하고 있습니다.

1) Dark Knowledge 개념

이러한 내용을 응용하여 neural network에 대한 model compression을 Bucila,.et al( https://www.cs.cornell.edu/~caruana/compression.kdd06.pdf )이 소개하였고, Geoffrey Hinton,.et al에 의해서 일반화( https://arxiv.org/abs/1503.02531 ) 되었습니다.

dark knowledge는 hidden knowledge라고도 하며, 이러한 dark knowledge를 추출하고 사용하는 것을 distillation이라고 합니다. 이를 위 글에서 언급한 내용을 토대로 살펴보면

multi-class classification problem(개, 고양이, 소, 자동차)을 살펴보면 hard-decision, normal softmax 적용후, softened softmax적용후 값의 변화는 아래와 같습니다.

Cow Dog Cat Car
hard targets(one-hot encoding of the gold class) 0 1 0 0
normal softmax 10^(-6) .9 .1 10^(-9)
softened softmax (softmax-temperature) .05 .3 .2 .005

hard decision을 적용 할 경우 True로 판정되는 label에만 1이 표기가 되며, normal softmax를 적용하면 False에 해당하는 Cow와 Car는 무시해도 될만큼(negligible) 값이 작다고 할 수 있습니다. 하지만 softened softmax(softmax-temperature) 를 적용할 경우 normal softmax보다 풍부한 정보를 나타낼수 있게 됩니다. 이는 Cat이 Cow로 mis-classifying될 가능성(.2와 .05)이 Cat이 Car로 mis-classifying될 가능성(.2와 .005)보다 크다고 할 수 있습니다. 이렇게 유실될 수 있는 정보를 보다 더 나타내는 것이 가능해 집니다.


[softmax- temperature]

Hinton에 의해서 소개된 distribution over the class를 계산하기 위한 방법으로, training시기에 student-teacher network에 동일한 temperature parameter 가 적용됩니다. inference시기에는 temperature parameter=1로 설정하여 standard softmax를 사용합니다. 이는 knowledge distillation을 통해 training시 student-teacher에 softened softmax를 사용하여 hidden information이 학습되도록 하고, inference시는 normal softmax를 사용한다는 의미입니다.


2) Dark Knwoledge(student teacher) Architecture

dark knowledge를 추출하여 사용하는 방식은 다음과 같습니다. Teacher에 해당하는 Lage Model을 train하고, 이 Neural Network로부터 dark knowledge(hidden knowledge)를 추출합니다. 이 dark knowledge기반으로 Student에 해당하는 Small Network를 학습시키는 방식입니다.

: [Dark Knowledge Transfering Architecture Concept]

위 Concept Architecture에서 확인할 수 있듯이 Teacher에서 Final Activation Layer(softmax)이전까지의 logits를 추출하고, 여기까지의 output에 softened softmax(softmax-temperature)를 적용하고, 이를 student network의 output으로 사용하게 됩니다. 이를 통해 dark knowledge 개념에서 살펴본것 처럼 rich information을 전달할 수 있게 됩니다.

이 그림에서 New Small Model이 우리가 학습시키는 Student Network가 됩니다. 이 모델은 먼저 Small Model을 만들어 Teacher Model에서 사용한 학습데이터(x_train, y_train)를 통해 학습을 하고 마진가지로 logits를 추출하여, New Small Model을 만들게 됩니다. 이 모델에 최종적으로 학습 데이터(x_train, softened_softmax_probaibilties)를 학습시키는 과정을 통해 dark knowledge를 tranfering하게 됩니다.

이러한 과정을 통해 이미지 분류를 위해서 통상적으로 사용하는 Convolution Network보다 훨씬 파라미터수가 적은 비교적 단순한 Dense Network를 통해서 이미지 분류 문제를 수행할 수 있게 됩니다. light-weight Model을 통해 Performance에 크게 손실이 없는 결과를 도출할 수 있게 됩니다.

출생아수 분석 #1(추세 그래프)

출생아수 저하는 경제력 저하로 이어진다는 말을 많이 들어왔습니다. 직접 분석을 위해서 출생아수 조사를 해보니 출생아수의 감소가 대부분의 광역자치단체에서 발생하고 있습니다. 이러한 추세를 한번 확인해 보고 그 의미를 분석해보고자 합니다.

먼저 이번 글에서는 출생아수(전국),각 광역 자치단체별 출생아수 데이터 현황을 ‘꺽은선형 차트’를 통해 확인해보고자 합니다.

  • 위 차트는 통계청에서 제공하는 출생아수 데이터를 Microsoft Power BI를 통해 월별로 데이터를 그리고, 이를 Power BI Cloud에 게시한후, 블로그 페이지에 embed하여 작성되었습니다.
  • 출생아수(전체)는 1981년~2019년(39년), 각 광역자치단체별 출생아수는 1997년~2017년(21년) 구간의 월별(month) 데이터입니다.

기억하시는 분들이 많겠지만 우리나라는 IMF구제 금융(1997.12~2001.08)시기를 겪었습니다. 차트에서 보면 이시기에 출산율이 확연하게 하강곡선을 그리게 됩니다. (차트상의 중간정도 부분). IMF시기가 인구구조에 있어서 급격한 변화를 초래하였다고도 볼 수 있습니다. 이후 횡보하던 패턴은 2015년경부터 다시 하강곡선을 그리게 됩니다. 비정규직 이슈와 청년 실업 문제가 본격적으로 대두가 되면서 출산율이 다시 하강 곡선을 그리기 시작한 것으로 보입니다.

정형데이터 분석에서의 BI툴의 사용

예측을 포함하지 않는 정형데이터 분석의 경우 Tableau, Power BI, 엑셀을 통해 분석하는것이 효율적이라 생각된다.배우다 보니 굳이 정형데이터의 경우 Pandas등을 쓸 필요가 있을까도 싶다. 그리고 이미 엑셀은 무한 row를 지원한다. 따라서 과거와 달리 코드를 짜서 대시보드를 만드는 것의 의미가 많이 희석되고 있다.

특히 이러한 도구는 프로그래밍에 스킬이 거의 없는 조직내 인원들도 분석의 대열에 합류하게 해 주는 도구라고 생각된다. 단, 데이터에 대한 해석력을 길러야 한다. 학교에서도 해석력에 대한 이야기를 많이 한다.

MS Power BI의 경우 보고서를 azure상에 publish하고 모바일 app으로 확인하여 조직내 다른 role을 가진 인원끼리의 데이터를 통한 소통이 가능해진다. 마케팅에서 이야기하는 Customer decision journey를 숫자 기반으로 분석시 이런 도구가 유용할듯 하다.

Tableau는 조금 월 subscription비용이 쎈 편이라 부담되지만 Power BI는 한화로 1만원을 조금 상회해서 경험하기에 적당하다고 생각한다. 개인적으로 google sheets(혹은 MS Office365)-Ms Power Bi-Azure-Power Bi app cycle구성을 해 보려한다. 매출액을 sheet형태로 구성하고 종합결과를 App으로 보는 식이다.

그런데 배우다보니 엑셀이 Power BI의 기능을 많이 흡수하고 있던데 이러한 기능상의 중첩을 MS가 의도하는 바는 무엇인지 의문이 생긴다.

엑셀은 지금도 발전중인듯 하다.

Moving_Average_수식_유도

Moving Average의 개념

Moving Average의 개념은 지난 post에서 언급한 글(https://purumir.github.io/2019/10/06/지수평활법-exponential-smoothing/ )의 내용을 참조하시기 바랍니다.


Moving Average(1) 수식유도

moving average(1)의 수식은 아래와 같이 정의될 수 있습니다.

White Noise성 데이터는 다음과 같은 성질을 가집니다.

Moving Average(1)의 수식 전개는 아래와 같습니다.

1) 기대값(Expectation) 전개

2) 분산(Variance) 전개

자기 자신에 대한 covariance는 분산(Variance)와 동일합니다.

3) 공분산(Covariance) 전개

4)

5) 자기상관계수(Auto Correlation Function) 전개

자기공분산(auto covariance function)을 통해서 자기상관계수(auto correlation function)을 구하게 됩니다.

위 수식에 따라서


Moving Average(2) 수식유도

moving average(2)의 수식은 아래와 같이 정의될 수 있습니다.

Moving Average(2)의 수식 전개는 아래와 같습니다.

1) 기대값(Expectation) 전개

2) 분산(Variance) 전개

3) 공분산(Covariance) 전개

4) 자기공분산(Auto Covariance Function) 전개

5) 자기상관계수(Auto Correlation Function) 전개

자기공분산(auto covariance function)을 통해서 자기상관계수(auto correlation function)을 구하게 됩니다.

위 수식에 따라서

Convolutional-Neural-Network-Part1

Convolutional Neural Network 개요

DMLP(Deep Multi Layer Perceptron) Convolutional Neural Network
network
structure
Fully-Connected Partially-Connected
Learning
speed
Very Low Low
장점 Model Complexity를 획기적으로 낮춤.
단점 Over-fitting 가능성 모든 feature추출을 해야 partially-connected 단점 극복가능함.
사용처 Image Processing, Signal Processing