Integrating Flink with Spring Boot for Elasticsearch (ES) Data Synchronization
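Add dependencies
Declare the core dependencies in pom.xml. Starting with Flink 1.15, these Flink artifacts no longer carry a `_2.12` Scala suffix: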
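```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Needed to execute the job from inside the application (local execution) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- fastjson provides the JSON / JSONObject classes used in the job -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
```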
Configure the Elasticsearch connection
Create a configuration class that holds the Elasticsearch REST endpoint:
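```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

// Holds the Elasticsearch REST endpoint read from application.yml.
// The Flink ES7 connector talks HTTP to the REST port (default 9200), so the
// legacy TransportClient (port 9300, deprecated since ES 7.0) is not needed.
@Configuration
public class ElasticsearchConfig {
    @Value("${es.host}") private String host;
    @Value("${es.port}") private int port;

    public String getHost() { return host; }

    public int getPort() { return port; }
}
```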
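Implement the sync logic
Build a Flink streaming job that transforms the data and writes it to Elasticsearch. The sink uses the `ElasticsearchSink.Builder` API. Flink 1.15 also introduced the newer `Elasticsearch7SinkBuilder`.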
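```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class FlinkESService {
    @Autowired private ElasticsearchConfig esConfig;
    // flink.parallelism from application.yml (falls back to 1 if absent)
    @Value("${flink.parallelism:1}") private int parallelism;

    public void startSync() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);

        // Mock source (replace with a Kafka/MySQL connector in practice)
        DataStream<String> sourceStream = env.fromElements("{\"id\":1,\"name\":\"test\"}");

        // Transform: parse the JSON and keep only the fields to index
        DataStream<Map<String, Object>> processedStream = sourceStream
                .map(json -> {
                    JSONObject obj = JSON.parseObject(json);
                    Map<String, Object> doc = new HashMap<>();
                    doc.put("id", obj.getInteger("id"));
                    doc.put("name", obj.getString("name"));
                    return doc;
                })
                // Type erasure hides the lambda's generic return type, so declare it
                .returns(new TypeHint<Map<String, Object>>() {});

        // Write to Elasticsearch over REST, using host/port from ElasticsearchConfig
        List<HttpHost> httpHosts = Collections.singletonList(
                new HttpHost(esConfig.getHost(), esConfig.getPort(), "http"));
        ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (element, ctx, indexer) -> indexer.add(
                        Requests.indexRequest()
                                .index("target_index")
                                .source(element)));
        processedStream.addSink(esSinkBuilder.build());

        env.execute("ES Data Sync Job");
    }
}
```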
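You can move the field mapping out of the `map()` lambda into a plain static method. That makes it unit-testable without starting Flink. The sketch below uses only the standard library. It assumes the JSON has already been parsed into a `Map<String, Object>` (fastjson's `JSONObject` implements that interface). The class name `DocumentMapper` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the field projection from the Flink map() step,
// extracted so it can be unit-tested without a Flink cluster.
final class DocumentMapper {
    private DocumentMapper() {}

    // Builds the Elasticsearch document from an already-parsed record.
    // Only "id" and "name" are copied; every other field is dropped.
    static Map<String, Object> toDocument(Map<String, Object> parsed) {
        Map<String, Object> doc = new LinkedHashMap<>();
        Object id = parsed.get("id");
        // Normalise any numeric id (Integer, Long, ...) to Integer; non-numbers become null.
        doc.put("id", id instanceof Number ? ((Number) id).intValue() : null);
        Object name = parsed.get("name");
        doc.put("name", name == null ? null : name.toString());
        return doc;
    }
}
```

With this helper, the transformation lambda reduces to `json -> DocumentMapper.toDocument(JSON.parseObject(json))`.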
Startup class
Trigger the Flink job from the main application class:
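```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application implements CommandLineRunner {
    @Autowired private FlinkESService flinkESService;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Submit the Flink job once the Spring context has started
    @Override
    public void run(String... args) throws Exception {
        flinkESService.startSync();
    }
}
```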
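`env.execute()` blocks until the job terminates. If you call it directly from `CommandLineRunner.run`, Spring Boot startup is held up for as long as an unbounded (e.g. Kafka) job runs.

There are two ways around this:
- Hand the blocking call to a dedicated thread.
- Use Flink's `StreamExecutionEnvironment#executeAsync(String)`, which submits the job and returns a `JobClient`.

The sketch below shows the thread-based variant using only the standard library. `BlockingJob` and `BackgroundJobLauncher` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: a job body that may throw checked exceptions, like startSync().
@FunctionalInterface
interface BlockingJob {
    void run() throws Exception;
}

// Hypothetical launcher: runs a blocking job on its own (non-daemon) thread,
// so the JVM stays alive while the job runs but the caller returns at once.
final class BackgroundJobLauncher {
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "flink-job-launcher"));

    // Submits the job and returns a future that completes when the job ends.
    CompletableFuture<Void> launch(BlockingJob job) {
        return CompletableFuture.runAsync(() -> {
            try {
                job.run();
            } catch (Exception e) {
                // Surface checked exceptions through the returned future.
                throw new CompletionException(e);
            }
        }, executor);
    }

    // Stops accepting new jobs; already submitted jobs keep running.
    void shutdown() {
        executor.shutdown();
    }
}
```

In `Application.run`, the call would then become `launcher.launch(flinkESService::startSync)`. Any job failure is reported through the returned future instead of aborting startup.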
Key configuration parameters
application.yml must contain the following parameters:
```yaml
es:
  host: 127.0.0.1
  port: 9200        # REST (HTTP) port used by the Flink ES7 connector
flink:
  parallelism: 4
```
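Notes
- In production, replace the mock source with a real source connector (e.g. Flink's Kafka `KafkaSource`).
- The Elasticsearch cluster version must match the connector: `flink-connector-elasticsearch7` targets Elasticsearch 7.x.
- For bulk writes, configure `bulk.flush.max.actions` (set via `ElasticsearchSink.Builder#setBulkFlushMaxActions`).
- For error handling, implement the `ActionRequestFailureHandler` interface and register it with `ElasticsearchSink.Builder#setFailureHandler`.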
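`bulk.flush.max.actions` works by buffering index requests and sending them to Elasticsearch as one bulk request once a count threshold is reached. This avoids one HTTP round trip per record. The sketch below illustrates only that counting logic, using a hypothetical `BulkBuffer` class. It is not the connector's implementation, which also supports size-based (`setBulkFlushMaxSizeMb`) and time-based (`setBulkFlushInterval`) flushing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of count-based bulk flushing.
final class BulkBuffer<T> {
    private final int maxActions;
    private final Consumer<List<T>> bulkSender; // receives each batch; stands in for one bulk request
    private final List<T> pending = new ArrayList<>();

    BulkBuffer(int maxActions, Consumer<List<T>> bulkSender) {
        if (maxActions < 1) throw new IllegalArgumentException("maxActions must be >= 1");
        this.maxActions = maxActions;
        this.bulkSender = bulkSender;
    }

    // Buffers one action and flushes as soon as the threshold is reached.
    void add(T action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Sends whatever is still buffered (e.g. before shutdown) as one batch.
    void flush() {
        if (pending.isEmpty()) return;
        bulkSender.accept(new ArrayList<>(pending));
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With the real connector, the threshold is set on the `ElasticsearchSink.Builder` instance, e.g. `builder.setBulkFlushMaxActions(1000)`. A value of 1 flushes every record immediately, which is convenient for testing but slow for bulk loads.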