这是我参与8月更文挑战的第6天,活动详情查看:8月更文挑战
5. 定义数据库配置类
public class TmallConfig {
/** * hbase 数据库 */
public static final String HBASE_SCHEMA = "TMALL_REALTIME";
/** * phoenix 连接地址 */
public static final String PHOENIX_SERVER = "jdbc:phoenix:hd1,hd2,hd3:2181";
/** * clickhouse 连接地址 */
public static final String CLICKHOUSE_URL="jdbc:clickhouse://hd1:8123/tmall_realtime";
}
6. 定义CDC操作类型枚举类
public enum CDCTypeEnum {
/** * CDC 中的 c 操作类型 转成 INSERT */
INSERT("c"),
/** * CDC 中的 u 操作类型 转成 UPDATE */
UPDATE("u"),
/** * CDC 中的 d 操作类型 转成 DELETE */
DELETE("d");
private static final Map<String, CDCTypeEnum> MAP = new HashMap<>();
static {
for (CDCTypeEnum cdcTypeEnum : CDCTypeEnum.values()) {
MAP.put(cdcTypeEnum.op, cdcTypeEnum);
}
}
String op;
CDCTypeEnum(String op) {
this.op = op;
}
public static CDCTypeEnum of(String c) {
return MAP.get(c);
}
}
7. 封装 Kafka 工具类
public class KafkaUtil {
private static final String KAFKA_SERVER = "hd1:9092,hd2:9092,hd3:9092";
/** * 获取 kafka 通用配置 * @return 配置类 */
private static Properties getProps() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
return properties;
}
/** * 通过 topic 和 groupId 创建一个 Kafka Source * @param topic * @param groupId * @return Kafka Source */
public static SourceFunction<String> ofSource(String topic, String groupId) {
Properties props = getProps();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}
/** * 通过 topic 创建一个字符串序列化的 Kafka Sink * @param topic * @return Kafka Sink */
public static SinkFunction<String> ofSink(String topic) {
return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), getProps());
}
/** * 通过 序列化器 创建一个 Kafka Sink * @param serializationSchema * @param <IN> * @return Kafka Sink */
public static <IN> SinkFunction<IN> ofSink(KafkaSerializationSchema<IN> serializationSchema) {
return new FlinkKafkaProducer<>("default_topic", serializationSchema, getProps(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
/** * 通过 topic 和 groupId 生成一个 Flink SQL 的 Kafka 连接信息 * @param topic * @param groupId * @return */
public static String getKafkaDDL(String topic, String groupId) {
return " 'connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'latest-offset' ";
}
}
8. 封装 redis 工具类,缓存DIM数据
public class RedisUtil {
private static volatile JedisPool jedisPool;
public static Jedis getJedis() {
if (jedisPool == null) {
synchronized (RedisUtil.class) {
if (jedisPool == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
//最大可用连接数
poolConfig.setMaxTotal(100);
//连接耗尽是否等待
poolConfig.setBlockWhenExhausted(true);
//等待时间
poolConfig.setMaxWaitMillis(2000);
//最大闲置连接数
poolConfig.setMaxIdle(5);
//最小闲置连接数
poolConfig.setMinIdle(5);
//取连接的时候进行一下测试 ping pong
poolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(poolConfig, "hd1", 6379, 1000, "密码");
}
}
}
return jedisPool.getResource();
}
}
9. 封装 Phoenix 工具类,查询 HBase
public class PhoenixUtil {
private static Connection connection;
/**
* 查询 一个集合数据
* @param sql sql
* @param clazz 返回集合的类型
* @param <T> 返回集合的类型
* @return 结果集合
*/
public static <T> List<T> queryList(String sql, Class<T> clazz) {
if (connection == null) {
initConnection();
}
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
ResultSet resultSet = preparedStatement.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
ArrayList<T> resList = new ArrayList<>();
while (resultSet.next()) {
T t = clazz.newInstance();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
BeanUtils.setProperty(t, metaData.getColumnName(i), resultSet.getObject(i));
}
resList.add(t);
}
resultSet.close();
return resList;
} catch (Exception e) {
e.printStackTrace();
}
return Collections.emptyList();
}
/**
* 初始化 Phoenix 连接
*/
@SneakyThrows
private static void initConnection() {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
}
}
10. 封装 DIM 查询 HBase 的工具类
public class DimUtil {
/** * 删除缓存 * @param tableName * @param id */
public static void delDimCache(String tableName, String id) {
StringBuilder cacheKey = new StringBuilder().append("dim:").append(tableName.toLowerCase()).append(":").append(id);
try (Jedis jedis = RedisUtil.getJedis()) {
jedis.del(cacheKey.toString());
}
}
/** * 更具 表名 和 id 查询一条数据 * @param tableName 表名 * @param id id * @return 数据 */
public static JSONObject getDimInfo(String tableName, String id) {
return getDimInfo(tableName, Tuple2.of("id", id));
}
/** * 更具 表名 和 多个条件 查询一条数据 * @param tableName 表名 * @param colAndValue 必须的 (条件,值) * @param colAndValues 可选多个 (条件,值) * @return 数据 */
@SafeVarargs
public static JSONObject getDimInfo(String tableName, Tuple2<String, String> colAndValue, Tuple2<String, String>... colAndValues) {
//缓存 key
StringBuilder cacheKey = new StringBuilder().append("dim:").append(tableName.toLowerCase()).append(":").append(colAndValue.f1);
for (Tuple2<String, String> cv : colAndValues) {
cacheKey.append("_").append(cv.f1);
}
try (Jedis jedis = RedisUtil.getJedis()) {
//查缓存
String str = jedis.get(cacheKey.toString());
if (StringUtils.isNotBlank(str)) {
return JSON.parseObject(str);
}
//拼接sql
StringBuilder sql = new StringBuilder();
sql.append("select * from ").append(TmallConfig.HBASE_SCHEMA).append(".").append(tableName)
.append(" where ").append(colAndValue.f0).append("='").append(colAndValue.f1).append("' ");
for (Tuple2<String, String> cv : colAndValues) {
sql.append("and ").append(cv.f0).append("='").append(cv.f1).append("' ");
}
// 查询
List<JSONObject> jsonObjectList = PhoenixUtil.queryList(sql.toString(), JSONObject.class);
if (!jsonObjectList.isEmpty()) {
JSONObject jsonObject = jsonObjectList.get(0);
jedis.setex(cacheKey.toString(), 60 * 60 * 24, jsonObject.toJSONString());
return jsonObject;
}
}
return null;
}
}
11. HBase主意事项
- 为了开启
hbase
的namespace
和phoenix
的schema
的映射,需要在hbase
以及phoenix
的hbase-site.xml
配置文件中
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
- 也需要将
hbase-site.xml
放到程序中
下期预告:DIM层 & DWD层 核心代码
关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻
今天的文章Flink 从0-1实现 电商实时数仓 – DWD(中)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/21129.html