2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

这次是上篇文章的续集,最新的Flink版本大大简化了之前复杂的写法~

之前的文章

首先准备模拟数据:

//1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka的一系列配置,可以从官网直接copy过来@~@~

然后正式生产模拟数据:

//2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)
            CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
            String data = JSON.toJSONString(categoryPojo);
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
            System.out.println("数据是"+data);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

这里的实体类用Lombok,比较简单:

这是之前写的Lombok用法文章

@Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double price;//该分类总销售额
        private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

有了数据写入Kafka,我们开始消费“她”:

设置一下Flink运行环境:

//TODO 1.设置环境env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //并行度为1,表示不分区
        env.setParallelism(1);

配置Kafka相关并从哪里开始读offset

//TODO 2设置Kafka相关参数
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");
        //Flink设置kafka的offset,从最新的开始
         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "myDemo",
                new SimpleStringSchema(),
                props
        );
        consumer.setStartFromLatest();
        consumer.setCommitOffsetsOnCheckpoints(true);

第3步解析数据源并测试:

DataStreamSource<String> source = env.addSource(consumer);
         SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                Order order = JSON.toJavaObject(jsonObject, Order.class);
                return order;
            }
        });
        //测试一下
        mapDS.print();

success!

最后存入Mysql

//sink输出到Mysql
        result.addSink(JdbcSink.sink(
                "INSERT INTO t_order(category,price,time) values(?,?,?)",
                (ps,order)->{
                    ps.setString(1,order.category);
                    ps.setDouble(2,order.price);
                    ps.setLong(3,order.time);
                },
                //批处理
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
                .withUsername("root")   //配置用户名
                .withPassword("123456") //密码
                .withDriverName("com.mysql.jdbc.Driver") //驱动类
                .build()
        ));
        env.execute();

以上就是全部内容了,感谢您的阅读!

另外补充一些不成熟的代码:双流Join

//双流Join
        SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
        SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());
        //商品ID=订单ID
        final DataStream<Order> result = order1watermark.join(order2watermark)
                .where(o1 -> o1.category)
                .equalTo(o2 -> o2.category)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((o1, o2) -> {
                    Order order = new Order();
                    order.setCategory(o1.category);
                    order.setPrice(o2.price);
                    order.setTime(o2.time);
                    return order;
                });
//        result.print();

水印机制,简化了直接使用系统时间

//水印机制
    public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element,recordTimestamp)->System.currentTimeMillis();
        }
    }
    public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

好了,终于完成了?

双流join不怎么会写,慢慢来吧,

毕竟对于考60分的人,下一次考80分已经是极大的进步~~

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql(二)~

喜欢的小伙伴欢迎一键三连!!!

我是manor,一枚相信技术改变世界的码农,我们下期再见~


目录
相关文章
|
8天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之写doris,mysql字段变更,重新提交才能同步新字段数据吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4天前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
|
5天前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
8天前
|
SQL 关系型数据库 MySQL
解决向MySQL中导入文件中的 数据时出现的问题~
解决向MySQL中导入文件中的 数据时出现的问题~
|
8天前
|
SQL 关系型数据库 MySQL
mysql插入500条数据sql语句
【5月更文挑战第12天】
|
8天前
|
消息中间件 Kafka 分布式数据库
实时计算 Flink版产品使用合集之如何批量读取Kafka数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8天前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何配置可以实现实时同步多张MySQL源表时只读取一次binlog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8天前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何开启mysql的binglog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之使用DTS从RDSMySQL数据库同步数据到云Kafka,增量同步数据延迟时间超过1秒。如何诊断问题并降低延迟
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

http://www.vxiaotou.com