Integrating Flink with Spring Boot to Sync Data to Elasticsearch

Add dependencies

Configure the core dependencies in pom.xml:

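<!-- As of Flink 1.15 these artifacts no longer carry a Scala suffix such as _2.12 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Needed to execute the job locally from inside the application -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Assumed JSON library: fastjson, whose JSON.parseObject matches the call in the service class -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>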
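
Configure the Elasticsearch connection

Create a configuration class that builds the Elasticsearch transport client from the configured host and port: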

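import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {
    @Value("${es.host}") private String host;
    @Value("${es.port}") private int port; // transport port, 9300 by default

    // Requires org.elasticsearch.client:transport; TransportClient is deprecated in ES 7 and removed in ES 8.
    @Bean
    public TransportClient transportClient() throws UnknownHostException {
        return new PreBuiltTransportClient(Settings.EMPTY)
            .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
    }
}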

Implement the data sync logic

Build a Flink streaming job that transforms the data and writes it to Elasticsearch:

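import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class FlinkESService {
    // The Flink connector writes over REST, so it needs the HTTP port (9200), not the transport port (9300).
    @Value("${es.host}") private String host;

    public void startSync() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Mock data source (replace with a Kafka/MySQL connector in practice)
        DataStream<String> sourceStream = env.fromElements("{\"id\":1,\"name\":\"test\"}");

        // Transform each JSON string into a document map
        DataStream<Map<String, Object>> processedStream = sourceStream
            .map(json -> {
                JSONObject obj = JSON.parseObject(json);
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", obj.getInteger("id"));
                doc.put("name", obj.getString("name"));
                return doc;
            })
            // Lambdas lose generic type information, so declare the result type explicitly
            .returns(new TypeHint<Map<String, Object>>() {});

        // Write to Elasticsearch
        List<HttpHost> httpHosts = Collections.singletonList(new HttpHost(host, 9200, "http"));
        ElasticsearchSinkFunction<Map<String, Object>> indexFunction =
            (element, ctx, indexer) -> indexer.add(
                Requests.indexRequest()
                    .index("target_index")
                    .source(element));
        processedStream.addSink(
            new ElasticsearchSink.Builder<>(httpHosts, indexFunction).build());

        // Blocks until the job finishes
        env.execute("ES Data Sync Job");
    }
}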
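
The transformation step is easier to unit-test when the field mapping lives in a plain static method that the `map` operator calls. The following is a minimal JDK-only sketch, assuming the JSON has already been parsed into a `Map`. `DocumentMapper` and `toDocument` are hypothetical names, and rejecting records without a numeric `id` is an assumed policy, not something the original job does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that holds the field mapping outside the Flink job. */
class DocumentMapper {
    /** Copies only the fields the target index expects; extra fields are dropped. */
    static Map<String, Object> toDocument(Map<String, Object> parsed) {
        Object id = parsed.get("id");
        if (!(id instanceof Number)) {
            // Assumed policy: a record without a numeric id cannot be indexed
            throw new IllegalArgumentException("record has no numeric id: " + parsed);
        }
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", ((Number) id).intValue());
        Object name = parsed.get("name");
        doc.put("name", name == null ? null : name.toString());
        return doc;
    }
}
```

A method like this can be exercised with ordinary unit tests, without starting a Flink environment.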

Configure the application entry point

Trigger the Flink job from the main application class:

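import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application implements CommandLineRunner {
    @Autowired private FlinkESService flinkESService;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Invoked once the Spring context is ready; blocks until the Flink job finishes.
    @Override
    public void run(String... args) throws Exception {
        flinkESService.startSync();
    }
}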
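
Because `env.execute` blocks until the job finishes, calling `startSync()` directly from `run` occupies the Spring startup thread for as long as the job runs. One option is to start the job on a background thread. The sketch below uses only the JDK; `JobLauncher` and `BlockingJob` are hypothetical names standing in for a call such as `flinkESService.startSync()`, and are not part of Flink or Spring. Flink's own `StreamExecutionEnvironment.executeAsync` is another way to submit a job without waiting for it to complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: runs a blocking job on a dedicated background thread. */
class JobLauncher {
    /** Stand-in for a blocking call such as FlinkESService.startSync(). */
    interface BlockingJob {
        void run() throws Exception;
    }

    // Non-daemon thread, so the JVM stays alive while the job is running
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Returns immediately; the future completes, or fails, when the job ends. */
    CompletableFuture<Void> launch(BlockingJob job) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                job.run();
                result.complete(null);
            } catch (Exception e) {
                result.completeExceptionally(e); // surface the failure to the caller
            }
        });
        return result;
    }

    /** Interrupts a running job and releases the thread. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Under these assumptions, `run` could call `launcher.launch(flinkESService::startSync)` and attach error logging to the returned future instead of blocking.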

Key configuration parameters

application.yml must contain the following parameters:

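es:
  host: 127.0.0.1
  port: 9300  # transport port used by ElasticsearchConfig; the Flink sink uses HTTP port 9200
flink:
  parallelism: 4  # not read by the code shown; apply it with env.setParallelism(...) if needed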
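
Notes

  • In production, replace the mock data source with a real source connector (for example, KafkaSource).
  • The Elasticsearch cluster version must match the connector: flink-connector-elasticsearch7 targets Elasticsearch 7.x.
  • For bulk writes, set bulk.flush.max.actions (setBulkFlushMaxActions on ElasticsearchSink.Builder) to control how many actions are buffered before a flush.
  • To handle failed requests, implement the ActionRequestFailureHandler interface and register it with setFailureHandler.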
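
To illustrate what a `bulk.flush.max.actions` threshold does, the following is a minimal JDK-only model of count-based bulk buffering. It is a conceptual sketch, not the connector's implementation; `BulkBuffer`, `bulkWriter`, and `pendingCount` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of count-based bulk flushing; not the Flink connector's code. */
class BulkBuffer<T> {
    private final int maxActions;
    private final Consumer<List<T>> bulkWriter; // stand-in for sending one bulk request
    private final List<T> pending = new ArrayList<>();

    BulkBuffer(int maxActions, Consumer<List<T>> bulkWriter) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.bulkWriter = bulkWriter;
    }

    /** Buffers one action and flushes as soon as the threshold is reached. */
    void add(T action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Sends everything buffered so far as one batch, for example on shutdown. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        bulkWriter.accept(new ArrayList<>(pending)); // copy so the writer owns the batch
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a threshold of 2, adding five actions sends two full batches and leaves one action pending until `flush()` is called. A smaller threshold lowers write latency at the cost of more requests.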