Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

今天为大家带来Flink的一个综合应用案例:Flink数据写入Kafka+从Kafka存入Mysql

第一部分:写数据到kafka

public static void writeToKafka() throws Exception{
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", CONST_SERIALIZER);
        props.put("value.serializer", CONST_SERIALIZER);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //构建User对象,在name为data后边加个随机数
        int randomInt = RandomUtils.nextInt(1, 100000);
        User user = new User();
        user.setName("data" + randomInt);
        user.setId(randomInt);
        //转换成JSON
        String userJson = JSON.toJSONString(user);
        //包装成kafka发送的记录
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER, partition,
                null, userJson);
        //发送到缓存
        producer.send(record);
        System.out.println("向kafka发送数据:" + userJson);
        //立即发送
        producer.flush();
    }

重点:

//发送到缓存
        producer.send(record);

为了增强代码的Robust,我们将常量单独拎出来:

//本地的kafka机器列表
    public static final String BROKER_LIST = "192.168.88.161:9092";
    //kafka的topic
    public static final String TOPIC_USER = "USER";
    //kafka的partition分区
    public static final Integer partition = 0;
    //序列化的方式
    public static final String CONST_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //反序列化
    public static final String CONST_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

main方法如下:

public static void main(String[] args) {
        while(true) {
            try {
                //每三秒写一条数据
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

第二部分:从kafka获取数据

KafkaRickSourceFunction.java

import com.hy.flinktest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@Slf4j
public class KafkaRickSourceFunction extends RichSourceFunction<String>{
    //kafka
    private static Properties prop = new Properties();
    private boolean running = true;
  //作静态化处理,增强robust
    private static Integer partition = WritedatatoKafka.partition;
    static {
        prop.put("bootstrap.servers",WritedatatoKafka.BROKER_LIST);
        prop.put("zookeeper.connect","192.168.88.161:2181");
        prop.put("group.id",WritedatatoKafka.TOPIC_USER);
        prop.put("key.deserializer",WritedatatoKafka.CONST_DESERIALIZER);
        prop.put("value.deserializer",WritedatatoKafka.CONST_DESERIALIZER);
        prop.put("auto.offset.reset","latest");
        prop.put("max.poll.records", "500");
        prop.put("auto.commit.interval.ms", "1000");
    }
    @Override
    public void run(SourceContext sourceContext) throws Exception {
        //创建一个消费者客户端实例
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
        //只消费TOPIC_USER 分区
        TopicPartition topicPartition = new TopicPartition(WritedatatoKafka.TOPIC_USER,partition);
        long offset =0; //这个初始值应该从zk或其他地方获取
        offset = placeOffsetToBestPosition(kafkaConsumer, offset, topicPartition);
        while (running){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            if(records.isEmpty()){
                continue;
            }
            for (ConsumerRecord<String, String> record : records) {
                //record.offset();
                //record.key()
                String value = record.value();
                sourceContext.collect(value);
            }
        }
    }

然后 返回最合适的offset

/**
     * 将offset定位到最合适的位置,并返回最合适的offset。
     * @param kafkaConsumer consumer
     * @param offset offset
     * @param topicPartition partition
     * @return the best offset
     */
    private long placeOffsetToBestPosition(
            KafkaConsumer<String, String> kafkaConsumer,
            long offset, TopicPartition topicPartition) {
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        kafkaConsumer.assign(partitions);
        long bestOffset = offset;
        if (offset == 0) {
            log.info("由于offset为0,重新定位offset到kafka起始位置.");
            kafkaConsumer.seekToBeginning(partitions);
        } else if (offset > 0) {
            kafkaConsumer.seekToBeginning(partitions);
            long startPosition = kafkaConsumer.position(topicPartition);
            kafkaConsumer.seekToEnd(partitions);
            long endPosition = kafkaConsumer.position(topicPartition);
            if (offset < startPosition) {
                log.info("由于当前offset({})比kafka的最小offset({})还要小,则定位到kafka的最小offset({})处。",
                        offset, startPosition, startPosition);
                kafkaConsumer.seekToBeginning(partitions);
                bestOffset = startPosition;
            } else if (offset > endPosition) {
                log.info("由于当前offset({})比kafka的最大offset({})还要大,则定位到kafka的最大offset({})处。",
                        offset, endPosition, endPosition);
                kafkaConsumer.seekToEnd(partitions);
                bestOffset = endPosition;
            } else {
                kafkaConsumer.seek(topicPartition, offset);
            }
        }
        return bestOffset;
    }
    @Override
    public void cancel() {
        running = false;
    }
}

第三部分

主类:从kafka读取数据写入mysql

//1.构建流执行环境 并添加数据源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(new KafkaRickSourceFunction());
//2.从kafka里读取数据,转换成User对象
DataStream<User> dataStream = dataStreamSource.map(lines -> JSONObject.parseObject(lines, User.class));
//3.收集5秒钟的总数
dataStream.timeWindowAll(Time.seconds(5L)).
        apply(new AllWindowFunction<User, List<User>, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<User> iterable, Collector<List<User>> out) throws Exception {
                List<User> users = Lists.newArrayList(iterable);
                if(users.size() > 0) {
                    System.out.println("5秒内总共收到的条数:" + users.size());
                    out.collect(users);
                }
            }
        })
        //sink 到数据库
                .addSink(new MysqlRichSinkFunction());
        //打印到控制台
        //.print();

第四部分:

写入到目标数据库sink

MysqlRichSinkFunction.java

@Slf4j
public class MysqlRichSinkFunction extends RichSinkFunction<List<User>> {
    private Connection connection = null;
    private PreparedStatement ps = null;
    @Override
    public void open(Configuration parameters) throws Exception {
       // super.open(parameters);
        log.info("获取数据库连接");
        connection = DbUtil.getConnection();
        String sql = "insert into user1(id,name) values (?,?)";
        ps = connection.prepareStatement(sql);
    }
    public void invoke(List<User> users, Context ctx) throws Exception {
        //获取ReadMysqlResoure发送过来的结果
        for(User user : users) {
            ps.setLong(1, user.getId());
            ps.setString(2, user.getName());
            ps.addBatch();
        }
        //一次性写入
        int[] count = ps.executeBatch();
        log.info("成功写入Mysql数量:" + count.length);
    }
    @Override
    public void close() throws Exception {
        //关闭并释放资源
        if(connection != null) {
            connection.close();
        }
        if(ps != null) {
            ps.close();
        }
    }
}

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql

如果有帮助,给manor一键三连吧~~


目录
相关文章
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
9 0
|
1天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 1
|
1天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之用CTAS从mysql同步数据到hologres,改了字段长度,报错提示需要全部重新同步如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
34 8
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
5天前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
516 2
2024年了,如何更好的搭建Kafka集群?
|
5天前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
49 0
|
7月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
279 0
|
5天前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
73 0
|
5天前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
158 0
http://www.vxiaotou.com