Purumir's Blog

Machine Learning, SW architect, Management, favorites

Flask Multitenancy Class Routing

Flask를 통해서 Multitenancy를 구현하는 방법에 대해서 이야기 해보고자 합니다. 제가 사용한 방법보다 더 효과적인 방법이 있을수 있으며, 이 방식은 그 중 하나의 방법 정도로 살펴봐 주시면 좋을듯 합니다. 이 문서는 몇개의 데이터 소스를 사전 등록하고 이 안에서 class routing을 tenant에 따라서 수행하는 방법을 다룹니다.

Multitenancy Pattern

https://learn.microsoft.com/en-us/azure/azure-sql/database/saas-tenancy-app-design-patterns?view=azuresql

위 글에서 제시하는 패턴 중 “D. Multitenant app with database-per-tenant”에 대해서 다루고자 합니다.

Flask-SQLAlchemy에서 Multi-datasource 설정

https://flask-sqlalchemy.palletsprojects.com/en/3.1.x/binds/

위 Flask-SQLAlchemy의 가이드 문서를 보면 Flask에 다중 데이터 소스를 등록할 수 있습니다. SQLALCHEMY_DATABASE_URI는 서비스에서 사용하는 공통 database로 사용하고, SQLALCHEMY_BINDS는 각 tenant별 데이터 소스를 등록합니다.

config.py에서의 데이터 소스

1
2
3
4
5
6
7
# MAIN Datasource
SQLALCHEMY_DATABASE_URI = "postgresql:///{db_username}:{db_password}@{db_host}:5432/{db_name}"
# Per Tenant Datasource
SQLALCHEMY_BINDS = {
"tenant1_ds": "postgresql:///{db_username}:{db_password}@{db_host}:5432/{db_name}",
"tenant2_ds": "postgresql:///{db_username}:{db_password}@{db_host}:5432/{db_name}",
"tenant3_ds": "postgresql:///{db_username}:{db_password}@{db_host}:5432/{db_name}",

Multitenancy Model Routing

Flask App기동시 Model Initialize 최초 수행하기

1
2
3
# Flask App 기동시 최초 1회 수행합니다.
with app.app_context():
initialize_multi_tenant_model()
1
2
3
4
5
6
7
class Tenants(Enum):
"""
# tenant 목록을 Enum으로 선언합니다.
"""
TENANT1 = 'tenant1'
TENANT2 = 'tenant2'
TENANT3 = 'tenant3'
1
2
3
4
5
# Flask가 기동하면서 로딩할 base model class를 지정합니다.
base_models = [
# base model을 지정합니다.
InBoundClassInfo
]
1
2
3
4
5
6
7
8
9
10
11
12
class InBoundClassInfo(db.Model):
"""
__abstract__ 를 넣어야 tenant별로 class를 동적으로 생성가능합니다.
"""
__abstract__ = True
__tablename__ = 'in_bound_class_info'

field1 = db.Column(db.String(50), primary_key=True)
field2 = db.Column(db.String(50), nullable=True)

def __repr__(self):
return '<%r %r>' % (self.__tablename__, self.field1)
1
2
3
4
5
6
7
8
9
10
def initialize_multi_tenant_model():
"""
Flask App Context에서 이 메소드를 실행하여 tenant별로 class를 생성하여 등록합니다.
"""
tenants= [tenant.value for tenant in tenants]

for tenant in tenants:
for base_model in base_models:
# camel_to_snake는 base_model의 CamelCase를 Snake Case로 만드는 함수를 만드시면 됩니다.
initialize_model(tenant, base_model, camel_to_snake(base_model.__name__))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def initialize_model(tenant_nm, base_cls, tbl_nm):
"""
파라미터로 전달된 정보를 기반으로 Model을 생성하여 dict에 등록합니다.
"""
# tenant1_svc_in_bound_class_info -> Tenant1SvcInBoundClassInfo로 리턴하는 함수(string_transform)를 사용합니다.
cls_nm = string_transform(tenant_nm + '_svc_' + tbl_nm )
Model = type(cls_nm, (base_cls,),{
# SQLALCHEMY_BINDS 에서 선언한 datasource와 일치하게 됩니다.
'__bind_key__': tenant_nm + '_ds',
# InBoundClassInfo의 경우 in_bound_class_info로 테이블이 생성되었다고 가정합니다.
'__tablename__': tbl_nm,
# 해당 datasource에서 schema 정보를 기술합니다.
'__table_args__': {'schema': tenant_nm + '_ds'}
})
# Tenant1SvcInBoundClassInfo, Tenant2SvcInBoundClassInfo, Tenant3SvcInBoundClassInfo라는 이름으로 모델을 dict에 등록합니다.
flask_mdl_dict[cls_nm] = Model
1
2
3
4
5
6
7
8
def get_model_class_per_tenant(tenant, base_cls_nm):
"""
해당 tenant + base model 정보를 기반으로 해당 tenant의 클래스를 리턴합니다.
"""
cls_nm = string_transform(tenant_nm + '_svc_' + tbl_nm )
model_cls = flask_mdl_dict.get(cls_nm)

return model_cls

위와 같이 설정하고 나서 Flask의 서비스 클래스 등에서 다음과 같이 호출하여 db model을 사용하면 됩니다.

1
2
model_cls = get_model_class_per_tenant(tenant, "in_bound_class_info")
in_bound_class_info = db.session.query(model_cls).filter(model_cls.field1 == 'ONE').one()

database-per-tenant에 해당하는 Flask에서 구성가능한 Multienancy 패턴을 기술해 보았습니다. 위와 같이 구성할 경우 테넌트별로 각각 db Model을 별도로 선언하고 코딩해야 하는 코드 중복을 줄일 수 있습니다.

PROMPTHON EXPERIENCE

Prototype Building with LLMs

Design thinking is a process that follows the steps of Empathize, Define, Ideate, Prototype and Test. In this process, listening to the Voice of the Customer (VoC) is more important than any other step.

For this reason, my team in the prompthon has thought that Large Language Models (LLMs) are effective tools for quickly building prototypes. Traditional coding requires time for developers to construct the logic. However, with prompt engineering, there’s no need to code the logic directly. I simply provided the user’s question, the query based on the user’s question, information about the table and my prompt. Using them, the LLM was able to return the result in an HTML table format.

By integrating the HTML response with a no-code tool component, my team has demonstrated that it’s possible to create prototypes without the time-consuming coding typically required, especially for prototypes that might be discarded later.

langchain + Prompt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_core.prompts import ChatPromtTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

promt_query = """
Based on the table schema below,
Write a PostgreSQL query that answer the user's question:
{schema}

Question: {question}
SQL Query:"""

db = SQLDatabase.from_uri("postgresql://~~")

def get_schema(_):
return db.get_table_info()

promt_query = ChatPromptTemplate.form_template(promt_query)
sql_query_chain = (
RunnablePassthrough.assign(schema=get_schema)
| promt_query
| llm.bind(stop=["\nSQLResult:"])
| StrOutputParser()
)
sql_respone = sql_response_chain.invoke({"question": "<USER QUESTION>"})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
prompt_response = """
Based on the table schema, question, sql query and sql response,
write a respone which is the korean language.

Schema : {schema}
Question : {question}
SQL Query : {query}
SQL Response : {response}

If the sql response is not empty, please just follow the below step.
- The test_item, review_criteria, test_method of the sql response must be included in the response.
- When you return the reponse, return the html table code which contains the test_item, review_criteria, test_method.
- And the text of the table header must be written in the korean language.

If the sql response is empty, please just write a below respone.
- "Refer to ~~~"
"""

prompt_response = ChatPromptTemplate.form_template(prompt_response)

full_chain = (
RunnablePassthrough.assign(query=sql_query_chain)
| RunnablePassthrough.assign(
schema=get_schema,
resposne=lanmbda x: db.run(x['query'])
)
| prompt_response
| llm
| StrOutputParser()
)

full_chain.invoke({"question": "<USER QUESTION>"})

Prompt Flow Concept

(airflow) Continuous kudu table migration using the Airflow SparkOperator

Background

In my case, a big scale system was splitted into several small systems. But, some code values in a big scale system should be used at every small systems. Below diagram shows such a situation.

: a bigger system was spliitted into some smaller systems

In the big data ecosystem, data values can be updated in the Apache KUDU via the Apache Impala. The Apache Impala is the role of the bridge for the CRUD operation. In my case, some code values is inserted newly. For these reasons, the bigger DW system use the Apache KUDU which is bridged via the Apache Impala.

For the migration of the code values on every day, I have developed the SparkOperator on the circumstance of the Airflow. The concept of the migration is like below.

KUDU Table Creation

At the smaller DW systems, some KUDU tables which have same schema of the KUDU tables in the bigger DW system were created for managing the code values.

1
2
3
4
5
6
7
8
9
10
11
-- This is the example
CREATE TABLE code_table
(
code1 STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
code2 STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY(code 1)
)
STORED AS KUDU
TBLPROPERTIES(
'kudu.master_addresses'='<KUDU Master Host #1>,<KUDU Master Host #2>..<KUDU Master Host #N>'
)

SparkSubmitOperator Code

The Airflow provide the SparkSubmitOperator for the submitting the spark job. Additionally, some libraries were used for the submitting the spark job.

In below code, a spark code snippet is located in the line of ‘spark_scripts’.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable

class PullKuduTablesSparkSubmitOperator(SparkSubmitOperator):
@apply_defaults
def __init__(
self,
applications='',
conn_id='spark_default',
~
executor_memory='<Executor Memory Size>',
driver_memory='<Driver Memory Size>',
~
name='pull_kudu_tables',
~
spark_binary='<lib>/spark/bin/spark-submit',
*args,
**kwargs):

super(SparkSubmitOperator, self).__init__(*args, **kwargs)

self._application = application
~
self._spark_binary = spark_binary
self._hook = None
self._conn_id = conn_id

def execute(self, context):
spark_scripts = Variable.get('spark_scripts', deserialize_json=True)
self._application = spark_scripts # spark submit code

super(PullKuduTablesSparkSubmitOperator, self).execute(context)


class PullKuduTablesSparkSubmitPlugin(AirflowPlugin):
name = 'pull_kudu_tables_spark_submit_plugin'
operators = [ PullKuduTablesSparkSubmitOperator ]

Spark Scripts

The spark submit code snippet is very simple. Spark receives the host informations of the KUDU masters. The Apache KUDU is possible to have the server groups of the masters. In that cluster of the KUDU, one server has the role of a leader in the master servers, and the other servers have the roles of the followers. The following article describes such the Architectural Overview.

Cloudera KUDU Architectural Overview

Below code snippets show the spark code for the KUDU table migration.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys, os
import json
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from airflow.models import Variable

spark = SparkSession.builder.getOrCreate();

def main():
table_name = sys.args[1] # Kudu table name for the migration
from_kudu_masters = sys.args[2] # Source Kudu Masters
to_kudu_masters = sys.args[3] # Destination Kudu Masters

from_kudu_table_name = "impala::<source db>.{table_name}".format(table_name=table_name)
# load the data from kudu table of the source cluster to the spark dataframe
df = spark.read.format("org.apache.kudu.spark.kudu")
.option("kudu.master", from_kudu_masters)
.option("kudu.table", from_kudu_table_name)
.load()

to_kudu_table_name = "impala::<destination db>.{table_name}".format(table_name=table_name)

# write the data from the spark dataframe to the kudu table of the destination cluster.
df.write.format("org.apache.kudu.spark.kudu")
.option("kudu.master", to_kudu_masters)
.option("kudu.table", to_kudu_table_name)
.mode("append")
.save()

if __name__ == "__main__":
main()
sys.exit(0)

Airflow DAG

The Variable feature (Menu Location : Airflow Admin > Variable) could be used for saving some kind of the static information. So, I have registered not only the address of the KUDU masters of the source cluster, but also the address of the KUDU masters of the destination cluster.

The SparkSubmitOpeator pull the KUDU table from the source cluster to the desination cluster per a table.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from airflow import DAG
from airflow.models import Variable
from custom_operators.pull_kudu_tables_spark_submit_operator import PullKuduTablesSparkSubmitOperator
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
import sys

default_args ={
'owner': '<user name>',
'start_date' : datetime(2022, 10, 11),
'email': ['<EMAIL ADDRESS>'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=10)
}

# The KUDU Masters of the source cluster and the destination cluster
kudu_masters = Variable.get("kudu_masters", deserialize_json=True)
# The table name for the migration.
kudu_tables = Variable.get("kudu_tables", deserialize_json=True)

with DAG(
dag_id = 'pull_kudu_tables',
default_args=default_args,
schedule_interval='<Schedule Interval>',
tags=['KUDU', 'Migration']
) as dag:

process_list = []

for table in kudu_tables:
application_args = [table,
','.join(map(str, kudu_masters['source_cluster'])),
','.join(map(str, kudu_masters['destination_cluster']))
]

remove_detination_kudu_table
= BashOperator(
task_id = 'remove_kudu_table',
bash_command = """ env -i impala-shell -i <Impala Server Address> -q 'delete from <destination database name>.{table_name}' """.format(table_name=table),
queue='<Airflow Queue Name>',
dag=dag
)

pull_kudu_spark_operator
= PullKuduTablesSparkSubmitOperator(
task_id = 'pull_kudu_table_{table_name}'.format(table_name=table),
jars='<lib>/kudu/kudu_spark_<your version>.jar, <lib>/kudu/kudu_spark_tools_<your version>.jar',
application_args=application_args,
queue='<Airflow Queue Name>',
dag=dag
)

# after removing the data, the data is inserted newly.
remove_detination_kudu_table >> pull_kudu_spark_operator
process_list.append(remove_detination_kudu_table)

process_list # Airlow tasks execution for all tables.

These work fine through the Airflow SparkSubmitOperator. In the case of operation of the code system, the Apache KUDU is the good choice.

[airflow] TriggerDagRunOperator와 ExternalTaskSensor를 통한 배치 흐름 제어

수백개의 배치를 그룹으로 구성하고 그 그룹간의 Dependency를 둘 수 없을까?

수백개가 넘는 배치를 하나의 DAG에 넣는 것은 유지보수도 어렵게 하고, 동시 작업성 측면에서도 좋지 않다고 생각되어 이를 그룹화(각 DAG를 그룹화의 단위로 정의)하였습니다. 이 DAGs간의 dependency를 줄 수 있는 방법에 대해서 고민하게 되었습니다. 처음에는 후속 DAG에서 앞의 DAG의 마지막 Task의 결과를 확인하고 데이터가 생성되어 있지 않으면 생성이 될 때까지 대기하는 방법을 사용하였습니다.(hdfs sensor를 통해서 _SUCCESS 같은 마커가 생성될때까지 대기하도록 구성)

보다 개선된 방법으로 airflow에서 ExternalTaskSensor를 제공하는 것을 확인하여 이를 병행하는 방식으로 DAG간의 dependency를 Task 센싱을 통해서 처리하도록 구성하였습니다.

개념적으로 다음과 같은 형태로 구성을 하였습니다.

위 그림에서 선행 DAG의 마지막 Task를 센싱하는 ExternalTaskSensor를 후행 DAG의 앞에 둠으로써 ExternalTaskSensor가 선행 DAG의 마지막 Task의 완료를 센싱할때까지 후행 DAG의 실행(triggering)을 하지 않도록 하는 구조입니다. ExternalTaskSensor는 선행 DAG의 마지막 Task가 완료가 되면 이를 센싱하고 후행 DAG를 TriggerDagRunOperator를 통해서 triggering하게 됩니다.

이러한 방식의 장점은 각 DAG의 스케줄링 시간을 정의하고 time interval에 의해서 실행간 제어를 할 경우 선행 DAG가 완료되지 않았음에도 후행 DAG를 실행하는 문제점에 대한 대안으로 사용 할 수 있고, _SUCCESS 같은 마커를 통한 흐름제어보다 개선된 방식이라고 할 수 있습니다.

1
2
3
4
5
6
7
8
## TriggerDagRunOperator를 통한 pre-DAG triggering
trigger_predag_task = TriggerDagRunOperator(
task_id = f'trigger_pre-DAG', # DAG-A 를 triggering함.
trigger_dag_id = 'pre-DAG',
execution_date = 'YYYYMMDD', # triggering시 execution date
queue = 'queue name',
reset_dag_run = True # 이미 존재하는 dag run이 있을 경우 이를 clear하고 실행할지 여부
)
1
2
3
4
5
6
7
8
9
10
## ExternalTaskSensor 예시
predag_sensor_task = ExternalTaskSensor(
task_id = f'sensor_pre-DAG',
external_dag_id = 'pre-DAG',
external_task_id = 'pre-DAG-final-Task',
execution_date = 'YYYYMMDD', # 센싱을 원하는 execution_date
mode = 'reschedule',
timeout = 3600, # 60 분
queue = 'queue_name'
)
1
2
3
4
5
6
7
8
## TriggerDagRunOperator를 통한 DAG-A triggering
trigger_daga_task = TriggerDagRunOperator(
task_id = f'trigger_DAG-A', # DAG-A 를 triggering함.
trigger_dag_id = 'DAG-A',
execution_date = 'YYYYMMDD', # triggering시 execution date
queue = 'queue name',
reset_dag_run = True # 이미 존재하는 dag run이 있을 경우 이를 clear하고 실행할지 여부
)
1
2
## 'pre-DAG'의 'pre-DAG-final-Task'가 성공하면, DAG-A를 triggering한다.
trigger_predag_task >> predag_sensor_task >> trigger_daga_task

[airflow] execution_date에 따른 특정 일자 task실행 처리

execution_date의 요일에 따른 Task 분기 처리

월요일~일요일까지의 요일중에서 특정 요일에만 Task를 실행하고, 다른 요일에는 Task실행을 하지 않거나 DummyTask 실행을 하는 상황이 있습니다. 주 1회 실행하는 배치등의 경우 이러한 케이스에 해당합니다.

airflow에서 주입하는 execution_date의 요일(weekday)를 계산할 수 있다면 특정 요일(여기서는 일요일)에만 실행하도록 처리할수 있을 것입니다. 이러한 분기(branch)처리는 BranchPythonOperator를 통해서 처리 할수 있습니다.

코드를 살펴보면 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def weekly_branch_shell(dag : DAG, task_id : str):
"""
주 1회(일요일) 실행되는 배치에 대한 Branch 처리
execution_date를 기준으로 일요일만 Task 실행(배치 실행), 나머지 요일은 DummyOperator 실행
"""
# 0 : Monday ~ 6 : Sunday로 인덱싱
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

def get_weekday(**kwargs):
execution_date = kwargs['ti'].execution_date
# 요일에 해당하는 숫자 (0 : Monday ~ 6: Sunday)를 XCom에 담습니다.
kwargs['ti'].xcom_push(key='execution_date_weekday', value=execution_date.weekday())

def weekday_branch(**kwargs):
# XCom에서 요일정보를 추출합니다.
execution_date_weekday = kwargs['ti'].xcom_pull(task_ids='week_day_'+task_id, key='execution_date_weekday')

# sunday
if execution_date_weekdy == 6:
return task_id
else:
# 요일과 실행하려는 Task명을 붙여서 월~토까지 DummyTask명을 만듭니다.
return "DummyTask_for_" + tabDays[execution_date_weekday] + "-" + task_id

# get_weekday를 호출하여 요일정보를 XCom에 담습니다.
get_weekday_oper = PythonOperator(
task_id = 'weekday_' + task_id,
python_callable = get_weekday,
provide_context = True,
queue = '{사용하는 airflow queue 이름}',
dag = dag
)

fork_oper = BranchPythonOperator(
task_id = 'branching_' + task_id,
python_callable = weekday_branch,
provide_context = True,
queue = '{사용하는 airflow queue 이름}',
dag = dag
)

get_weekday_oper.set_downstream(fork_oper)

# 분기후 실행하여야 하는 Task, DummyTask를 연결합니다.
for day in range(0, 7):
if day == 6:
batch_shell_oper = BashOperator(task_id = task_id,
bash_command = '{실행하려는 쉘 정보}',
queue = '{사용하는 airflow queue 이름}',
dag = dag
)
# 분기 branch에 이어 붙입니다.
fork_oper.set_downstream(batch_shell_oper)
else:
fork_oper.set_downstream(DummyOperator(task_id='DummyTask_for_' + tabDays[day] + "-" + task_id, dag=dag)
# 구성이 된 operators를 리턴
return get_weekday_oper

airflow UI Task의 구조

위 코드를 airflow UI에서 확인하면 아래와 같은 형태로 일요일만 해당 쉘을 실행하도록 Task 구조가 생성됩니다. 다른 요일에는 DummyTask가 실행됩니다.

2021년 서울지하철을 통해 본 사람들의 이동-(노선별) 일별 승하차

사람들은 수 많은 지하철역중에 어느 라인(호선)을 이용할까?

11개월의 데이터( 이전포스팅 에서 이야기 한대로 data.go.kr 에 공개된 데이터가 2021.01~2021.11까지 공개되어 있습니다.) 를 가지고 , 본 포스팅에서는 일별 흐름을 중심으로 살펴보겠습니다. 1시간마다 기록되어 있는 승하차 인원중 전체를 살펴보고, 이후 출근 시간(06:00~10:00, 4시간) 과 퇴근시간(17:00~21:00, 4시간)을 중점적으로 살펴보겠습니다.

사용된 툴은 일종의 BI툴이라고 할수 있는 Google Analytics 제품군의 일부인 data studio ( https://datastudio.google.com/ )와 Google Sheet(데이터셋을 업로드) 사용하였습니다.


LINE(호선)별 전체 합계 / 일별 평균 / 시간별 평균 승하차

지난 2021년 01월~2021년 11월까지 1호선~9호선(2~3단계)까지 전체 승하차 인원은 24억 1천만명에 달합니다. 2호선과 1호선(승하차 순위 7위)의 이용객수 차이는 6.14배가 넘습니다. 또한, 2호선이 7호선(승하차 순위 2위) 대비 1.94배가 많아 2호선의 압도적인 수송 규모를 확인할 수 있습니다.

순환 노선인 2호선이 다른 호선에 비해서 압도적으로 많은 이용객수(7억 1천 9백만)가 있다고 볼수 있고, 서울을 북동 <-> 서로 가로지르는 7호선(3억 6천 8백만) 과 북서 <-> 동으로 가로지르는 5호선(3억 3천 6백만)이 뒤를 이어 이용객수가 많습니다.

일별 평균을 보면 하루평균 약 721만명이 지하철을 이용하고 있고, 2호선은 일평균 2백만명이상(시간당 11만명이상), 7호선은 110만명(시간당 5만 8천명이상), 5호선은 100만명(시간당 5만 3천명이상)이 일일 평균 이용함을 알수 있습니다. 다른 노선들은 일평균 82만명을 하회하고 있습니다. 2호선의 혼잡도가 압도적으로 높다는 것을 수치로 확인 할 수 있습니다.

: 11개월동안 전체 승하차 인원표

승차(승차/환승승차) 와 하차(하차/환승하차)를 각각 분리하여 보면 다음과 같습니다. 승차 인원은 전체 12억 5백9십만정도이고, 하차 인원은 12억 4백6십만 정도입니다. 승차를 했으면 하차를 해야 하는데 이 수치가 130만정도가 차이가 나는데, 이것은 9호선 1~2단계 구간(민자구간)과 중앙선, 분당선, 신분당선등 다른 노선으로 하차한 승객들 때문일 것으로 보입니다. 이 비중은 1%안쪽으로 비중이 크지 않습니다.

: 11개월동안 전체 승차인원표

: 11개월동안 전체 하차 인원표


출퇴근 시간대 Trend

[출근 시간대-승하차 Trend]

11개월동안의 일별 승하차 trend를 출근시간대로 한정하여 살펴보면 아래와 같습니다.

: 일자별(date)로 본 06~07시 승하차 노선별 현황

: 일자별(date)로 본 07~08시 승하차 노선별 현황

: 일자별(date)로 본 08~09시 승하차 노선별 현황

: 일자별(date)로 본 09~10시 승하차 노선별 현황

위 그래프에서 약 4시간(06:00~10:00) 출근 시간동안 비슷한 패턴을 보이는데 2->7->5->3->4->6->8->1->9 호선 같은 순서로 사람들이 승하차를 많이 함을 알수 있습니다. 앞선 포스팅에서 이야기 하였듯이 9호선은 2~3단계 노선의 승하차 인원만 존재하여 가장 순위가 낮지만 실제로는 상위순위일 것으로 예상됩니다.

위 그래프에서 출근시간대에 7월~8월 중순까지 그래프가 움푹 파인 구간이 있는데 이 구간은 여름휴가 기간동안 승하차 인원이 감소하는 것으로 생각할 수 있습니다.

[퇴근 시간대-승하차 Trend]

11개월동안의 일별 승하차 trend를 퇴근시간대로 한정하여 살펴보면 아래와 같습니다.

: 일자별(date)로 본 17~18시 승하차 노선별 현황

: 일자별(date)로 본 18~19시 승하차 노선별 현황

: 일자별(date)로 본 19~20시 승하차 노선별 현황

: 일자별(date)로 본 20~21시 승하차 노선별 현황

퇴근시간은 출근시간대와 달리 1호선이 8호선 보다 승하차 인원이 많아 순위가 바뀐 2->7->5->3->4->6->1->8->9 호선 같은 순서로 사람들이 승하차를 많이 합니다. 출근시간대와 마찬가지로 퇴근시간대에 7월~8월 중순까지 그래프가 움푹 파인 구간이 있는데 이 구간 역시 여름휴가 기간동안 오전 시간대와 마찬가지로 승하차 인원이 감소하는 것으로 생각할 수 있습니다.


출퇴근 시간대 합계

[출퇴근 시간대-승하차 합계]

아래표는 11개월동안 출근 시간 구간의 승하차 인원의 전체 합계입니다.

: 오전 출근시간동안 승하차 인원표

약 4시간에 달하는 출근 시간동안 전체 승하차인원(24억 1천만)의 약 27%(6억 4천 1백만)가 집중되고 있습니다. 08:00~09:00시간대에 승하차인원(2억 6천 2백만, 11%)이 가장 많음을 알수 있습니다. 09:00~10:00까지도 이러한 경향은 계속된다고 볼수 있습니다.

아래표는 11개월동안 퇴근 시간 구간의 승하차 인원의 전체 합계입니다.

: 오후 퇴근시간동안 승하차 인원표

약 4시간에 달하는 퇴근 시간동안 전체 승하차인원(24억 1천만)의 약 30%(7억 1천 7백만)가 집중되고 있습니다. 18:00~19:00시간대에 승하차 인원(2억 6천 1백만, 11%)이 집중되고 있습니다. 19:00~20:00 보다 17:00~18:00의 수치가 더 높아 이때 퇴근이 더 많을 것이라고 예상할 수 있습니다.

위 표들은 승하차인원의 합계이기 때문에, 이를 상세화 하면 다음과 같이 승하차를 구분지어 생각할 수 있습니다.

  • 아침 출근 시간대
    • 승차 : 거주지 근처 역에서 승차하는 경우 , 버스에서 내린 후 지하철에 승차하는 경우
    • 하차 : 주로 일을 하게 되는 사무실 주변역에서 하차하는 경우
  • 저녁 퇴근 시간대
    • 승차 : 주로 일을 하게 되는 사무실 주변역에서 하차하는 경우
    • 하차 : 거주지 근처역에서 하차하는 경우 , 버스를 타기 위해 지하철에서 하차 하는 경우

위와 같은 관점으로 승차, 하차를 분리하여 확인해 보겠습니다.

[출퇴근 시간대-승차 합계]

출근 시간대 승차만을 살펴보면 전체 승하차와 비교했을때 3호선(합계 2천 9백만) 과 4호선(합계 3천 3백만) 의 순위가 바뀌고, 1호선(합계 6백 2십만)과 9호선(합계 7백 6십만)의 순위가 바뀌게 됩니다. 4호선이 3호선보다 출근을 위해 거주지 근처 지하철을 이용하는 사람이 많다고 볼수 있습니다. 9호선은 2~3단계 구간임에도 승차 인원이 1호선보다 많다는 것은 김포공항 방향에서 출발하여 출근하는 인원이 많다는 것을 알수 있습니다.

퇴근 시간대 승차만을 살펴보면 1호선의 순위가 출근시간대 9위(합계 6백 2십만)에서 퇴근시간대 6위(합계 2천 2백 4십만)가 되는 것은 1호선이 거주보다는 직장에서 퇴근을 위해 승차하는 사람이 상대적으로 많다는 것으로 추정할 수 있습니다.

출근 시간대, 퇴근시간대 전체 승차 인원은 각각 3억 7백만, 3억 7천 1백만으로 전체 승하차 인원(24억 1천만)의 12.74%, 15.42%를 점유합니다.

[출퇴근 시간대-하차 합계]

앞서 출퇴근 시간대 승차인원을 통해 언급했던 “1호선은 거주보다는 직장에서 퇴근을 위해 승차를 많이 한다.”는 내용은 출퇴근 시간대 하차에서도 확인 할 수 있습니다. 1호선의 출근 시간대 하차 순위가 6위(합계 2천 6십만) 였다가, 퇴근시간대 하차 순위는 8위(합계 9백 1십만)로 떨어지는 것을 볼수 있습니다. 아침시간대 출근을 위해 하차 한 인원들이 상대적으로 많다고 볼수 있습니다.

출근 시간대, 퇴근시간대 전체 하차 인원은 각각 3억 3천 4백만, 3억 4천 5백만으로 전체 승하차 인원(24억 1천만)의 13.88%, 14.34%를 점유합니다.


전체적으로 2, 7, 5 호선은 출근시간대의 승차 순위(2->7->5), 퇴근시간대의 하차 순위(2->7->5)가 전체 승하차 순위(2->7->5)와 동일하여 거주지가 집중되고, 직장도 많이 분포한다고 추정할 수 있습니다. 다른 라인에서는 특히 1호선이 거주 보다는 직장이 분포된 라인이라고 추정할 수 있습니다.

지하철 승하차 순위와 이용 분포를 통해서 주택가격이 이러한 지하철 이용객 분포와 연결이 되지 않을까 가정(실제 영향을 크게 미치겠지만)할 수 있는데 이는 향후 추가적으로 살펴보도록 하겠습니다.

- 다음 포스팅에 이어집니다. -

2021년 서울지하철을 통해 본 사람들의 이동- 목표 그리고 데이터셋

우리는 언제(When) 어디(Where) 를 통해 어디(Where)로 이동하는가

2021년 한해동안 사람들은 어떤 식으로 움직였을지에 대해서 분석할수 없을까라는 생각을 하게 되었습니다. 우리는 도보, 자전거등 개인이동수단(비동력), 자동차(자가용, 택시, 공유 차량), 버스, 지하철, 비행기등의 수단을 통해 이곳에서 저곳으로 다시 저곳에서 또 다른 곳으로 계속적인 이동을 합니다.

하지만 경제 생활 인구의 대부분은 아침에 직장이 위치하는 곳으로 이동하고, 저녁에는 다시 집으로 돌아오는 반복적인 패턴을 보일 것입니다.

이러한 여러가지 이동 수단중 가장 많은 비중을 차지할 것으로 예상되는 서울지하철을 통해서 사람들이 어떻게 이동하는지 이동 패턴이 궁금해졌습니다. 이를 위해 공공 데이터 포털( https://data.go.kr )에 공개된 데이터를 찾아보았고, 1~9호선의 승하차 데이터를 찾을수 있었습니다.

사용된 데이터셋은 다음과 같습니다. 두 데이터셋이 거의 포맷이 유사하여, 약간의 정제를 통해 데이터셋을 하나로 병합하였습니다. 아쉬운 점은 공개된 데이터셋이 11월 데이터까지만을 포함하여 12월 패턴을 볼수 없다는 점입니다.

데이터셋 형태는 아래와 같이 일자, 지하철호선번호 , 지하철역번호 , 지하철역명 , 승차 혹은 하차 / 9호선은 일부 환승승차/환승 하차가 포함된 형태로 구성되어 있습니다. 시간은 06시 이전, 06시~24시까지 1시간 단위로 승하차 인원 수치가 기록되어 있습니다.

이번 조사를 통해서 9호선이 민자 구간이 있다는 것은 익히 알고 있었지만, 2~3단계(언주역~종합운동장역/2단계, 종합운동장역~중앙보훈병원역/3단계)는 공공이 참여하여 서울 교통공사가 운영한다는 사실을 알 수 있었습니다. 이 구간의 데이터는 공공 데이터 포탈에 공개가 되어 있습니다.

- 다음 포스팅에 이어집니다. -

airflow hdfs sensor 설정과 사용법

airflow에는 HDFS상에서 파일의 존재유무를 체크하는(poke)하는 hdfs_sensor라는 기능이 있습니다.


hdfs sensor를 위한 필요 패키지 설치

  • pip install apache-airflow-providers-apache-hdfs
  • pip install snakebite-py3


hdfs sensor 사용을 위한 connection 정보 설정

  • airflow > Admin > Connections 에서 hdfs sensor에서 접근할 hdfs 정보를 입력
    • hdfs_connection / IP / Port 정보
    • HDFS Name Node 정보를 입력 (IP, Port 8020)


코드에서 HdfsSensor 모듈 import를 통한 sensor 기능 사용

1
2
3
4
5
6
7
8
9
10
11
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor

hdfs_sensor = HdfsSensor(
task_id = "hdfs_path_sensor",
filepath = "모니터링 하고자 하는 파일의 full path(파일명포함)",
hdfs_conn_id = "hdfs_connection", # airflow > Admin > Connections 에 설정한 hdfs connection 명
queue ="queue", # airflow사용 queue,
poke_interval=30, # 지정한 interval이 지난후 체크
timeout=11400, # 지정한 timeout 시간동안 체크를 지속
dag=dag
)


아래 그림과 같이 Task간에 선행 작업에서 필수적인 파일을 생성했는지 여부를 체크하여, 후행 작업의 시작을 파일이 생성될때까지 순연시키는 효과를 거둘수 있습니다.

ResNet

Deep Network는 Low/Mid/High level features를 추출하여 이를 통해 여러가지 작업을 수행한다. network depth는 이 경우 중요한 요소가 되는데 network가 깊어 질수록 degradation problem이 발생한다.

with the network depth increasing, accuracy gets saturated (which might be uprising) and then degrades rapidly.

[출처] “Deep Residual Learning for Image Recognition”

위 정의와 같이 network depth가 증가할 수록 accuracy가 일정 수준까지 증가후 급격히 감소하는 현상이 발생하게 된다.

network depth가 높으면 train error, test error가 모두 높은 현상이 발생하게 된다. 이는 deeper network를 통해 얻고자 하는 목적과 배치되게 된다.

ResNet은 이 degradation problem을 deep residual learning framework를 통해서 해결하고자 한다. Residual Network는 plan network를 기반으로 shortcut connections를 추가한것이다.

[출처] “Deep Residual Learning for Image Recognition”

위 stacked nonlinear layers에 의한 매핑을 H(x)라고 하면, 다른 stacked nonlinear layers mapping을 F(x):=H(x)-x라 할수 있다. 즉. H(x)=F(x)+x로 표현이 가능하다. 이를 통해 H(x) 대신에 Residual Function인 F(x)를 알아냄으로서 문제를 해결할수 있다. 이것이 Residual Learning의 개념이다.


[Identity Mapping by Shortcuts]

input, output의 dimension이 동일한 identity mapping의 경우 위 그림을 수식으로 좀 더 구체적으로 표현하면 다음과 같다.

위와 같은 identity mapping의 경우 추가적인 parameter 혹은 computational complexity를 유발하지 않는다. 학습에 있어서도 SGD with backpropagation을 통해서 학습이 가능하다.

Residual Net은 깊이가 증가함에 따라 accuracy도 같이 증가하여 기존의 network들보다 깊이가 깊어져도 성능저하를 예방할수 있다.

이 논문에서는 plain network와 residual network간 parameter, depth, width and computational cost가 같으므로 상대적인 성능을 비교할수 있어 이를 비교하고 있다.

[Projection Mapping by Shortcuts]

input, output의 dimension이 다른 경우는 다음과 같이 linear projection을 통해 연산을 한다.

{작성중}


kubernetes로 spark-submit하기(spark-submit to kubernetes)

spark 공식 사이트( https://spark.apache.org/docs/latest/running-on-kubernetes.html ) 에 spark-submit을 kubernetes상에서 수행하는 방법에 대해서 나와 있습니다. 가이드에 따라서 container를 빌드하고 이를 이용해 spark-submit을 하는 상황입니다. 이 작업의 컨셉 다이어 그램은 아래와 같습니다.


제가 겪은 상황은 spark-submit을 kubernetes cluster상에 namespace를 생성하고, 해당 namespace에 istio-injection:enabled를 했을 경우 겪게 되었던 “Initial Job has not accepted any resources: check your cluster UI to ensure that workers are registered and have sufficient resources”에 대한 트러블 슈팅에 대한 내용입니다. 특히 kubeflow를 설치시 istio관련 많은 컴포넌트들이 설치되는데 이로 인해서 위와 같은 에러 상황을 마주하게 되었습니다.

해결책은 spark-submit시 istio-injection을 false처리하는 것입니다. 이를 통해서 spark-submit시 (특히 client mode시) spark driver와 spark executor의 communication이 되지 않아 발생하는 “Initial Job has not accepted any resources: check your cluster UI to ensure that workers are registered and have sufficient resources” 의 트러블 슈팅이 가능합니다.

kubernetes에서 spark-submit을 수행하기 위해서는 다음과 같은 절차를 따릅니다.

1) Namespace 생성 & Service Account 생성 & ClusterRoleBinding 수행

  • Namespace 생성 & istio-injection enable
1
2
kubectl create namespace data-platform
kubectl label namespace data-platform istio-injection=enabled
  • Service Account & ClusterRoleBinding 생성
1
2
3
Using kubectl)
kubectl create serviceaccount spark --namespace=data-platform
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=data-platform:spark --namespace=data-platform
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
namespace: data-platform
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: spark-role
namespace: data-platform
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: edit
subjects:
- kind: ServiceAccount
name: spark
namespace: data-platform

2) spark-submit시 “—conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false” 옵션을 추가하는데, 이는 istio환경에서 POD를 구동시 istio가 side-car를 injection하면서 kubernetes cluster 외부에 있는 spark driver와 kubernetes cluster내부에 있는 spark client의 communication을 blocking하기 때문에 side-car injection을 하지 않게 하는 것입니다.

“—conf spark.kubernetes.authenticate.oauthTokenFile=“과 “—conf spark.kubernetes.authenticate.caCertFile=“은 위에서 생성한 service account의 secret에 있는 token값과 ca.crt내용입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
./bin/spark-submit \
--master k8s://https://<k8s master IP>:6443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=data-platform \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=<spark official site 가이드 따라 빌드한 컨테이너 이미지> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.<Persistent Volume Claim Name>.options.claimName=<Persistent Volume Claim Name> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.<Persistent Volume Claim Name>.mount.path=<storage class path> \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.<Persistent Volume Claim Name>.mount.readOnly=false \
--conf spark.kubernetes.authenticate.oauthTokenFile=<file location> # service account(spark) token file
--conf spark.kubernetes.authenticate.caCertFile=<file location> # service account(spark) ca.crt파일
--conf spark.kubernetes.executor.request.cores=500m \
--conf spark.kubernetes.driverEnv.memory=1g \
--conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \ #istio 환경에서 Initial Job Resource 부족 에러를 해결하기 위한 방법(driver-executor의 통신을 가능하게 함.)
--conf spark.executor.cores=1 \
--conf spark.executor.memory=700m \
--conf spark.worker.cores=1 \
--conf spark.driver.host=<이 spark-submit실행 host ip address> \
--conf spark.driver.port=7077 \
--jars elasticsearch-hadoop-7.6.1.jar \
--driver-memory 1g \
--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.5 \
/Users/hikari/hanwharnd/ai_platform/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar

위와 같은 command를 수행시 아래 처럼 해당 namespace(data-platform)에서 spark용 Pod가 생성되는 것을 확인할 수 있습니다. 아래의 Pod는 spark-submit을 중지하면 expire됩니다.