Reading Elasticsearch Data with Spark: Aggregation Queries

This article covers how to read data from Elasticsearch (ES) with Spark and how to run aggregation queries against ES.

The Scala, Spark, and ES versions I used:

<properties>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.0-cdh6.3.1</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-actors</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.5.4</version>
    </dependency>
</dependencies>
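One thing worth flagging: the POM defines spark.version but never declares the Spark artifacts themselves, and method three below also needs the ES transport client, since PreBuiltTransportClient ships in a separate artifact. A sketch of the extra dependencies I would expect, assuming a CDH-style build (versions and scopes are assumptions; adjust to your environment):

<!-- Assumption: Spark SQL resolved from the CDH repo via the spark.version property above -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Needed for method three: PreBuiltTransportClient lives in the transport artifact -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.5.4</version>
</dependency>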

Below is my test code for reading ES data with Spark. It demonstrates three methods: two use the high-level APIs (EsSparkSQL and EsSpark.esRDD) and one uses the low-level client API. I recommend methods one and three; in my tests, the ES query in method two did not take effect no matter how many times I retried.

import java.net.InetAddress
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max
import org.elasticsearch.action.search.SearchType
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.search.aggregations.AggregationBuilders
import org.elasticsearch.search.aggregations.metrics.max.InternalMax
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
import org.elasticsearch.transport.client.PreBuiltTransportClient

object SparkEsDemo {
  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("SparkEsTestDemo")
      .config("es.nodes", "xxx.xxx.xxx.xxx")
      .config("es.port", 9200)
      // retry 5 times (default 3)
      .config("es.batch.write.retry.count", "5")
      // wait 60 seconds between retries (default 10s)
      .config("es.batch.write.retry.wait", "60")
      // ES connection timeout of 200s (default 1m)
      .config("es.http.timeout", "200s")
      .getOrCreate()

    // Method one: read the ES data as a DataFrame
    val esResource = "esIndex/esType" // replace with your own index and type
    val esDf = EsSparkSQL
      .esDF(sparkSession, resource = esResource) // resource is the ES index/type
      .agg(max("fieldName")) // from here on the ES data is just a DataFrame, so computing on it is very intuitive
      .withColumnRenamed("max(fieldName)", "maxFieldName")
      .na.fill(0)
    esDf.show(false)
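    // An aside (my assumption, not from the original post): esDF also has an
    // overload that takes a query string, so ES can pre-filter the documents
    // before the DataFrame aggregation runs in Spark. A hypothetical example:
    // val filteredDf = EsSparkSQL.esDF(sparkSession, esResource,
    //   """{"query": {"range": {"fieldName": {"gte": 0}}}}""")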

    // Method two: query ES with a hand-written Query DSL string; returns an RDD
    val query =
      s"""{
         |  "query": {
         |    "bool": {}
         |  },
         |  "size": 0,
         |  "aggs": {
         |    "max_price": {
         |      "max": {
         |        "field": "price"
         |      }
         |    }
         |  }
         |}""".stripMargin
    val esRdd = EsSpark
      .esRDD(sparkSession.sparkContext, esResource, query)
    // https://github.com/elastic/elasticsearch-hadoop/issues/276
    esRdd.take(10).foreach(println)
    esRdd.take(10).foreach(f => {
      val map: collection.Map[String, AnyRef] = f._2
      for (s <- map) {
        println(s._1 + ": " + s._2)
      }
    })
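    // Note on why method two's aggregation "does not take effect": per the
    // issue linked above, elasticsearch-hadoop only pushes the "query" clause
    // down to ES and streams the matching documents back as an RDD; "size" and
    // "aggs" are ignored, so esRDD can never return aggregation results. If
    // you need the aggregation itself, use method one (aggregate in Spark) or
    // method three (ask ES directly through its client API).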

    // Method three: aggregation query through the ES client's low-level API
    // https://www.cnblogs.com/benwu/articles/9230819.html
    val prop = new Properties()
    val esClient = initClient(prop)
    val searchResponse = esClient
      .prepareSearch("recommend_smartvideo_spider_acgn")
      .setTypes("smartvideo_type_user_irrelative")
      .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
      .setExplain(true)
      .setSize(0) // no hits needed, only the aggregation
      .addAggregation(AggregationBuilders.max("max_cursor").field("inx_cursor"))
      .setFetchSource(false)
      .execute().actionGet()
    // the max aggregation comes back as an InternalMax
    val maxValue: InternalMax = searchResponse.getAggregations.get("max_cursor")
    println(maxValue.value())
    // the TransportClient holds network resources, so close it when done
    esClient.close()
  }

  /**
   * Initialize the ES client.
   * @param prop ES configuration properties
   * @return a configured PreBuiltTransportClient
   */
  def initClient(prop: Properties): PreBuiltTransportClient = {
    val setting = Settings.builder()
      .put("cluster.name", prop.getProperty("es.cluster.name"))
      .put("client.transport.sniff", true)
      .build()
    val client = new PreBuiltTransportClient(setting)
    val hostAndPortArr = prop.getProperty("es.host&port").split(",")
    for (hostPort <- hostAndPortArr) {
      val hp = hostPort.split(":")
      if (hp.length == 2) {
        client.addTransportAddress(
          new TransportAddress(InetAddress.getByName(hp(0)), hp(1).toInt)
        )
      } else {
        // no port given: fall back to 9300, the transport protocol port
        // (the TransportClient does not use the HTTP port 9200)
        client.addTransportAddress(
          new TransportAddress(InetAddress.getByName(hp(0)), 9300)
        )
      }
    }
    client
  }
}
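As a follow-up to the note in method two: if all you need from the query string is the filter, esRDD works fine, because the "query" clause is exactly the part elasticsearch-hadoop pushes down. Below is a minimal, self-contained sketch under the same placeholder names used above ("esIndex/esType", the field "price", and the node address); it filters in ES and computes the max on the Spark side instead:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object EsRddFilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EsRddFilterDemo")
      .config("es.nodes", "xxx.xxx.xxx.xxx")
      .config("es.port", "9200")
      .getOrCreate()

    // Only the "query" clause is honored by esRDD; each hit comes back as
    // (documentId, Map[fieldName -> value]).
    val filterQuery = """{"query": {"range": {"price": {"gte": 100}}}}"""
    val esRdd = EsSpark.esRDD(spark.sparkContext, "esIndex/esType", filterQuery)

    // The aggregation happens in Spark, not in ES (max throws if no document matches).
    val maxPrice = esRdd
      .flatMap { case (_, doc) => doc.get("price").map(_.toString.toDouble) }
      .max()
    println(s"max price computed on the Spark side: $maxPrice")

    spark.stop()
  }
}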