日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

Python如何把Spark數(shù)據(jù)寫入ElasticSearch

瀏覽:32日期:2022-07-29 14:37:53

這里以將Apache的日志寫入到ElasticSearch為例,來演示一下如何使用Python將Spark數(shù)據(jù)導(dǎo)入到ES中。

實際工作中,由于數(shù)據(jù)與使用框架或技術(shù)的復(fù)雜性,數(shù)據(jù)的寫入變得比較復(fù)雜,在這里我們簡單演示一下。

如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫,但Python不支持。所以首先你需要去這里下載依賴的ES官方開發(fā)的依賴包包。

下載完成后,放在本地目錄,以下面命令方式啟動pyspark:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,請設(shè)置環(huán)境變量:

export PYSPARK_PYTHON=/usr/bin/python3理解如何寫入ES的關(guān)鍵是要明白,ES是一個JSON格式的數(shù)據(jù)庫,它有一個必須的要求。數(shù)據(jù)格式必須采用以下格式

{ 'id: { the rest of your json}}

往下會展示如何轉(zhuǎn)換成這種格式。

解析Apache日志文件我們將Apache的日志文件讀入,構(gòu)建Spark RDD。然后我們寫一個parse()函數(shù)用正則表達式處理每條日志,提取我們需要的字

rdd = sc.textFile('/home/ubuntu/walker/apache_logs')regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’

p=re.compile(regex)def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d

換句話說,我們剛開始從日志文件讀入RDD的數(shù)據(jù)類似如下:

[’83.149.9.216 - - [17/May/2015:10:05:03 +0000] 'GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1' 200 203023 'http://semicomplete.com/presentations/logstash-monitorama-2013/' 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36'’]

然后我們使用map函數(shù)轉(zhuǎn)換每條記錄:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{’date’: ’17/May/2015:10:05:03 +0000’, ’ip’: ’83.149.9.216’, ’operation’: ’GET’, ’uri’: ’/presentations/logstash-monitorama-2013/images/kibana-search.png’}]

現(xiàn)在看起來像JSON,但并不是JSON字符串,我們需要使用json.dumps將dict對象轉(zhuǎn)換。

我們同時增加一個doc_id字段作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們將這個字段作為ID。

這里我們使用SHA算法,將這個JSON字符串作為參數(shù),得到一個唯一ID。計算結(jié)果類似如下,可以看到ID是一個很長的SHA數(shù)值。

rdd3.take(1)

[(’a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c’, ’{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'doc_id': 'a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}’)]

現(xiàn)在我們需要制定ES配置,比較重要的兩項是:

“es.resource” : ‘walker/apache’: 'walker'是索引,apache是類型,兩者一般合稱索引 “es.mapping.id”: “doc_id”: 告訴ES那個字段作為整個文檔的ID,也就是查詢結(jié)果中的_id

其他的配置自己去探索。

然后我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分代碼對于所有的ES都是一樣的,比較固定,不需要理解每一個細節(jié)

es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)rdd3 = rdd2.map(addID)def addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))

最后我們可以使用curl進行查詢

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*{ '_index' : 'walker', '_type' : 'apache', '_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', '_score' : 1.0, '_source' : { 'date' : '17/May/2015:10:05:32 +0000', 'ip' : '91.177.205.119', 'operation' : 'GET', 'doc_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', 'uri' : '/favicon.ico' }

如下是所有代碼:

import jsonimport hashlibimport redef addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’p=re.compile(regex)rdd = sc.textFile('/home/ubuntu/walker/apache_logs')rdd2 = rdd.map(parse)rdd3 = rdd2.map(addID)es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)

也可以這么封裝,其實原理是一樣的

import hashlibimport jsonfrom pyspark import Sparkcontextdef make_md5(line): md5_obj=hashlib.md5() md5_obj.encode(line) return md5_obj.hexdigest()def parse(line): dic={} l = line.split(’t’) doc_id=make_md5(line) dic[’name’]=l[1] dic[’age’] =l[2] dic[’doc_id’]=doc_id return dic #記得這邊返回的是字典類型的,在寫入es之前要記得dumpsdef saveData2es(pdd, es_host, port,index, index_type, key): ''' 把saprk的運行結(jié)果寫入es :param pdd: 一個rdd類型的數(shù)據(jù) :param es_host: 要寫es的ip :param index: 要寫入數(shù)據(jù)的索引 :param index_type: 索引的類型 :param key: 指定文檔的id,就是要以文檔的那個字段作為_id :return: ''' #實例es客戶端記得單例模式 if es.exist.index(index): es.index.create(index, ’spo’) es_write_conf = { 'es.nodes': es_host, 'es.port': port, 'es.resource': index/index_type, 'es.input.json': 'yes', 'es.mapping.id': key } (pdd.map(lambda _dic: (’’, json.dumps(_dic)))) #這百年是為把這個數(shù)據(jù)構(gòu)造成元組格式,如果傳進來的_dic是字典則需要jdumps,如果傳進來之前就已經(jīng)dumps,這便就不需要dumps了 .saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf) )if __name__ == ’__main__’: #實例化sp對象 sc=Sparkcontext() #文件中的呢內(nèi)容一行一行用sc的讀取出來 json_text=sc.textFile(’./1.txt’) #進行轉(zhuǎn)換 json_data=json_text.map(lambda line:parse(line)) saveData2es(json_data,’127.0.01’,’9200’,’index_test’,’index_type’,’doc_id’) sc.stop()

看到了把,面那個例子在寫入es之前加了一個id,返回一個元組格式的,現(xiàn)在這個封裝指定_id就會比較靈活了

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標簽: Python 編程
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日韩福利在线观看| 日产午夜精品一线二线三线| 欧美日一区二区| 成人在线免费观看网站| 国产精品**亚洲精品| 国产精品久久久久av蜜臀| 日韩高清中文字幕一区| 青青青国产精品| 日韩极品在线观看| 国产精品白丝一区二区三区| 精品国产不卡一区二区| 捆绑调教日本一区二区三区| 久久婷婷激情| 精品91久久久久| 亚洲精品字幕| 欧美一级二级视频| 精品国产一区二区三区性色av| 精品国产aⅴ| 亚洲1234区| 国产精品美女久久久| 日本在线不卡视频| 国产精品美女午夜爽爽| 久久精品九色| 91精品国产乱码久久久久久久 | 荡女精品导航| 日韩毛片在线| 蜜桃视频一区二区三区在线观看| 婷婷综合一区| 麻豆精品在线| 亚洲大全视频| 成人台湾亚洲精品一区二区| 麻豆精品久久久| 97精品在线| 欧美日韩一二| 精品在线91| 国产精品videossex久久发布| 国产精品男女| 久久麻豆视频| 日韩天堂av| 奇米色欧美一区二区三区| 麻豆国产欧美日韩综合精品二区| 久久精品亚洲人成影院| 亚洲一区二区日韩| 国产精品chinese| 久久青草久久| 日韩福利视频一区| 中文字幕在线官网| 天堂av在线一区| 国产日韩视频| 最新中文字幕在线播放| 丝袜美腿亚洲一区| 精品一区二区三区中文字幕| 欧美高清不卡| 欧美欧美黄在线二区| 热三久草你在线| 亚洲精品九九| 日韩一区欧美| 爽好久久久欧美精品| 亚洲欧美日韩国产| 国产精品久久久久9999高清| 欧美.日韩.国产.一区.二区| 欧美有码在线| 欧美中文一区二区| 国产精品一区二区免费福利视频| 日韩国产一区二区| 日韩精品一页| 91精品一区国产高清在线gif| 日本高清久久| 欧美日韩在线二区| 国产精品久久久久久久久久妞妞| 婷婷六月综合| 国产一区国产二区国产三区| 亚洲永久精品唐人导航网址| 日韩欧美自拍| 欧美激情在线精品一区二区三区| 亚洲欧美日韩国产| 蜜桃视频在线网站| 国产乱码精品一区二区亚洲| 亚洲一区二区三区高清| 麻豆中文一区二区| 在线观看一区| 国产综合精品一区| 国产一区国产二区国产三区| 日韩一二三区在线观看| 婷婷综合社区| 欧美日韩国产观看视频| 国产精品日本一区二区不卡视频 | 欧美一区久久久| 国产情侣一区| 美女日韩在线中文字幕| 亚洲最新无码中文字幕久久| 国产欧美日韩一区二区三区四区| 国产视频一区免费看| 婷婷综合六月| 成人在线免费观看网站| 国产精品男女| 欧美日本精品| 亚洲精品视频一二三区| 一区视频在线| 欧美.日韩.国产.一区.二区| 日韩免费福利视频| 日韩88av| 国产成人久久精品麻豆二区| 国产乱码精品一区二区亚洲| 日韩中文字幕视频网| 亚洲深爱激情| 尤物在线精品| 欧美日韩精品免费观看视频完整| 久久裸体视频| 91精品国产调教在线观看| 亚洲精品在线影院| 欧洲一区二区三区精品| 久草免费在线视频| 97人人精品| а√天堂8资源在线| 精品视频久久| 精品一区av| 精品美女久久| 国产一区二区精品久| 九九久久国产| 日本一二区不卡| 日韩一区二区在线免费| 桃色一区二区| 久久精品一区二区不卡| 91精品国产91久久久久久黑人| 久久蜜桃资源一区二区老牛| 99久精品视频在线观看视频| 欧美亚洲激情| 午夜日韩福利| 一区二区三区网站| 日韩区欧美区| 国产麻豆一区二区三区精品视频| 日韩高清欧美激情| 国产精品一国产精品| 国产精品99久久免费| 久久99免费视频| 成人午夜在线| 亚洲特色特黄| 天堂av在线一区| 欧美日韩调教| 国产一区一一区高清不卡| 在线观看精品| 免费日韩视频| 欧美一区不卡| 国产一区二区三区探花| 91精品推荐| 亚洲天堂免费| 国产精品白浆| 日韩电影免费网址| 午夜国产精品视频| 最近国产精品视频| 国产精品白丝久久av网站| 在线看片国产福利你懂的| 久久久久免费av| 久久夜色精品| 国产精品2023| 亚洲性图久久| 日韩欧美中文字幕一区二区三区 | 日韩成人午夜精品| 麻豆精品在线播放| 亚洲大片在线| 亚洲精品自拍| 久久69成人| 久久中文字幕av| 亚洲尤物av| 久久伊人国产| 红桃视频国产一区| 欧美日本不卡高清| 天堂а√在线最新版中文在线| 亚洲黄色在线| 国产精品一站二站| 久久激情中文| 日韩精品午夜视频| 久久男人av资源站| 久久国产成人| 久久久精品国产**网站| 免费观看久久av| 国产乱码精品一区二区亚洲| 久久精品青草| 欧美精品影院| 午夜视频精品| 久久精品女人| 日本大胆欧美人术艺术动态| 国产精品一线| 五月婷婷亚洲| 国产高清日韩| 亚洲免费播放| 国产精久久久| 视频一区二区中文字幕| 精品国产乱码久久久久久樱花| 宅男噜噜噜66国产日韩在线观看| 久久av资源| 国产精品美女久久久| 久久天堂影院| 亚洲欧美久久精品| 欧美不卡高清一区二区三区| 日韩欧美中文字幕电影| 亚洲v在线看| 精品黄色一级片| 午夜久久av |