Flink 从0-1实现 电商实时数仓 – DWD(中)

Flink 从0-1实现 电商实时数仓 – DWD(中)5. 定义数据库配置类 6. 定义CDC操作类型枚举类 7. 封装 Kafka 工具类 8. 封装 redis 工具类,缓存DIM数据 9. 封装 Phoenix 工具类,查询 HBase 10. 封

这是我参与8月更文挑战的第6天,活动详情查看:8月更文挑战

5. 定义数据库配置类
public class TmallConfig {

    /** * hbase 数据库 */
    public static final String HBASE_SCHEMA = "TMALL_REALTIME";

    /** * phoenix 连接地址 */
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hd1,hd2,hd3:2181";

    /** * clickhouse 连接地址 */
    public static final String CLICKHOUSE_URL="jdbc:clickhouse://hd1:8123/tmall_realtime";

}
6. 定义CDC操作类型枚举类
public enum CDCTypeEnum {

    /** * CDC 中的 c 操作类型 转成 INSERT */
    INSERT("c"),
    /** * CDC 中的 u 操作类型 转成 UPDATE */
    UPDATE("u"),
    /** * CDC 中的 d 操作类型 转成 DELETE */
    DELETE("d");

    private static final Map<String, CDCTypeEnum> MAP = new HashMap<>();

    static {
        for (CDCTypeEnum cdcTypeEnum : CDCTypeEnum.values()) {
            MAP.put(cdcTypeEnum.op, cdcTypeEnum);
        }
    }

    String op;

    CDCTypeEnum(String op) {
        this.op = op;
    }

    public static CDCTypeEnum of(String c) {
        return MAP.get(c);
    }
}
7. 封装 Kafka 工具类
public class KafkaUtil {

   private static final String KAFKA_SERVER = "hd1:9092,hd2:9092,hd3:9092";

   /** * 获取 kafka 通用配置 * @return 配置类 */
   private static Properties getProps() {
       Properties properties = new Properties();
       properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
       return properties;
   }

   /** * 通过 topic 和 groupId 创建一个 Kafka Source * @param topic * @param groupId * @return Kafka Source */
   public static SourceFunction<String> ofSource(String topic, String groupId) {
       Properties props = getProps();
       props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
   }

   /** * 通过 topic 创建一个字符串序列化的 Kafka Sink * @param topic * @return Kafka Sink */
   public static SinkFunction<String> ofSink(String topic) {
       return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), getProps());
   }

   /** * 通过 序列化器 创建一个 Kafka Sink * @param serializationSchema * @param <IN> * @return Kafka Sink */
   public static <IN> SinkFunction<IN> ofSink(KafkaSerializationSchema<IN> serializationSchema) {
       return new FlinkKafkaProducer<>("default_topic", serializationSchema, getProps(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
   }

   /** * 通过 topic 和 groupId 生成一个 Flink SQL 的 Kafka 连接信息 * @param topic * @param groupId * @return */
   public static String getKafkaDDL(String topic, String groupId) {
       return " 'connector' = 'kafka', " +
               " 'topic' = '" + topic + "'," +
               " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
               " 'properties.group.id' = '" + groupId + "', " +
               " 'format' = 'json', " +
               " 'scan.startup.mode' = 'latest-offset' ";
   }
}
8. 封装 redis 工具类,缓存DIM数据
public class RedisUtil {

   private static volatile JedisPool jedisPool;

   public static Jedis getJedis() {
       if (jedisPool == null) {
           synchronized (RedisUtil.class) {
               if (jedisPool == null) {
                   JedisPoolConfig poolConfig = new JedisPoolConfig();
                   //最大可用连接数
                   poolConfig.setMaxTotal(100);
                   //连接耗尽是否等待
                   poolConfig.setBlockWhenExhausted(true);
                   //等待时间
                   poolConfig.setMaxWaitMillis(2000);
                   //最大闲置连接数
                   poolConfig.setMaxIdle(5);
                   //最小闲置连接数
                   poolConfig.setMinIdle(5);
                   //取连接的时候进行一下测试 ping pong
                   poolConfig.setTestOnBorrow(true);
                   jedisPool = new JedisPool(poolConfig, "hd1", 6379, 1000, "密码");
               }
           }
       }
       return jedisPool.getResource();
   }

}
9. 封装 Phoenix 工具类,查询 HBase
public class PhoenixUtil {

   private static Connection connection;

   /**
    * 查询 一个集合数据
    * @param sql sql
    * @param clazz 返回集合的类型
    * @param <T> 返回集合的类型
    * @return 结果集合
    */
   public static <T> List<T> queryList(String sql, Class<T> clazz) {
       if (connection == null) {
           initConnection();
       }
       try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
           ResultSet resultSet = preparedStatement.executeQuery();
           ResultSetMetaData metaData = resultSet.getMetaData();
           ArrayList<T> resList = new ArrayList<>();
           while (resultSet.next()) {
               T t = clazz.newInstance();
               for (int i = 1; i <= metaData.getColumnCount(); i++) {
                   BeanUtils.setProperty(t, metaData.getColumnName(i), resultSet.getObject(i));
               }
               resList.add(t);
           }
           resultSet.close();
           return resList;
       } catch (Exception e) {
           e.printStackTrace();
       }
       return Collections.emptyList();
   }

   /**
    * 初始化 Phoenix 连接
    */
   @SneakyThrows
   private static void initConnection() {
       Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
       connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
   }

}
10. 封装 DIM 查询 HBase 的工具类
public class DimUtil {

   /** * 删除缓存 * @param tableName * @param id */
   public static void delDimCache(String tableName, String id) {
       StringBuilder cacheKey = new StringBuilder().append("dim:").append(tableName.toLowerCase()).append(":").append(id);
       try (Jedis jedis = RedisUtil.getJedis()) {
           jedis.del(cacheKey.toString());
       }
   }

   /** * 更具 表名 和 id 查询一条数据 * @param tableName 表名 * @param id id * @return 数据 */
   public static JSONObject getDimInfo(String tableName, String id) {
       return getDimInfo(tableName, Tuple2.of("id", id));
   }

   /** * 更具 表名 和 多个条件 查询一条数据 * @param tableName 表名 * @param colAndValue 必须的 (条件,值) * @param colAndValues 可选多个 (条件,值) * @return 数据 */
   @SafeVarargs
   public static JSONObject getDimInfo(String tableName, Tuple2<String, String> colAndValue, Tuple2<String, String>... colAndValues) {
       //缓存 key
       StringBuilder cacheKey = new StringBuilder().append("dim:").append(tableName.toLowerCase()).append(":").append(colAndValue.f1);
       for (Tuple2<String, String> cv : colAndValues) {
           cacheKey.append("_").append(cv.f1);
       }
       try (Jedis jedis = RedisUtil.getJedis()) {
           //查缓存
           String str = jedis.get(cacheKey.toString());
           if (StringUtils.isNotBlank(str)) {
               return JSON.parseObject(str);
           }
           //拼接sql
           StringBuilder sql = new StringBuilder();
           sql.append("select * from ").append(TmallConfig.HBASE_SCHEMA).append(".").append(tableName)
                   .append(" where ").append(colAndValue.f0).append("='").append(colAndValue.f1).append("' ");
           for (Tuple2<String, String> cv : colAndValues) {
               sql.append("and ").append(cv.f0).append("='").append(cv.f1).append("' ");
           }
           // 查询
           List<JSONObject> jsonObjectList = PhoenixUtil.queryList(sql.toString(), JSONObject.class);
           if (!jsonObjectList.isEmpty()) {
               JSONObject jsonObject = jsonObjectList.get(0);
               jedis.setex(cacheKey.toString(), 60 * 60 * 24, jsonObject.toJSONString());
               return jsonObject;
           }
       }
       return null;
   }

}
11. HBase主意事项
  • 为了开启 hbasenamespacephoenixschema的映射,需要在hbase以及phoenixhbase-site.xml配置文件中
<property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
</property>

<property>
    <name>phoenix.schema.mapSystemTablesToNamespace</name>
    <value>true</value>
</property>
  • 也需要将 hbase-site.xml 放到程序中 image.png

下期预告:DIM层 & DWD层 核心代码

关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻

今天的文章Flink 从0-1实现 电商实时数仓 – DWD(中)分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/21129.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注