
(airflow) Continuous kudu table migration using the Airflow SparkOperator

Background

In my case, a large-scale system was split into several smaller systems. However, some code values from the large system still need to be used by every smaller system. The diagram below shows this situation.

Diagram: a bigger system split into several smaller systems

In the big data ecosystem, data in Apache KUDU can be updated through Apache Impala, which acts as the bridge for CRUD operations. In my case, new code values are inserted regularly. For these reasons, the bigger DW system stores the code values in Apache KUDU and accesses them through Apache Impala.
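For example, a new code value can be written to the KUDU table with a plain Impala statement. This is only an illustrative sketch: the table and column names reuse the code_table example from the next section, and the values are placeholders of my own.

-- Run in impala-shell against the bigger DW cluster (illustrative values)
UPSERT INTO code_table (code1, code2) VALUES ('A001', 'NEW CODE');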

To migrate the code values every day, I developed a Spark operator on top of Airflow. The concept of the migration is as follows.

KUDU Table Creation

On each smaller DW system, KUDU tables with the same schema as the corresponding tables in the bigger DW system were created to hold the code values.

-- This is the example
CREATE TABLE code_table
(
  code1 STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  code2 STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  PRIMARY KEY(code1)
)
STORED AS KUDU
TBLPROPERTIES(
  'kudu.master_addresses'='<KUDU Master Host #1>,<KUDU Master Host #2>,...,<KUDU Master Host #N>'
);

SparkSubmitOperator Code

Airflow provides the SparkSubmitOperator for submitting Spark jobs. Additionally, a few libraries are needed for the submission.

In the code below, the Spark application to submit is read from the Airflow Variable named 'spark_scripts'.

from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable


class PullKuduTablesSparkSubmitOperator(SparkSubmitOperator):
    @apply_defaults
    def __init__(
            self,
            application='',
            conn_id='spark_default',
            executor_memory='<Executor Memory Size>',
            driver_memory='<Driver Memory Size>',
            name='pull_kudu_tables',
            spark_binary='<lib>/spark/bin/spark-submit',
            # ... other SparkSubmitOperator parameters omitted ...
            *args,
            **kwargs):

        # Delegate to SparkSubmitOperator so that all submit settings
        # (memory, name, binary, jars, application_args, ...) are initialized.
        super(PullKuduTablesSparkSubmitOperator, self).__init__(
            *args,
            application=application,
            conn_id=conn_id,
            executor_memory=executor_memory,
            driver_memory=driver_memory,
            name=name,
            spark_binary=spark_binary,
            **kwargs)

    def execute(self, context):
        # The Spark application to submit is kept in the 'spark_scripts' Variable.
        spark_scripts = Variable.get('spark_scripts', deserialize_json=True)
        self._application = spark_scripts  # spark submit application

        super(PullKuduTablesSparkSubmitOperator, self).execute(context)


class PullKuduTablesSparkSubmitPlugin(AirflowPlugin):
    name = 'pull_kudu_tables_spark_submit_plugin'
    operators = [PullKuduTablesSparkSubmitOperator]
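
The operator assumes that the 'spark_scripts' Variable already exists; I assume here that it holds the path of the PySpark script shown in the next section. A minimal sketch of registering it, with a placeholder path of my own:

# Minimal sketch: register the 'spark_scripts' Variable.
# The path below is a placeholder, not the original location.
from airflow.models import Variable

Variable.set('spark_scripts',
             '<lib>/spark_jobs/pull_kudu_tables.py',
             serialize_json=True)  # stored as JSON so deserialize_json=True works in the operator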

Spark Scripts

The Spark script itself is very simple. Spark receives the host information of the KUDU masters. Apache KUDU can run a group of master servers: within a cluster, one master acts as the leader and the others act as followers. The following article describes this architectural overview.

Cloudera KUDU Architectural Overview
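
The kudu-spark connector takes the master addresses as a single comma-separated string. A small sketch of building that string (host names are placeholders; 7051 is Kudu's default master RPC port, and it may be omitted if the default is used):

# Host names are placeholders; 7051 is the default Kudu master RPC port.
kudu_master_hosts = ['<KUDU Master Host #1>', '<KUDU Master Host #2>', '<KUDU Master Host #3>']
kudu_masters = ','.join('{host}:7051'.format(host=host) for host in kudu_master_hosts)
# e.g. passed to .option("kudu.master", kudu_masters) in the script below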

The code snippet below shows the Spark code for the KUDU table migration.

import sys

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def main():
    table_name = sys.argv[1]         # Kudu table name for the migration
    from_kudu_masters = sys.argv[2]  # Source Kudu masters
    to_kudu_masters = sys.argv[3]    # Destination Kudu masters

    from_kudu_table_name = "impala::<source db>.{table_name}".format(table_name=table_name)
    # Load the data from the Kudu table of the source cluster into a Spark dataframe.
    df = (spark.read.format("org.apache.kudu.spark.kudu")
          .option("kudu.master", from_kudu_masters)
          .option("kudu.table", from_kudu_table_name)
          .load())

    to_kudu_table_name = "impala::<destination db>.{table_name}".format(table_name=table_name)

    # Write the data from the Spark dataframe to the Kudu table of the destination cluster.
    (df.write.format("org.apache.kudu.spark.kudu")
     .option("kudu.master", to_kudu_masters)
     .option("kudu.table", to_kudu_table_name)
     .mode("append")
     .save())


if __name__ == "__main__":
    main()
    sys.exit(0)

Airflow DAG

The Variable feature (menu location: Airflow Admin > Variables) can be used to store static information. So, I registered the KUDU master addresses of both the source cluster and the destination cluster, as well as the list of tables to migrate.
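
A minimal sketch of how these Variables might be registered; the host names and table names below are placeholders, not the real values:

# Minimal sketch: register the Variables read by the DAG.
# Host names and table names are placeholders.
from airflow.models import Variable

Variable.set('kudu_masters',
             {'source_cluster': ['<Source KUDU Master #1>', '<Source KUDU Master #2>'],
              'destination_cluster': ['<Destination KUDU Master #1>', '<Destination KUDU Master #2>']},
             serialize_json=True)

Variable.set('kudu_tables', ['code_table'], serialize_json=True)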

The SparkSubmitOperator pulls each KUDU table from the source cluster to the destination cluster, one task per table.

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from custom_operators.pull_kudu_tables_spark_submit_operator import PullKuduTablesSparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    'owner': '<user name>',
    'start_date': datetime(2022, 10, 11),
    'email': ['<EMAIL ADDRESS>'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}

# The KUDU masters of the source cluster and the destination cluster
kudu_masters = Variable.get("kudu_masters", deserialize_json=True)
# The table names for the migration
kudu_tables = Variable.get("kudu_tables", deserialize_json=True)

with DAG(
    dag_id='pull_kudu_tables',
    default_args=default_args,
    schedule_interval='<Schedule Interval>',
    tags=['KUDU', 'Migration']
) as dag:

    process_list = []

    for table in kudu_tables:
        application_args = [table,
                            ','.join(map(str, kudu_masters['source_cluster'])),
                            ','.join(map(str, kudu_masters['destination_cluster']))
                            ]

        remove_destination_kudu_table = BashOperator(
            task_id='remove_kudu_table_{table_name}'.format(table_name=table),
            bash_command=""" env -i impala-shell -i <Impala Server Address> -q 'delete from <destination database name>.{table_name}' """.format(table_name=table),
            queue='<Airflow Queue Name>',
            dag=dag
        )

        pull_kudu_spark_operator = PullKuduTablesSparkSubmitOperator(
            task_id='pull_kudu_table_{table_name}'.format(table_name=table),
            jars='<lib>/kudu/kudu_spark_<your version>.jar,<lib>/kudu/kudu_spark_tools_<your version>.jar',
            application_args=application_args,
            queue='<Airflow Queue Name>',
            dag=dag
        )

        # After removing the existing data, the data is inserted newly.
        remove_destination_kudu_table >> pull_kudu_spark_operator
        process_list.append(remove_destination_kudu_table)  # head task of each per-table chain

This setup works fine through the Airflow SparkSubmitOperator. For operating a shared code-value system like this, Apache KUDU is a good choice.