# temperature.py
# Spark Streaming job: consume temperature readings from Kafka, reshape them
# into ECS-style JSON documents, and index them into Elasticsearch.

import os
import json

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# NOTE(review): pyspark.streaming.kafka was removed in Spark 3.x — this job
# requires Spark 2.x with the spark-streaming-kafka package on the classpath.
from pyspark.streaming.kafka import KafkaUtils

# Reuse an active SparkContext if one exists. The original
# SparkContext().getOrCreate() constructed a brand-new context first (and
# errors out when a context is already running); the classmethod form is the
# correct idiom.
sc = SparkContext.getOrCreate()
sc.setLogLevel("DEBUG")  # very verbose — consider "WARN" outside of debugging

# 1-second micro-batch interval.
ssc = StreamingContext(sc, 1)

# Receiver-based Kafka stream via ZooKeeper (port 2181): consumer group
# 'spark-streaming', one receiver thread on 'temperatureTopic'.
kafkaStream = KafkaUtils.createStream(
    ssc,
    '<kafka host ip/kubernetes outside>:2181',
    'spark-streaming',
    {'temperatureTopic': 1},
)

# Kafka records arrive as (key, value) tuples; keep only the message payload.
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
def process_streaming(series_data):
    """Convert one raw sensor record into an ECS-formatted JSON string.

    ``series_data`` is expected to be the ``str()`` of a Python dict, e.g.
    ``"{'Record time': '2021-01-01 12:00:00.000000', 'Region': 'Seoul',
    'Temperature': 25.5}"`` — single quotes are rewritten into valid JSON
    before parsing (fragile if values themselves contain quotes; TODO confirm
    the upstream producer format).

    Returns:
        The ECS document serialized with ``json.dumps``.

    Raises:
        ValueError: for a region other than Seoul/Busan/Jeju. (The original
            code left its locals unbound in that case and crashed with a
            NameError instead.)
    """
    import datetime
    import json

    # Crude repr -> JSON conversion: strip stray double quotes, then turn the
    # single-quoted repr into double-quoted JSON.
    series_data = series_data.replace("\"", "")
    series_data = series_data.replace("\'", "\"")
    series_data_json = json.loads(series_data)

    date_time_obj = datetime.datetime.strptime(
        series_data_json.get("Record time"), "%Y-%m-%d %H:%M:%S.%f")
    region = series_data_json.get("Region")

    # The three known regions all follow the same naming scheme
    # ('Seoul' -> key 'seoul', sensor id 'sensor_seoul_01'), so derive both
    # from the region name instead of duplicating three branches. 'id' was
    # renamed to 'sensor_id' to stop shadowing the builtin.
    if region not in ('Seoul', 'Busan', 'Jeju'):
        raise ValueError("unknown region: %r" % (region,))
    region_key = region.lower()
    sensor_id = 'sensor_%s_01' % region_key
    region_temperature_degrees = {
        region_key: {'temperature': str(series_data_json.get("Temperature"))}
    }

    elastic_data_json_with_ecs = {
        '@timestamp': date_time_obj.strftime("%Y-%m-%dT%H:%M:%S.%f"),
        'ecs': {'version': '1.0.0'},
        'host': {'name': 'sensor', 'hostname': sensor_id, 'id': sensor_id},
        'event': {
            'dataset': 'thermometer.air_temperature',
            'module': 'thermometer',
            'duration': 10,
            # Keep the (already normalized) raw payload for traceability.
            'original': series_data,
        },
        'metricset': {'name': 'air_temperature'},
        'service': {'type': 'thermometer'},
        'thermometer': {'air_temperature': region_temperature_degrees},
    }
    return json.dumps(elastic_data_json_with_ecs)
# Transform each raw record into an ECS JSON document, then pair it with a
# dummy key so the RDD is (key, value)-shaped for the Hadoop output API.
# process_streaming is passed directly instead of wrapping it in a lambda.
lines = lines.map(process_streaming).map(lambda x: ('key', x))
lines.pprint()


def _save_to_elasticsearch(rdd):
    """Index one micro-batch RDD into Elasticsearch via es-hadoop."""
    # Backslash continuations from the original are unnecessary inside the
    # call parentheses and have been dropped.
    rdd.saveAsNewAPIHadoopFile(
        path='-',  # required by the API; not used by EsOutputFormat
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes": "<elastic search IP>",
            "es.port": "9200",
            "es.resource": "testindex",
            # Values are already serialized JSON strings; tell es-hadoop to
            # index them as-is instead of re-serializing.
            "es.input.json": "yes",
        },
    )


lines.foreachRDD(_save_to_elasticsearch)

ssc.start()
ssc.awaitTermination()
|